基于SparkStreaming的Window Operations
Window Operations有点类似于Storm中的State,可以通过设置窗口的大小和滑动间隔来动态地获取当前Streaming的运行状态
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
/**
 * Sliding-window word count over a socket text stream, using Spark Streaming.
 *
 * Usage: `WindowOpts [host] [port]`
 *
 * Both arguments are optional. When omitted, the stream connects to
 * `192.168.12.77:8888`.
 */
object WindowOpts {

  /** Host used when no host argument is supplied. */
  private val DefaultHost = "192.168.12.77"

  /** Port used when no port argument is supplied. */
  private val DefaultPort = 8888

  /**
   * Builds the streaming job and blocks until it terminates.
   *
   * @param args optional `host` (index 0) and `port` (index 1) of the text source
   * @throws IllegalArgumentException if the port argument is not an integer
   */
  def main(args: Array[String]): Unit = {
    val host = if (args.length > 0) args(0) else DefaultHost
    val port = if (args.length > 1) parsePort(args(1)) else DefaultPort

    // setIfMissing keeps local[2] as the default for IDE runs, but no longer
    // overrides a master supplied externally (e.g. spark-submit --master).
    // At least two local threads are needed: one is occupied by the socket receiver.
    val conf = new SparkConf()
      .setAppName("WindowOpts")
      .setIfMissing("spark.master", "local[2]")

    // Batch interval: a new micro-batch is produced every 5 seconds.
    val streamContext = new StreamingContext(conf, Milliseconds(5000))

    val lines = streamContext.socketTextStream(host, port)
    val words = lines.flatMap(_.split(" ")).map((_, 1))

    // reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration):
    //   windowDuration - length of the window (how much history each result covers)
    //   slideDuration  - how often the windowed result is computed
    // Both durations must be multiples of the batch interval (5s here).
    val windowWordCount =
      words.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(15), Seconds(10))

    windowWordCount.print()

    streamContext.start()
    streamContext.awaitTermination()
  }

  /** Parses the port argument, failing with a descriptive message on bad input. */
  private def parsePort(raw: String): Int =
    try raw.toInt
    catch {
      case _: NumberFormatException =>
        throw new IllegalArgumentException(s"Invalid port '$raw': expected an integer")
    }
}
2.在Linux机器上用YUM安装nc工具
yum install -y nc
启动一个服务器并监听8888端口: nc -lk 8888
3.运行代码程序