# Dynamically Adding a Receiver (v1.0)
Spark Streaming launches a receiver by handing it to the cluster wrapped in an RDD. The approach here reuses that mechanism: the newly added receiver is packaged into an RDD and submitted to the cluster as an ordinary job via ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver)). For the underlying principles, please refer to sparkStreaming.doc; this note only describes the modifications and the resulting behaviour.
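As a standalone illustration of that pattern (a minimal sketch only: RunJobSketch and the payload strings are made up for this example and are not part of the project), any payload can be shipped to the executors by wrapping it into a small RDD and running a job over it:

import org.apache.spark.{SparkConf, SparkContext}

object RunJobSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("run-job-sketch").setMaster("local[2]"))

    // Stand-ins for receivers: any payload that a long-running task should own
    val payloads = Seq("receiver-0", "receiver-1")

    // One partition per payload, so each payload is handled by exactly one task
    val tempRDD = sc.makeRDD(payloads, payloads.size)

    // The function each task runs; in the streaming code this is where the
    // ReceiverSupervisorImpl is created, started and awaited
    val startPayload = (it: Iterator[String]) => it.foreach(p => println("starting " + p))

    // Submitting the job ships startPayload to the executors and runs it once per
    // partition, which is what runJob(tempRDD, clean(startReceiver)) does for receivers
    sc.runJob(tempRDD, startPayload)

    sc.stop()
  }
}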
Below is a class added to the source tree. It does two things:
- set up the new input stream and the new output stream;
- wrap the new receiver into an RDD and run it, see startReceivers(). (Note: this class touches some internal members, so the private or val modifiers of the corresponding fields have to be relaxed, e.g. ssc.scheduler.receiverTracker.receivedBlockTracker.streamId; a sketch of that change follows this list.)
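The exact edits to the Spark sources are not reproduced in this document. As an assumption, they amount to something like the following (field names follow the code below; the real declarations and file locations vary between Spark versions):

// Sketch of the assumed visibility changes in the Spark sources (not actual Spark code)

// org/apache/spark/streaming/scheduler/ReceivedBlockTracker.scala
//   before: private val streamId ...            (immutable, private)
//   after:  var streamId: Array[Int] = ...      (writable from AddInputStream.start())

// org/apache/spark/streaming/scheduler/ReceiverTracker.scala
//   before: private val receiverInputStreamIds = receiverInputStreams.map { _.id }
//   after:  var receiverInputStreamIds = receiverInputStreams.map { _.id }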
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.streaming.dreceiver
import org.apache.spark.streaming.dstream.{ReceiverInputDStream, InputDStream}
import org.apache.spark.{Logging, SerializableWritable, SparkEnv, SparkException}
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver.{Receiver, ReceiverSupervisorImpl, StopReceiver}
import scala.collection.mutable.ArrayBuffer
/**
* Created by havstack on 3/14/15.
*/
class AddInputStream(ssc: StreamingContext) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }

  // Get the newly added input stream (the last one registered on the graph)
  val input = receiverInputStreams(receiverInputStreams.length - 1)
  private val inputStreams = new ArrayBuffer[ReceiverInputDStream[_]]()
  inputStreams += input

  // Get the ID of the newly added input stream
  val inputId = receiverInputStreamIds(receiverInputStreamIds.length - 1)

  private val receiverExecutor = new ReceiverLauncher()

  def start() {
    // Start the receiver
    receiverExecutor.start()

    // Append inputId to streamId: blocks are stored keyed by streamId,
    // and block data is later loaded into memory by streamId as well
    val temp = ssc.scheduler.receiverTracker.receivedBlockTracker.streamId.toBuffer += inputId
    ssc.scheduler.receiverTracker.receivedBlockTracker.streamId = temp.toArray
    ssc.scheduler.receiverTracker.receiverInputStreamIds = temp.toArray

    // Initialize the output stream
    val outputStreams = ssc.graph.getOutputStreams()
    val outStream = outputStreams(outputStreams.length - 1)
    val timer = ssc.scheduler.jobGenerator.returntimer()
    val startTime = new Time(timer.getRestartTime(timer.getStartTime()))
    outStream.initialize(startTime - ssc.graph.batchDuration)
    outStream.remember(null)
    outStream.validate()
  }

  private class ReceiverLauncher {
    @transient val env = ssc.env
    @transient val thread = new Thread() {
      override def run() {
        try {
          SparkEnv.set(env)
          startReceivers()
        } catch {
          case ie: InterruptedException => logInfo("ReceiverLauncher interrupted")
        }
      }
    }

    def start() {
      thread.start()
    }

    def stop() {
      // Send the stop signal to all the receivers
    }

    /**
     * Get the receivers from the ReceiverInputDStreams, distribute them to the
     * worker nodes as a parallel collection, and run them.
     */
    private def startReceivers() {
      val receivers = inputStreams.map(nis => {
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })

      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

      // Create the parallel collection of receivers to distribute them on the worker nodes
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
          ssc.sc.makeRDD(receivers, receivers.size)
        }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)

      // Function to start the receiver on the worker node
      val startReceiver = (iterator: Iterator[Receiver[_]]) => {
        if (!iterator.hasNext) {
          throw new SparkException(
            "Could not start receiver as object not found.")
        }
        val receiver = iterator.next()
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start() // ReceiverSupervisorImpl takes care of registering the receiver
        supervisor.awaitTermination()
      }

      // Run a dummy Spark job to ensure that all slaves have registered.
      // This avoids all the receivers being scheduled on the same node.
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }

      // Distribute the receivers and start them
      logInfo("Starting " + receivers.length + " receivers")
      ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
      logInfo("All of the receivers have been terminated")
    }

    /** Stops the receivers. */
  }
}
#### 1. Defining the receiver
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.storage.StorageLevel
import org.apache.spark.Logging
import java.net.Socket
import java.io.BufferedReader
import java.io.InputStreamReader

class CustomReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do, as the thread calling receive()
    // is designed to stop by itself once isStopped() returns true
  }

  /** Create a socket connection and receive data until the receiver is stopped */
  private def receive() {
    var socket: Socket = null
    var userInput: String = null
    try {
      // Connect to host:port
      socket = new Socket(host, port)
      // Keep reading until stopped or the connection is broken
      val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), "UTF-8"))
      userInput = reader.readLine()
      while (!isStopped && userInput != null) {
        store(userInput)
        userInput = reader.readLine()
      }
      reader.close()
      socket.close()
      // Restart in an attempt to connect again when the server is active again
      restart("Trying to connect again")
    } catch {
      case e: java.net.ConnectException =>
        // Restart if we could not connect to the server
        restart("Error connecting to " + host + ":" + port, e)
      case t: Throwable =>
        // Restart on any other error
        restart("Error receiving data", t)
    }
  }
}
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dreceiver.AddInputStream
//import org.apache.spark.streaming.andy.AddReceiverTracker

/**
 * Created by havstack on 3/14/15.
 */
object StreamAction {
  def add(ssc: StreamingContext) {
    //val ad = new AddInputStream(ssc)
    //ad.start()
    val c = new CustomReceiver("192.168.8.193", 9999)
    val addStream = ssc.receiverStream(c)
    val words = addStream.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD(rdd => rdd.foreach(record => println(record._1 + " " + record._2)))
    //val ad = new AddReceiverTracker(ssc, 1)
    //ad.startUp()
    val ad = new AddInputStream(ssc)
    ad.start()
  }
}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

/**
 * Created by havstack on 3/16/15.
 */
object StreamingWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 1) {
      System.err.println("Usage: StreamingWordCount <master>")
      System.exit(1)
    }
    //StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("StreamingWordCount").setMaster(args(0))
      .setJars(List("/home/havstack/SparkRemoteDebug/out/artifacts/SparkRemoteDebug_jar/SparkRemoteDebug.jar"))
      .set("spark.cores.max", "4")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    val t = ssc.receiverStream(new CustomReceiver("192.168.8.193", 9998))
    val words = t.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD(rdd => rdd.foreach(record => println(record._1 + " " + record._2)))
    //StreamAction.add(ssc)

    ssc.start()

    // Use a separate thread to dynamically add a receiver after the context has started
    new Thread(new Runnable {
      override def run(): Unit = {
        println("waiting-----------------------------------------------------------------")
        Thread.sleep(10000)
        println("start new receiver-----------------------------------------------------------------")
        StreamAction.add(ssc)
      }
    }).start()

    ssc.awaitTermination()
  }
}
Start two network servers with nc -lk 9998 and nc -lk 9999. Type only letters on port 9998 (handled by the receiver created before ssc.start()) and only digits on port 9999 (handled by the dynamically added receiver), then check the output. Both the letters and the digits are counted, which shows that the newly started receiver runs correctly.
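For example (illustrative only; the exact counts depend on what is typed within a batch), typing "spark spark streaming" on port 9998 and "7 7 7" on port 9999 during the same batch interval should produce output like the following. Note that the println calls run inside foreachRDD tasks, so in cluster mode the lines appear in the executors' stdout logs rather than on the driver console:

spark 2
streaming 1
7 3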