SparkStreaming内部结构以及经典案例与测试工具的使用--------updateSetBykey以及检查点的运用
1.SparkStreaming的内部结构,Spark Streaming将连续的数据流抽象为DStream。在内部,DStream 由一个RDD序列表示,然后将一个个RDD通过SparkEngine处理后输出。
-------------------------------开发自己的实时词频统计程序----------------------
**特别需要注意的一个是虚拟机的核数最小是2,因为一个用于接收数据,一个用于处理数据
object MyNetworkWordCount {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
//创建StreamingContext对象
val SparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//定义一个采样时间,每隔两秒钟采集一次数据
val ssc = new StreamingContext(SparkConf,Seconds(2))
//创建一个离散流,DStream代表输入的数据流
val lines = ssc.socketTextStream("hadoop01",5678)
//处理数据
val words: DStream[String] = lines.flatMap(_.split(" "))
val result = words.map(x=>(x,1)).reduceByKey(_+_)
//输出结果
result.print()
//启动StreamingContext,开始执行计算
ssc.start()
//等待计算完成
ssc.awaitTermination()
}
}
------------------------------开发自己的实时词频统计程序(累计单词出现次数)--------------
**这里面可以统计累计出现的次数了,但是一关闭程序之前的记录就清空了
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
object MyNetworkTotalWordCount {
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
val sparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Seconds(3))
ssc.checkpoint("./skt")
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01",5678)
val result: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_,1))
//Option的作用是,如果有的话就是Some,没有的话就是NULL
val updateStateFunc = (currValues:Seq[Int], preValues:Option[Int])=>{
val currentTotal = currValues.sum
val totalValues = preValues.getOrElse(0)
Some(currentTotal+totalValues)
}
val totalResult = result.updateStateByKey(updateStateFunc)
//输出
totalResult.print()
//启动StreamingContext
ssc.start()
//等待计算完成
ssc.awaitTermination()
}
}
-----------------------------自动保存之前的数据,关闭程序后再次打开依然可以使用---------
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
object MyNetworkTotalWordCountV2 {
val ckp = "./ckp"
/**
* 该函数会作用在相同key的value上
*/
//
def updateFunction (newValues:Seq[Int],runningCount:Option[Int]):Option[Int] = {
//得到当前的总和
val currentTotal = newValues.sum
//执行累加操作,如果是第一次执行(如果单词第一次执行,则没有之前的值)
val totalValues = runningCount.getOrElse(0)
Some(currentTotal+totalValues)
}
//如果从checkout目录中恢复不了上一个Job的实例,则创建一个新的ssc
def funcToCreateContext() = {
println("new Create")
//创建StreamingContext对象
val sparkConf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")
//定义一个采样时间,每隔2秒采集一次数据,这个时间不能随意设置
val ssc = new StreamingContext(sparkConf,Seconds(2))
//设置检查点目录
ssc.checkpoint(ckp)
//创建一个离散流,DStream代表输入的数据流
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 1122)
//设置checkpoint,默认每6秒做一次checkpoint
lines.checkpoint(Seconds(6))
//处理分词,每个单词记一次数
val words = lines.flatMap(_.split(" "))
val pairs = words.map(x=>(x, 1))
//累加
val totalResult = pairs.updateStateByKey(updateFunction _)
totalResult.print()
ssc
}
def main(args: Array[String]): Unit = {
Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
//定义一个采样时间,每隔2秒钟采集一次数据,这个时间不能随意设置
val ssc = StreamingContext.getOrCreate(ckp, funcToCreateContext _)
//开始计算
ssc.start()
//等待计算被中断
ssc.awaitTermination()
}
}
---------------------------------测试工具NetCat-----------------------------------------------------
使用工具:NetCat
步骤:(1)启动netcat服务器
nc -lk 1234
(2)启动SparkStreaming的客户端
bin/run-example streaming.NetworkWordCount bigdata01 1234
安装nc步骤:
(1)先将已安装的nc删除:yum erase nc
(2)下载较低版本的nc的.rpm文件
wget http://vault.centos.org/6.6/os/x86_64/Packages/nc-1.84-22.el6.x86_64.rpm
(3)安装.rpm文件
rpm -iUv nc-1.84-22.el6.x86_64.rpm
执行以上步骤命令后检查nc是否安装好,执行 nc -lk 1234
开启一个端口用来输入数据,端口号和上面程序设置的要一样,这样就可以看到上面程序的执行结果了。