5.1 SparkStreaming接受socket数据,实现单词计数WordCount
5.1.1 架构图
5.1.2 实现流程
(1)安装并启动生产者
首先在linux服务器上用YUM安装nc工具,nc命令是netcat命令的简称,它是用来设置路由器。我们可以利用它向某个端口发送数据。
yum install -y nc
(2)通过netcat工具向指定的端口发送数据
nc -lk 9999
(3)编写Spark Streaming程序
[AppleScript] 纯文本查看 复制代码
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package cn.test.spark[ / align]
import org.apache.spark.streaming.dstream. { DStream , ReceiverInputDStream }
import org.apache.spark.streaming. { Seconds , StreamingContext }
import org.apache.spark. { SparkConf , SparkContext }
/ * *
* sparkStreming流式处理接受socket数据,实现单词统计
* /
object SparkStreamingTCP {
def main ( args : Array[String] ) : Unit = {
/ / 配置sparkConf参数
val sparkConf : SparkConf = new SparkConf ( ) .setAppName ( "SparkStreamingTCP" ) .setMaster ( "local[2]" )
/ / 构建sparkContext对象
val sc : SparkContext = new SparkContext ( sparkConf )
/ / 设置日志输出级别
sc.setLogLevel ( "WARN" )
/ / 构建StreamingContext对象,每个批处理的时间间隔
val scc : StreamingContext = new StreamingContext ( sc , Seconds ( 5 ) )
/ / 注册一个监听的IP地址和端口 用来收集数据
val lines : ReceiverInputDStream[String] = scc.socketTextStream ( "192.168.200.160" , 9999 )
/ / 切分每一行记录
val words : DStream[String] = lines.flatMap ( _.split ( " " ) )
/ / 每个单词记为 1
val wordAndOne : DStream[ ( String , Int ) ] = words.map ( ( _ , 1 ) )
/ / 分组聚合
val result : DStream[ ( String , Int ) ] = wordAndOne.reduceByKey ( _ + _ )
/ / 打印数据
result . print ( )
scc.start ( )
scc.awaitTermination ( )
}
}
|
由于使用的是本地模式"local[2]"所以可以直接在本地运行该程序
注意:要指定并行度,如在本地运行设置setMaster("local[2]"),相当于启动两个线程,一个给receiver,一个给computer。如果是在集群中运行,必须要求集群中可用core数大于1。
5.1.3 执行查看效果
(1)先执行nc -lk 9999
(2)然后在执行以上代码
(3)不断的在(1)中输入不同的单词,观察IDEA控制台输出
现象:sparkStreaming每隔5s计算一次当前5s内的数据,然后将每个批次的数据输出。
5.2 SparkStreaming接受socket数据,实现所有批次单词计数结果累加
在上面的那个案例中存在这样一个问题:每个批次的单词次数都被正确的统计出来,但是结果不能累加!如果将所有批次的结果数据进行累加使用
updateStateByKey(func)来更新状态.
5.2.1 架构图
5.2.2 实现流程
(2)安装并启动生成者
首先在linux服务器上用YUM安装nc工具,nc命令是netcat命令的简称,都是用来设置路由器。我们可以利用它向某个端口发送数据。
yum install -y nc
(2)启动一个服务端并监听9999端口
nc-lk 9999
向指定的端口发送数据
(3)编写Spark Streaming程序
[AppleScript] 纯文本查看 复制代码
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
package cn.test.spark[ / align]
import org.apache.spark.streaming.dstream. { DStream , ReceiverInputDStream }
import org.apache.spark.streaming. { Seconds , StreamingContext }
import org.apache.spark. { SparkConf , SparkContext }
/ * *
* sparkStreaming流式处理,接受socket数据,实现单词统计并且每个批次数据结果累加
* /
object SparkStreamingTCPTotal {
/ / newValues 表示当前批次汇总成的 ( word , 1 ) 中相同单词的所有的 1
/ / runningCount 历史的所有相同 key 的 value 总和
def updateFunction ( newValues : Seq[Int] , runningCount : Option[Int] ) : Option[Int] = {
val newCount = runningCount.getOrElse ( 0 ) + newValues.sum
Some ( newCount )
}
def main ( args : Array[String] ) : Unit = {
/ / 配置sparkConf参数
val sparkConf : SparkConf = new SparkConf ( ) .setAppName ( "SparkStreamingTCPTotal" ) .setMaster ( "local[2]" )
/ / 构建sparkContext对象
val sc : SparkContext = new SparkContext ( sparkConf )
/ / 设置日志输出的级别
sc.setLogLevel ( "WARN" )
/ / 构建StreamingContext对象,每个批处理的时间间隔
val scc : StreamingContext = new StreamingContext ( sc , Seconds ( 5 ) )
/ / 设置checkpoint路径,当前项目下有一个ck目录
scc.checkpoint ( "./ck" )
/ / 注册一个监听的IP地址和端口 用来收集数据
val lines : ReceiverInputDStream[String] = scc.socketTextStream ( "192.168.200.160" , 9999 )
/ / 切分每一行记录
val words : DStream[String] = lines.flatMap ( _.split ( " " ) )
/ / 每个单词记为 1
val wordAndOne : DStream[ ( String , Int ) ] = words.map ( ( _ , 1 ) )
/ / 累计统计单词出现的次数
val result : DStream[ ( String , Int ) ] = wordAndOne.updateStateByKey ( updateFunction )
result . print ( )
scc.start ( )
scc.awaitTermination ( )
}
}
|
通过函数updateStateByKey实现。根据key的当前值和key的之前批次值,对key进行更新,返回一个新状态的DStream
5.2.3 执行查看效果
(1)先执行nc -lk 9999
(2)然后在执行以上代码
(3)不断的在(1)中输入不同的单词,观察IDEA控制台输出
现象:sparkStreaming每隔5s计算一次当前5s内的数据,然后将每个批次的结果数据累加输出。
5.3 SparkStreaming开窗函数reduceByKeyAndWindow,实现单词计数5.3.1 架构图
5.3.2 实现流程
(1)安装并启动生成者
首先在linux服务器上用YUM安装nc工具,nc命令是netcat命令的简称,都是用来设置路由器。我们可以利用它向某个端口发送数据。
yum install -y nc
(2)启动一个服务端并监听9999端口
nc-lk 9999
向指定的端口发送数据
(3)编写Spark Streaming程序
[AppleScript] 纯文本查看 复制代码
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
|
package cn.test.spark[ / align]
import org.apache.spark.streaming.dstream. { DStream , ReceiverInputDStream }
import org.apache.spark.streaming. { Seconds , StreamingContext }
import org.apache.spark. { SparkConf , SparkContext }
/ * *
* sparkStreming开窗函数 ---统计一定时间内单词出现的次数
* /
object SparkStreamingTCPWindow {
def main ( args : Array[String] ) : Unit = {
/ / 配置sparkConf参数
val sparkConf : SparkConf = new SparkConf ( ) .setAppName ( "SparkStreamingTCPWindow" ) .setMaster ( "local[2]" )
/ / 构建sparkContext对象
val sc : SparkContext = new SparkContext ( sparkConf )
sc.setLogLevel ( "WARN" )
/ / 构建StreamingContext对象,每个批处理的时间间隔
val scc : StreamingContext = new StreamingContext ( sc , Seconds ( 5 ) )
/ / 注册一个监听的IP地址和端口 用来收集数据
val lines : ReceiverInputDStream[String] = scc.socketTextStream ( "192.168.200.160" , 9999 )
/ / 切分每一行记录
val words : DStream[String] = lines.flatMap ( _.split ( " " ) )
/ / 每个单词记为 1
val wordAndOne : DStream[ ( String , Int ) ] = words.map ( ( _ , 1 ) )
/ / reduceByKeyAndWindow函数参数意义:
/ / windowDuration : 表示 window 框住的时间长度,如本例 5 秒切分一次RDD,框 10 秒,就会保留最近 2 次切分的RDD
/ / slideDuration : 表示 window 滑动的时间长度,即每隔多久执行本计算
val result : DStream[ ( String , Int ) ] = wordAndOne.reduceByKeyAndWindow ( ( a : Int , b : Int ) = > a + b , Seconds ( 10 ) , Seconds ( 5 ) )
result . print ( )
scc.start ( )
scc.awaitTermination ( )
}
}
|
5.3.3 执行查看效果(1)先执行nc -lk 9999
(2)然后在执行以上代码
(3)不断的在(1)中输入不同的单词,观察IDEA控制台输出
现象:sparkStreaming每隔5s计算一次当前在窗口大小为10s内的数据,然后将结果数据输出。
5.4 SparkStreaming开窗函数统计一定时间内的热门词汇5.4.1 架构图
5.4.2 实现流程
(1)安装并启动生产者
首先在linux服务器上用YUM安装nc工具,nc命令是netcat命令的简称,都是用来设置路由器。我们可以利用它向某个端口发送数据。
yum install -y nc
(2)启动一个服务端并监听9999端口
nc -lk 9999
向指定的端口发送数据
(3)编写Spark Streaming程序
[AppleScript] 纯文本查看 复制代码
?
01
02
03
04
05
06
07
08
09
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
package cn.test.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream. { DStream , ReceiverInputDStream }
import org.apache.spark.streaming. { Seconds , StreamingContext }
import org.apache.spark. { SparkConf , SparkContext }
/ * *
* sparkStreming开窗函数应用 ----统计一定时间内的热门词汇
* /
object SparkStreamingTCPWindowHotWords {
def main ( args : Array[String] ) : Unit = {
/ / 配置sparkConf参数
val sparkConf : SparkConf = new SparkConf ( ) .setAppName ( "SparkStreamingTCPWindowHotWords" ) .setMaster ( "local[2]" )
/ / 构建sparkContext对象
val sc : SparkContext = new SparkContext ( sparkConf )
sc.setLogLevel ( "WARN" )
/ / 构建StreamingContext对象,每个批处理的时间间隔
val scc : StreamingContext = new StreamingContext ( sc , Seconds ( 5 ) )
/ / 注册一个监听的IP地址和端口 用来收集数据
val lines : ReceiverInputDStream[String] = scc.socketTextStream ( "192.168.200.160" , 9999 )
/ / 切分每一行记录
val words : DStream[String] = lines.flatMap ( _.split ( " " ) )
/ / 每个单词记为 1
val wordAndOne : DStream[ ( String , Int ) ] = words.map ( ( _ , 1 ) )
/ / reduceByKeyAndWindow函数参数意义:
/ / windowDuration : 表示 window 框住的时间长度,如本例 5 秒切分一次RDD,框 10 秒,就会保留最近 2 次切分的RDD
/ / slideDuration : 表示 window 滑动的时间长度,即每隔多久执行本计算
val result : DStream[ ( String , Int ) ] = wordAndOne.reduceByKeyAndWindow ( ( a : Int , b : Int ) = > a + b , Seconds ( 10 ) , Seconds ( 5 ) )
val data = result .transform ( rdd = > {
/ / 降序处理后,取前 3 位
val dataRDD : RDD[ ( String , Int ) ] = rdd.sortBy ( t = > t._ 2 , false )
val sortResult : Array[ ( String , Int ) ] = dataRDD.take ( 3 )
println ( "--------------print top 3 begin--------------" )
sortResult.foreach ( println )
println ( "--------------print top 3 end--------------" )
dataRDD
} )
data . print ( )
scc.start ( )
scc.awaitTermination ( )
}
}
|
5.4.3 执行查看效果
(1)先执行nc -lk 9999
(3)然后在执行以上代码
(3)不断的在(1)中输入不同的单词,观察IDEA控制台输出
现象:sparkStreaming每隔5s计算一次当前在窗口大小为10s内的数据,然后将单词出现次数最多的前3位进行输出打印。