首先在分析Spark Streaming 之前,我们要先运行一个简单的例子来体会一下什么是Spark Streaming,它到底是做什么的。我假设大家实在linux系统上研究spark,首先需要下载一个scala IDE for eclipse,网址如下: http://scala-ide.org/ 根据自己的linux版本信息下载对应的版本。接下来我们配置eclipse让它执行spark相关程序。 ###在eclipse上 运行spark Streaming wordCount程序。 下载好IDE之后,解压该文件即可,然后双击eclipse可执行文件(或则在终端下到eclipse目录下执行 ./eclipse指令即可)
- 新建一个scala project。
- 到spark目录下将lib文件夹下的spark-assembly_2.10-0.9.0-incubating-hadoop2.2.0.jar复制到eclipse/workspace/you project name/lib/
- 右击project->build path->configure build path ->Add Externam JARs 将lib目录下的spark jar包添加到build path。
- 新建一个scala class 随意命名。如下代码:
package org
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel
/**
* Counts words in text encoded with UTF8 received from the network every second.
*
* Usage: NetworkWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999`
*/
object WordCount {
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1")
System.exit(1)
}
spark1.1.0往后的版本需要这么写
/*val conf = new SparkConf()
.setMaster(args(0))
.setAppName("NetworkWordCount")
val batchInterval = Milliseconds(Integer.parseInt("2000"))
val ssc = new StreamingContext(conf,batchInterval)*/
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
val lines = ssc.socketTextStream(args(1), args(2).toInt, StorageLevel.MEMORY_ONLY_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
将代码复制到新建的scala类中,将上述 WordCount改成你的类名。细心的同学将会发现,这其实就是spark自带的例子程序,这里将它放到eclipse上执行,完全只是起到抛砖引玉的作用,之后大家可以在这基础之上开发你自己的应用程序。完完全全可以借鉴这个程序,来编写自己的spark Streaming程序,而且很简单
- 右击scala类->run as ->run configure->Arguments ->填入3个参数 local[4] 192.168.1.98 9999 //这边解释一下,local[4]是本地执行并且使用4核进行处理计算,ip地址是接受socket的ip地址,如果是本机上执行的话填写自己的ip,9999是端口号。之后点击run
- 在终端上执行 nc -lk 9999 //这里的9999与上面的9999必须保持一致。 这里应该先与第6步执行。
- 在终端上输入一些单词,然后在eclipse的输出中看打印信息。这样一个完整的spark streaming wordcount程序就执行起来了。
这边我简单的讲解一下spark streaming程序的编写步骤,之后进行展开(部分内容借鉴了 http://www.cnblogs.com/cenyuhai/p/3841000.html 这篇博客)。
- 1.首先新建一个Sparkconf()对象配置spark Streaming相关参数。
- 2.使用conf实例化StreamingContext对象
- 3.调用StreamingContext的socketTextStream
- 4.对获得的DStream进行处理
- 5.调用StreamingContext的start方法开始进行处理
- 6.调用awaitTermination()方法等待终端的停止,否值 一直执行程序。
接下来我们一步一步的将程序涉及到的相关内容进行深入分析。首先看看SparkConf类,看看它如何设置spark相关参数。 ###SparkConf类
- 首先找到这个文件(这里推荐大家使用intelliJIDEA阅读源码,简单而且可以进行单元调试,如果大家有需要的话,我下次再整理一下使用intellij阅读源码的配置文档) spark/core/src/main/scala/org/apache/spark/SparkConf.scala
- 接下来大家来看看这个类的定义以及它的一些方法,SparkConf这个类很简单,就是定义了一个HashMap[String,String],将一些需要的参数插入到hash表。 下面看一下其中的关键函数:
private val settings = new HashMap[String, String]()
if (loadDefaults) {
// Load any spark.* system properties
for ((k, v) <- System.getProperties.asScala if k.startsWith("spark.")) {
settings(k) = v
}
}
新建一个HashMap,如果将参数loadDefaults设置为true的话,那么上述代码会将系统的配置文件中设置的spark参数加载进来,典型的就是spark/conf/spark-defaults.conf文件,里面可以设置很多参数。但是一般我们都不会在文件中设置,因为在程序中设置会让代码可读写更好。如果要在生产环境中调试代码的话,那么更改配置文件的参数设置会比较好,因为不需要重新编译打包程序。
** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value")
}
settings(key) = value
this
}
这个函数很简单就是一个对Map的操作函数SparkConf里面的实际用来设置具体参数的函数就是调用它。 下面看一下几个常用的设置参数的函数
/**
* The master URL to connect to, such as "local" to run locally with one thread, "local[4]" to
* run locally with 4 cores, or "spark://master:7077" to run on a Spark standalone cluster.
*/
def setMaster(master: String): SparkConf = {
set("spark.master", master)
}
/** Set a name for your application. Shown in the Spark web UI. */
def setAppName(name: String): SparkConf = {
set("spark.app.name", name)
}
/** Set JAR files to distribute to the cluster. */
def setJars(jars: Seq[String]): SparkConf = {
for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor")
set("spark.jars", jars.filter(_ != null).mkString(","))
}
/** Set JAR files to distribute to the cluster. (Java-friendly version.) */
def setJars(jars: Array[String]): SparkConf = {
setJars(jars.toSeq)
}
都很简单,从函数名就能知道是该函数是干什么的了。如果大家想看一下所有的spark参数设置,及其含义: http://spark.apache.org/docs/1.0.2/configuration.html 这里很详细的介绍了各个参数的含义。 这里在生产环境中执行你们的程序的时候,将会非常有用,因为参数的设置优化对程序的效率将会提高很多,根据自身程序的特性,选择相应的参数将会十分有用。
配置好spark执行参数之后,接下来我们看一下spark Streaming的入口程序。 ###StreamingContext类
- 到spark/streaming/src/main/scala/org.apache.spark.streaming/StreamingContext.scala
- 首先看看StreamingContext的主构造器和几个辅助构造器
class StreamingContext private[streaming] (
sc_ : SparkContext,
cp_ : Checkpoint,
batchDur_ : Duration
) extends Logging {}
这是scala程序的语法特色,跟在类名之后的参数列表就是住构造器,三个参数,第一个是SparkContext,因为SparkStreaming也是在spark基础架构之上构建的一个流式架构,其执行还是在Spark上执行,所以这里最终需要构建一个SparkContext参数。第二个是bool型的,设置是否有Checkpoint,如果有的话,将从Checkpoint中构建SparkContextd。 第三个是设置SparkStreaming 的批间隔时间,一般设置为1s或则2s,根据应用不同,这里可以进行调优,它主要是用来设置DStreamGraph的batch
/**
* Create a StreamingContext using an existing SparkContext.
* @param sparkContext existing SparkContext
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(sparkContext: SparkContext, batchDuration: Duration) = {
this(sparkContext, null, batchDuration)
}
/**
* Create a StreamingContext by providing the configuration necessary for a new SparkContext.
* @param conf a org.apache.spark.SparkConf object specifying Spark parameters
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(conf: SparkConf, batchDuration: Duration) = {
this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
/**
* Create a StreamingContext by providing the details necessary for creating a new SparkContext.
* @param master cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param appName a name for your job, to display on the cluster web UI
* @param batchDuration the time interval at which streaming data will be divided into batches
*/
def this(
master: String,
appName: String,
batchDuration: Duration,
sparkHome: String = null,
jars: Seq[String] = Nil,
environment: Map[String, String] = Map()) = {
this(StreamingContext.createNewSparkContext(master, appName, sparkHome, jars, environment),
null, batchDuration)
}
/**
* Recreate a StreamingContext from a checkpoint file.
* @param path Path to the directory that was specified as the checkpoint directory
* @param hadoopConf Optional, configuration object if necessary for reading from
* HDFS compatible filesystems
*/
def this(path: String, hadoopConf: Configuration = new Configuration) =
this(null, CheckpointReader.read(path, new SparkConf(), hadoopConf).get, null)
接下来看看这四大辅助构造函数,写sparkstreaming代码的第二步就是构造一个StreamingContext,这边很清晰,具体可以从那几个方面构造函数。 接下来我们将从四个角度构造开始的案例程序。
val conf = new SparkConf()
.setMaster(args(0))
.setAppName("NetworkWordCount")
val spackcontext =new SparkContext(conf)
val batchInterval = Milliseconds(Integer.parseInt("2000"))
val ssc = new StreamingContext(conf,batchInterval)
val conf = new SparkConf()
.setMaster(args(0))
.setAppName("NetworkWordCount")
val batchInterval = Milliseconds(Integer.parseInt("2000"))
val ssc = new StreamingContext(conf,batchInterval)
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
接下来看看StreamingContext类里的常用方法。
/**
* Create an input stream with any arbitrary user implemented receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
@deprecated("Use receiverStream", "1.0.0")
def networkStream[T: ClassTag](
receiver: Receiver[T]): ReceiverInputDStream[T] = {
receiverStream(receiver)
}
/**
* Create an input stream with any arbitrary user implemented receiver.
* Find more details at: http://spark.apache.org/docs/latest/streaming-custom-receivers.html
* @param receiver Custom implementation of Receiver
*/
def receiverStream[T: ClassTag](
receiver: Receiver[T]): ReceiverInputDStream[T] = {
new PluggableInputDStream[T](this, receiver)
}
这两个方法很有用,当你需要实现自己的接收器时候,将要用到这这两个函数,其实就是将自己实现的流转换spark能认识而已。
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
* lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
/**
* Create a input stream from TCP source hostname:port. Data is received using
* a TCP socket and the receive bytes it interepreted as object using the given
* converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param converter Function to convert the byte stream to objects
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
socketInputDstream这是我们开始简单例子用到的DStream,在介绍完StreamingContext之后,将会以它为主流来介绍流如何接收,存储,以及计算。
/**
* Create a unified DStream from multiple DStreams of the same type and same slide duration.
*/
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
这个方法很有用,当你有多路类似的流的时候,可以通过该操作将多路流进行合并。 好了socket里面其实就是初始化一些程序执行的参数并且定义了一些基本的方法,如:socketStream,这些方法实际上启动的是视频流如:SocketInputDStream。然后就是启动了两个很重要的方法
def addStreamingListener(streamingListener: StreamingListener) {
scheduler.listenerBus.addListener(streamingListener)
}
def start(): Unit = synchronized {
// Throw exception if the context has already been started once
// or if a stopped context is being started again
if (state == Started) {
throw new SparkException("StreamingContext has already been started")
}
if (state == Stopped) {
throw new SparkException("StreamingContext has already been stopped")
}
validate()
logInfo("Streamingcontext start ")
scheduler.start()
state = Started
}
这里通过对scheduler调度模块进行监听,并启动scheduler
以socketTextStream方法为例分析Spark Streaming的整个过程。
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
首先看一下该函数定义,很简单,就是将参数传递给SocketInputDStream,毫无疑问,接下来看SocketInputDStream如何定义的。
class SocketInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](ssc_) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
SocketInputDStream继承了 ReceiverInputDStream继续往下追究其继承关系,可以看到完整的继承关系: SocketInputDStream>>ReceiverInputDStream>>InputDStream>>DStream。 具体实现ReceiverInputDStream的类有好几个,基本上都是从网络端来数据的。比如:RawInputDStream 它实现了ReceiverInputDStream的getReceiver方法,实例化了一个SocketReceiver来接收数据。 这边注意:如果要是实习那自己的网络接收流的话,这边可以仿着SocketInputDStream的实现方式。如何实现自己的接受流,我会另起一篇博文介绍如何实现。这篇文章只是将SparkStreaming的原理和数据流程进行讲解。
接下来看看SocketInputDStream的Receiver的具体实现。
private[streaming]
class SocketReceiver[T: ClassTag](
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
) extends Receiver[T](storageLevel) with Logging {
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
def onStop() {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself isStopped() returns false
}
/** Create a socket connection and receive data until receiver is stopped */
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
logInfo("Stopped receiving")
restart("Retrying connecting to " + host + ":" + port)
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
restart("Error receiving data", t)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
}
}
}
这边把所有的实现代码贴出来进行讲解。 首先我们一定要有这么一个概念,InputStream和receiver是两个不同的东西,但是InputStream里面会启动receiver用来 接收数据。 SocketReceiver继承了 Receiver,看一下Receiver的代码不然发现,Receiver是一个抽象类,任何继承Reciever的类都需要实现几个方法,如果我们要写自己的接收器的话也不例外。onStart()、onStop()必须要实现,preferredLocation函数可以实现也可以不实现,这个函数可能会涉及到具体流计算的性能优化,暂时没有搞懂,以后有机会一定会把这里的研究成果展示给大家。 看看receive()函数,一个具体的接收数据的函数。
首先根据用户提供的host 和 port new一个socket,之后将socket接收到的字节流转换成Iterator迭代器。然后调用store方法将数据存储到缓冲队列里,ok啦数据拿过来了,并且存到了缓冲队列。 接下来看看store函数,
def store(dataItem: T) {
executor.pushSingle(dataItem)
}
executor是ReceiverSupervisor类型,Receiver的操作都是由它来处理。如果继续深究的话,这里会涉及到存储如何进行存储的。下面会用一小节来讲解store函数的存储。暂时不深究。 到这里我们知道lines的类型是SocketInputDStream,然后对它是一顿的转换,flatMap、map、reduceByKey、print,这些方法都不是RDD的那种方法,而是DStream独有的。 接下来我们来看看DStream的实现,并且看看里面那些关于map、flatmap等操作的具体实现。
到DStream目录下,里面文件比较多,但是大部分都是DStream方法的具体实现。首先需要确定DStream是一个抽象类。下面来看看这个类。这是一个很复杂的类,下面简单的介绍部分方法。
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
logInfo("get or compute run this way DStream")
// If this DStream was not initialized (i.e., zeroTime not set), then do it
// If RDD was already generated, then retrieve it from HashMap
generatedRDDs.get(time) match {
// If an RDD was already generated and is being reused, then
// probably all RDDs in this DStream will be reused and hence should be cached
case Some(oldRDD) => Some(oldRDD)
// if RDD was not generated, and if the time is valid
// (based on sliding time of this DStream), then generate the RDD
case None => {
if (isTimeValid(time)) {
compute(time) match {
case Some(newRDD) =>
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logInfo("Persisting RDD " + newRDD.id + " for time " +
time + " to " + storageLevel + " at time " + time)
}
if (checkpointDuration != null &&
(time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
logInfo("Marking RDD " + newRDD.id + " for time " + time +
" for checkpointing at time " + time)
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
case None =>
logInfo("nononenoenenonoenoneoneoneeneon")
None
}
} else {
None
}
}
}
}
这个方法是获得一个RDD或则生成一个RDD,该方法主要是供generateJob根据时间生成job用。
private[streaming] def generateJob(time: Time): Option[Job] = {
logInfo("dDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDstream")
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
logInfo("use the sparkcontext for runjob")
logInfo("rdd partation" + rdd.partitions.size)
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
如果改方法可以获得一个RDD,那么就调用sparkContext.runJob(rdd,emptyFunc)执行Job,如果没有的话,就返回None。
private[streaming] def clearMetadata(time: Time) {
val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
logDebug("Clearing references to old RDDs: [" +
oldRDDs.map(x => s"${x._1} -> ${x._2.id}").mkString(", ") + "]")
generatedRDDs --= oldRDDs.keys
if (unpersistData) {
logDebug("Unpersisting old RDDs: " + oldRDDs.values.map(_.id).mkString(", "))
oldRDDs.values.foreach { rdd =>
rdd.unpersist(false)
// Explicitly remove blocks of BlockRDD
rdd match {
case b: BlockRDD[_] =>
logInfo("Removing blocks of RDD " + b + " of time " + time)
b.removeBlocks()
case _ =>
}
}
}
logDebug("Cleared " + oldRDDs.size + " RDDs that were older than " +
(time - rememberDuration) + ": " + oldRDDs.keys.mkString(", "))
dependencies.foreach(_.clearMetadata(time))
}
该方法很重要,通过时间对RDD进行判断,如果该RDD已经计算了,那么该RDD就会从内存中移除。该方法很重要,保证内存及时的释放。
下面看看基于stream的一些操作。 Map flatMap filter等操作会生成相应的DStream,最总要的就是这里需要注意的是,那些涉及到Action操作,这边列出来,print():这个方法不好,并行力度不够,建议调试的时候可以,但是不好用于生产。还有一个就是saveAsTextFiles()这个方法比较好,推荐使用。这里教大家一个方法,如果涉及到的RDD操作有这句话的 val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)表明是action操作。
def print() {
def foreachFunc = (rdd: RDD[T], time: Time) => {
val first11 = rdd.take(11)
println ("-------------------------------------------")
println ("Time: " + time)
println ("-------------------------------------------")
first11.take(10).foreach(println)
if (first11.size > 10) println("...")
println()
}
new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
}
这个方法最后会调用DStream的register()方法,register方法会将自身流添加到graph的outputStream中。这里需要注意的是,之所以需要注册到graph的输出流中,是因为上述两个操作只有注册为输出流 才会触发action操作,类似于RDD的action一样,不然改程序不会真正的计算。 上面大致的对所有的DStream进行了分析,下面来看看ssc.start()之后发生的有关与数据是如何产生存储的,之后又是如何计算的进行分析。
所有的一切都是从ssc.start()开始的,这个语句的执行会导致后面一系列的程序执行,包括数据存储,数据计算。下面首先看看它启动的过程。 ###启动过程 首先看看streamingContext类的start()方法。
def start(): Unit = synchronized {
// Throw exception if the context has already been started once
// or if a stopped context is being started again
if (state == Started) {
throw new SparkException("StreamingContext has already been started")
}
if (state == Stopped) {
throw new SparkException("StreamingContext has already been stopped")
}
validate()
logInfo("Streamingcontext start ")
scheduler.start()
state = Started
}
只有一句话是重点就是scheduler.start(),追踪这句话。一看就会知道它调用的是scheduler目录下的JobScheduler.scala中的start()方法。
def start(): Unit = synchronized {
if (eventActor != null) return // scheduler has already been started
logInfo("Starting JobScheduler")
logDebug("Starting JobScheduler")
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobSchedulerEvent => processEvent(event)
}
}), "JobScheduler")
listenerBus.start()
receiverTracker = new ReceiverTracker(ssc)
receiverTracker.start()
jobGenerator.start()
logInfo("Started JobScheduler")
}
- 1、启动了一个Actor来处理JobScheduler的JobStarted、JobCompleted、ErrorReported事件。
- 2、启动StreamingListenerBus作为监听器。
- 3、启动ReceiverTracker。
- 4、启动JobGenerator。
这边可以分两条主线进行分析了,第一条就是receiverTracker.start(),第二条就是JobGenerator.start() 第一条其实设计到数据的接收和存储,第二条涉及到作业产生和计算。 ###数据的接收和存储 我们接下来看看ReceiverTracker的start方法。
def start() = synchronized {
logInfo("receivertracker start!!!!!!!!!!!!")
if (actor != null) {
throw new SparkException("ReceiverTracker already started")
}
if (!receiverInputStreams.isEmpty) {
actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
"ReceiverTracker")
logInfo("ReceiverTracker start !!!!!!!!!!")
receiverExecutor.start()
logInfo("ReceiverTracker started")
}
}
- 1.首先判断一下receiverInputStreams是否为空,那receiverInputStreams是怎么时候写入值的呢?答案在SocketInputDStream的父类InputDStream当中,当实例化InputDStream的时候会在DStreamGraph里面添加InputStream。
查看InputStream类
abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
private[streaming] var lastValidTime: Time = null
ssc.graph.addInputStream(this)
//...
}
程序中每新建一个流就会调用InputStream中的ssc.graph.addInputStream(this)一次。
- 2、实例化ReceiverTrackerActor,它负责RegisterReceiver(注册Receiver)、AddBlock、ReportError(报告错误)、DeregisterReceiver(注销Receiver)等事件的处理。
- 3、启动receiverExecutor(实际类是ReceiverLauncher,这名字起得。。),它主要负责启动Receiver,start方法里面调用了startReceivers方法吧。
/**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
*/
private def startReceivers() {
val receivers = receiverInputStreams.map(nis => {
val rcvr = nis.getReceiver()
rcvr.setReceiverId(nis.id)
rcvr
})
// 查看是否所有的receivers都有优先选择机器,这个需要重写Receiver的preferredLocation方法,目前只有FlumeReceiver重写了
// Right now, we only honor preferences if all receivers have them
val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
// 创建一个并行receiver集合的RDD, 把它们分散到各个worker节点
// Create the parallel collection of receivers to distributed them on the worker nodes
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}
// 在worker节点上启动Receiver的方法,遍历所有Receiver,然后启动
// Function to start the receiver on the worker node
val startReceiver = (iterator: Iterator[Receiver[_]]) => {
if (!iterator.hasNext) {
throw new SparkException(
"Could not start receiver as object not found.")
}
val receiver = iterator.next()
val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
executor.start()
executor.awaitTermination()
}
// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}
// Distribute the receivers and start them
logInfo("Starting " + receivers.length + " receivers")
// 把receivers分发出去,启动,这边将接收流封装到一个RDD中去,然后执行改RDD
ssc.sparkContext.runJob(tempRDD, startReceiver)
logInfo("All of the receivers have been terminated")
}
-
1、遍历receiverInputStreams获取所有的Receiver。
-
2、查看这些Receiver是否全都有优先选择机器。
-
3、把SparkContext的makeRDD方法把所有Receiver包装到ParallelCollectionRDD里面,并行度是Receiver的数量。
-
4、发个小任务给确保所有的slave节点都已经注册了(这个小任务有点儿莫名其妙,感觉怪怪的)。
-
5、提交作业,启动所有Receiver。 Spark写得实在是太巧妙了,居然可以把Receiver包装在RDD里面,当做是数据来处理! 启动Receiver的时候,new了一个ReceiverSupervisorImpl,然后调的start方法,主要干了这么三件事情,代码就不贴了。
-
1、启动BlockGenerator。
-
2、调用Receiver的OnStart方法,开始接受数据,并把数据写入到ReceiverSupervisor。
-
3、调用onReceiverStart方法,发送RegisterReceiver消息给driver报告自己启动了。
这边会有点晕头转向了,首先调用了ReceiverSupervisormpl类的start()函数 该方法调用的是ReceiverTracker.scala中的start()方法,因为ReceiverSupervisormpl继承了ReceiverTracker类,而没有覆盖这个方法,所以最终调用了ReceiverTracker的start()方法。
def start() {
onStart()
startReceiver()
}
onStart()调用的是ReceiverSupervisormpl的方法,因为进行了覆盖重写了。而startReceiver()方法没有重写,所以调用的是ReceiverTracker的 startReceiver方法。 先看onStart()方法
override protected def onStart() {
blockGenerator.start()
}
启动了blockGenerator.start()方法。BlockGenerator这个类很重要的。先看看startReceiver()方法,看看它有什么作用
def startReceiver(): Unit = synchronized {
try {
logInfo("Starting receiver")
receiver.onStart()
logInfo("Called receiver onStart")
onReceiverStart()
receiverState = Started
} catch {
case t: Throwable =>
stop("Error starting receiver " + streamId, Some(t))
}
}
1.调用了receiver.onStart()这个方法其实就是调用了SocketInputDStream的onStart方法,
def onStart() {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
setDaemon(true)
override def run() { receive() }
}.start()
}
该方法很简单,启用一个线程启动receive()方法。
def receive() {
var socket: Socket = null
try {
logInfo("Connecting to " + host + ":" + port)
socket = new Socket(host, port)
logInfo("Connected to " + host + ":" + port)
val iterator = bytesToObjects(socket.getInputStream())
while(!isStopped && iterator.hasNext) {
store(iterator.next)
}
logInfo("Stopped receiving")
restart("Retrying connecting to " + host + ":" + port)
} catch {
case e: java.net.ConnectException =>
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
restart("Error receiving data", t)
} finally {
if (socket != null) {
socket.close()
logInfo("Closed socket to " + host + ":" + port)
}
}
}
这边开始接收数据了,看到了吗?数据会不断的调用store(iterator.next)方法进行存储。该方法掉用的是Reciever基类的store方法,然后调用
def store(dataItem: T) {
executor.pushSingle(dataItem)
}
最终调用ReceiverSupervisormpl类的方法。
def pushSingle(data: Any) {
blockGenerator += (data)
}
//这是BlockGenerator的方法
def += (data: Any): Unit = synchronized {
currentBuffer += data
}
最后竟然是调用的blockGenerator进行数据存储的,这边新建了一个 @volatile private var currentBuffer = new ArrayBuffer[Any]。一个Buffer数组,看到了吗?来的数据都是存放在里面的。 之前提到blockGenerator方法很重要,这边进行分析。来到ReceiverSupervisormpl类中看看blockGenerator。 看到之前启动blockGenerator方法了吗?上面红色部分。下面是blockGenerator的方法。
def start() {
blockIntervalTimer.start()
blockPushingThread.start()
logInfo("Started BlockGenerator")
}
它启动了一个定时器RecurringTimer和一个线程执行keepPushingBlocks方法。先看RecurringTimer的实现:
每隔一段时间就执行callback函数,callback函数是new的时候传进来的,是BlockGenerator的updateCurrentBuffer方法。
private def loop() {
try {
while (!stopped) {
clock.waitTillTime(nextTime)
callback(nextTime)
prevTime = nextTime
nextTime += period
logDebug("Callback for " + name + " called at time " + prevTime)
}
} catch {
case e: InterruptedException =>
}
每隔一段时间就执行callback函数,callback函数是new的时候传进来的,是BlockGenerator的updateCurrentBuffer方法。
private def updateCurrentBuffer(time: Long): Unit = synchronized {
try {
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[Any]
if (newBlockBuffer.size > 0) {
val blockId = StreamBlockId(receiverId, time - blockInterval)
val newBlock = new Block(blockId, newBlockBuffer)
blocksForPushing.put(newBlock) // put is blocking when queue is full
logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
}
} catch {
case ie: InterruptedException =>
logInfo("Block updating timer thread was interrupted")
case t: Throwable =>
reportError("Error in block updating thread", t)
}
}
看到这个方法了吗?首先将currentBuffer里面的数据赋值给 newBlockBuffer,之后currentBuffer重新申请一个新的数据为空。而 newBlockBuffer就形成一个Block, blockId是根据receiverId和当前时间相关信息得到。然后添加到blocksForPushing这个ArrayBlockingQueue队列当中。
提到这里,有两个参数需要大家注意的:
spark.streaming.blockInterval 默认值是200 spark.streaming.blockQueueSize 默认值是10
200毫秒一个block,队列的大小是10。
我们接下来看一下BlockGenerator另外启动的那个线程执行的keepPushingBlocks方法到底在干什么?
/** Keep pushing blocks to the BlockManager. */
private def keepPushingBlocks() {
logInfo("Started block pushing thread")
try {
while(!stopped) {
Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>
}
}
// Push out the blocks that are still left
logInfo("Pushing out the last " + blocksForPushing.size() + " blocks")
while (!blocksForPushing.isEmpty) {
logDebug("Getting block ")
val block = blocksForPushing.take()
pushBlock(block)
logInfo("Blocks left to push " + blocksForPushing.size())
}
logInfo("Stopped block pushing thread")
} catch {
case ie: InterruptedException =>
logInfo("Block pushing thread was interrupted")
case t: Throwable =>
reportError("Error in block pushing thread", t)
}
}
它在把blocksForPushing中的block不停的拿出来,调用pushBlock方法,这个方法属于在实例化BlockGenerator的时候,从ReceiverSupervisorImpl传进来的BlockGeneratorListener的。
private def pushBlock(block: Block) {
listener.onPushBlock(block.id, block.buffer)
logInfo("Pushed block " + block.id)
}
这个方法调用了ReceiverSupervisorImpl
private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
def onError(message: String, throwable: Throwable) {
reportError(message, throwable)
}
def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
pushArrayBuffer(arrayBuffer, None, Some(blockId))
}
}, streamId, env.conf)
pushArrayBuffer
def pushArrayBuffer(
arrayBuffer: ArrayBuffer[_],
optionalMetadata: Option[Any],
optionalBlockId: Option[StreamBlockId]
) {
val blockId = optionalBlockId.getOrElse(nextBlockId)
val time = System.currentTimeMillis
blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]],
storageLevel, tellMaster = true)
logDebug("Pushed block " + blockId + " in " + (System.currentTimeMillis - time) + " ms")
reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
}
好了,这里调用了blockManager.put方法将数据存储到内存中了。这边打住了因为这涉及到了spark core中的Storage模块,这里会单独写一个章节分析数据如何存储和备份的。 ###作业产生和计算 之前有提到jobScheduler的启动一方面启动了receiverTracker一方面启动了JobGenerator.start()。
下面来看看如何处理之前接收的数据吧。
def start(): Unit = synchronized {
if (eventActor != null) return // generator has already been started
logInfo("jobGenerator start")
eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
def receive = {
case event: JobGeneratorEvent => processEvent(event)
}
}), "JobGenerator")
if (ssc.isCheckpointPresent) {
restart()
} else {
startFirstTime()
}
}
- 1、启动一个actor处理JobGeneratorEvent事件。
- 2、如果是已经有CheckPoint了,就接着上次的记录进行处理,否则就是第一次启动。
我们先看startFirstTime吧,CheckPoint以后再说吧,有点儿小复杂。
/** Starts the generator for the first time */
private def startFirstTime() {
val startTime = new Time(timer.getStartTime())
graph.start(startTime - graph.batchDuration)
timer.start(startTime.milliseconds)
logInfo("Started JobGenerator at " + startTime)
}
- 1、timer.getStartTime计算出来下一个周期的到期时间,计算公式: (math.floor(clock.currentTime.toDouble / period) + 1).toLong * period,以当前的时间/除以间隔时间,再用math.floor求出它的上一个整数(即上一个周期的到期时间点),加上1,再乘以周期就等于下一个 周期的到期时间。
- 2、启动DStreamGraph,启动时间=startTime – graph.batchDuration。
- 3、启动Timer,我们看看它的定义:
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
到这里就清楚了,DStreamGraph的间隔时间就是timer的间隔时间,启动时间要设置成比Timer早一个时间间隔,原因再慢慢探究。
可以看出来每隔一段时间,Timer给eventActor发送GenerateJobs消息,我们直接去看它的处理方法generateJobs吧,中间忽略了一步,大家自己看。
private def processEvent(event: JobGeneratorEvent) {
logDebug("Got event " + event)
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time) => doCheckpoint(time)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
下面是generateJobs方法。
private def generateJobs(time: Time) {
SparkEnv.set(ssc.env)
Try(graph.generateJobs(time)) match {
case Success(jobs) =>
val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
val streamId = stream.id
logInfo("receiverdblockinfo ::" + time)
val receivedBlockInfo = stream.getReceivedBlockInfo(time)
(streamId, receivedBlockInfo)
}.toMap
logInfo("submitJobset sucessedddddd")
jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
}
- 1、DStreamGraph生成jobs。
- 2、从stream那里获取接收到的Block信息。
- 3、调用submitJobSet方法提交作业。
- 4、提交完作业之后,做一个CheckPoint。
先看DStreamGraph是怎么生成的jobs。
def generateJobs(time: Time): Seq[Job] = {
logInfo("Generating jobs for time " + time)
val jobs = this.synchronized {
outputStreams.flatMap(outputStream => outputStream.generateJob(time))
}
logInfo("Generated " + jobs.length + " jobs for time " + time)
logDebug("Generated " + jobs.length + " jobs for time " + time)
jobs
}
outputStreams在这个例子里面是print这个方法里面添加的,这个在前面说了,我们继续看DStream的generateJob。
private[streaming] def generateJob(time: Time): Option[Job] = {
logInfo("dDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDstream")
getOrCompute(time) match {
case Some(rdd) => {
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
logInfo("use the sparkcontext for runjob")
logInfo("rdd partation" + rdd.partitions.size)
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
}
case None => None
}
}
- 1、调用getOrCompute方法获得RDD
- 2、new了一个方法去提交这个作业,缺什么都不做
为什么呢?这是直接跳转的错误,呵呵,因为这个outputStream是print方法返回的,它应该是ForEachDStream,所以我们应该看的是它里面的generateJob方法。
override def generateJob(time: Time): Option[Job] = {
logInfo("this is cause by print()")
parent.getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
foreachFunc(rdd, time)
}
Some(new Job(time, jobFunc))
case None => None
}
}
这里请大家千万要注意,不要在这块被卡住了。我们看看它这个RDD是怎么出来的吧。
private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
logInfo("get or compute run this way DStream")
// If this DStream was not initialized (i.e., zeroTime not set), then do it
// If RDD was already generated, then retrieve it from HashMap
generatedRDDs.get(time) match {
// If an RDD was already generated and is being reused, then
// probably all RDDs in this DStream will be reused and hence should be cached
// 这个RDD已经被生成过了,直接用就是了
case Some(oldRDD) => Some(oldRDD)
// if RDD was not generated, and if the time is valid
// 还没生成过,就调用compte函数生成一个
// (based on sliding time of this DStream), then generate the RDD
case None => {
if (isTimeValid(time)) {
compute(time) match {
case Some(newRDD) =>
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logInfo("Persisting RDD " + newRDD.id + " for time " +
time + " to " + storageLevel + " at time " + time)
}
if (checkpointDuration != null &&
(time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
logInfo("Marking RDD " + newRDD.id + " for time " + time +
" for checkpointing at time " + time)
}
// 添加到generatedRDDs里面去,可以再次利用
generatedRDDs.put(time, newRDD)
Some(newRDD)
case None =>
logInfo("nononenoenenonoenoneoneoneeneon")
None
}
} else {
None
}
}
}
}
从上面的方法可以看出来它是通过每个DStream自己实现的compute函数得出来的RDD。我们找到SocketInputDStream,没有compute函数,在父类ReceiverInputDStream里面找到了。
override def compute(validTime: Time): Option[RDD[T]] = {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// master failure
if (validTime >= graph.startTime) {
val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
receivedBlockInfo(validTime) = blockInfo
val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
logInfo("blockIds +++++++++++++++" + blockIds.length)
logInfo("blockIds +++++++++++++++" + blockIds.foreach(ids=>println(ids.toString)))
Some(new BlockRDD[T](ssc.sc, blockIds))
} else {
Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
}
}
通过DStream的id把receiverTracker当中把接收到的block信息全部拿出来,记录到ReceiverInputDStream自身的receivedBlockInfo这个HashMap里面,就把RDD返回了,RDD里面实际包含的是Block的id的集合。 现在我们就可以回到之前JobGenerator的generateJobs方法,我们就清楚它这句是提交的什么了。 jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
def submitJobSet(jobSet: JobSet) {
if (jobSet.jobs.isEmpty) {
logInfo("No jobs added for time " + jobSet.time)
} else {
logInfo("Added jobs for time" + jobSet.jobs.size)
jobSets.put(jobSet.time, jobSet)
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
logInfo("Added jobs for time " + jobSet.time)
}
}
遍历jobSet里面的所有jobs,通过jobExecutor这个线程池提交。我们看一下JobHandler就知道了。
private class JobHandler(job: Job) extends Runnable {
def run() {
eventActor ! JobStarted(job)
logInfo("this job run +_+_+_+_+_+_+")
job.run()
eventActor ! JobCompleted(job)
}
}
- 1、通知eventActor处理JobStarted事件。
- 2、运行job。
- 3、通知eventActor处理JobCompleted事件。
这里的重点是job.run,事件处理只是更新相关的job信息。
在遍历BlockRDD的时候,在compute函数获取该Block(详细请看BlockRDD),然后对这个RDD的结果进行打印。
- 1、可以有多个输入,我们可以通过StreamingContext定 义多个输入,比如我们监听多个(host,ip),可以给它们定义各自的处理逻辑和输出,输出方式不仅限于print方法,还可以有别的方 法,saveAsTextFiles和saveAsObjectFiles。这块的设计是支持共享StreamingContext的。
- 2、StreamingContext启动了JobScheduler,JobScheduler启动ReceiverTracker和JobGenerator。
- 3、ReceiverTracker是通过把Receiver包装成RDD的方式,发送到Executor端运行起来的,Receiver起来之后向ReceiverTracker发送RegisterReceiver消息。
- 3、Receiver把接收到的数据,通过ReceiverSupervisor保存。
- 4、ReceiverSupervisorImpl把数据写入到BlockGenerator的一个ArrayBuffer当中。
- 5、BlockGenerator内部每个一段时间(默认是200毫秒)就把这个ArrayBuffer构造成Block添加到blocksForPushing当中。
- 6、BlockGenerator的另外一条线程则不断的把加入到blocksForPushing当中的Block写入到BlockManager当中,并向ReceiverTracker发送AddBlock消息。
- 7、JobGenerator内部有个定时器,定期生成Job,通过 DStream的id,把ReceiverTracker接收到的Block信息从BlockManager上抓取下来进行处理,这个间隔时间是我们在实 例化StreamingContext的时候传进去的那个时间,在这个例子里面是Seconds(1)。