SparkStreaming第一个程序--从socket端口读取数据并统计单词数量
因为是要读取socket端口的数据,所以要启动一个socket端口,可以在虚拟机中安装nc,
安装命令为:sudo yum install nc
,当然也可以自己写一个socket端口。
安装好之后,启动客户端和服务端:nc -lk 8888
,
streaming读取socket端口数据的原理:
代码如下:
package XXX
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
* Create by 。。。
*
*/
object StreamingWordCount {
def main(args: Array[String]): Unit = {
//离线任务是创建SparkContext,现在要实现实时计算,用StreamingContext
val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
val sc = new SparkContext(conf)
//StreamingContext是对SparkContext得包装,包了一层就增加了实时的功能
//第二个参数是小批次产生的时间间隔
val ssc = new StreamingContext(sc,Seconds(5))
//有了StreamingContext,就可以创建SparkStreaming的抽象DStream了
//从一个socket端口中读取数据
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.67.134",8888)
//对DStream进行操作,操作这个抽象(代理,描述),就像操作本地集合一样
//切分压平
val words: DStream[String] = lines.flatMap(_.split(" "))
//将单词和1组合在一起
val wordAndOne: DStream[(String, Int)] = words.map((_,1))
//按key进行聚合
val reduced: DStream[(String, Int)] = wordAndOne.reduceByKey(_+_)
//打印结果(Action)
reduced.print()
//启动sparkStreaming程序
ssc.start()
//等待优雅的退出
ssc.awaitTermination()
}
}