Window Operations in Spark Streaming

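Window Operations are somewhat similar to State in Storm. By setting the window length and the sliding interval, you can dynamically obtain the running state of the stream over a recent time range.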
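The following plain-Scala sketch (not Spark code) makes the two parameters concrete by listing which micro-batches each window covers. It rests on two assumptions made for illustration only. First, batches arrive every `batchSec` seconds, starting at t = 0. Second, a window evaluated at time t covers the batches whose end times fall in (t - window, t]. The names `WindowTiming` and `windows` are hypothetical.

```scala
// Plain-Scala model of sliding-window timing; this is NOT the Spark API.
// Assumption: the first batch ends at t = batchSec, and a window evaluated at
// time t covers every batch whose end time lies in (t - windowSec, t].
object WindowTiming {
  /** For each window evaluation time up to `untilSec`, the end times of the batches it covers. */
  def windows(batchSec: Int, windowSec: Int, slideSec: Int, untilSec: Int): Seq[(Int, Seq[Int])] = {
    // Spark requires both durations to be multiples of the batch interval
    require(windowSec % batchSec == 0 && slideSec % batchSec == 0,
      "window and slide durations must be multiples of the batch interval")
    val batchEnds = batchSec to untilSec by batchSec
    // A window is evaluated once every slideSec seconds
    (slideSec to untilSec by slideSec).map { t =>
      t -> batchEnds.filter(end => end > t - windowSec && end <= t)
    }
  }

  def main(args: Array[String]): Unit = {
    // Same settings as the WindowOpts program: 5 s batches, 15 s window, 10 s slide
    windows(5, 15, 10, 40).foreach { case (t, ends) =>
      println(s"t=${t}s window covers batches ending at ${ends.mkString(", ")}")
    }
  }
}
```

These are the same settings as the WindowOpts program in this post. Consecutive windows overlap by one 5 s batch. The first window covers only the 10 seconds of data received so far.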

1. Write the program

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WindowOpts {
  def main(args: Array[String]): Unit = {
    // local[2]: at least two threads are needed, one for the socket receiver and one for processing
    val conf = new SparkConf().setAppName("WindowOpts").setMaster("local[2]")
    // Batch interval: 5 seconds
    val streamContext = new StreamingContext(conf, Seconds(5))

    // Read lines of text from the nc server listening on 192.168.12.77:8888
    val lines = streamContext.socketTextStream("192.168.12.77", 8888)

    // Split each line into words and pair every word with a count of 1
    val words = lines.flatMap(_.split(" ")).map((_, 1))

    /*
     * Signature (PairDStreamFunctions):
     *   def reduceByKeyAndWindow(reduceFunc: (V, V) => V,
     *                            windowDuration: Duration,
     *                            slideDuration: Duration): DStream[(K, V)]
     *
     * windowDuration: length of the window
     * slideDuration:  interval at which the windowed computation is performed
     * Both must be multiples of the batch interval (5 seconds here).
     */
    // Every 10 seconds, sum the counts of each word received in the last 15 seconds
    val windowWordCount = words.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(15), Seconds(10))

    windowWordCount.print()
    streamContext.start()
    streamContext.awaitTermination()
  }
}
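The plain-Scala model below does not use the Spark API. It only mimics what `reduceByKeyAndWindow((a, b) => a + b, Seconds(15), Seconds(10))` computes in the program above. It counts words per 5 s batch, then every 2 batches merges the counts of the last 3 batches. The names `WindowWordCountModel`, `countBatch`, `merge` and `windowed`, as well as the sample input, are hypothetical.

```scala
// Plain-Scala model of a windowed word count; this is NOT the Spark API.
// Assumption: each inner Seq holds the lines received in one 5 s batch,
// window = 3 batches (15 s), slide = 2 batches (10 s).
object WindowWordCountModel {
  /** Per-batch counts: the flatMap(_.split(" ")).map((_, 1)) step followed by a sum per word. */
  def countBatch(lines: Seq[String]): Map[String, Int] =
    lines.flatMap(_.split(" ")).groupBy(identity).map { case (w, ws) => w -> ws.size }

  /** Merge per-batch counts using the same reduce function as the program: (a, b) => a + b. */
  def merge(counts: Seq[Map[String, Int]]): Map[String, Int] =
    counts.foldLeft(Map.empty[String, Int]) { (acc, m) =>
      m.foldLeft(acc) { case (a, (w, n)) => a.updated(w, a.getOrElse(w, 0) + n) }
    }

  /** Every `slide` batches, merge the last `window` batches (fewer at the start of the stream). */
  def windowed(batches: Seq[Seq[String]], window: Int, slide: Int): Seq[Map[String, Int]] = {
    val perBatch = batches.map(countBatch)
    (slide to perBatch.size by slide).map { end =>
      merge(perBatch.slice(math.max(0, end - window), end))
    }
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(
      Seq("hello spark"),     // batch ending at 5 s
      Seq("hello"),           // batch ending at 10 s
      Seq("spark streaming"), // batch ending at 15 s
      Seq("hello window")     // batch ending at 20 s
    )
    windowed(batches, window = 3, slide = 2).foreach(println)
  }
}
```

The window (3 batches) is longer than the slide (2 batches), so one batch can be counted in two consecutive windows. In the sample input, the `hello` from the second batch contributes to both results.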

2. Install the nc tool with yum on a Linux machine

yum install -y nc

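On the machine at 192.168.12.77 (the address the program connects to), start a server listening on port 8888, for example:

nc -lk 8888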


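3. Run the program, then type space-separated words into the nc session. Every 10 seconds the program prints the word counts for the last 15 seconds.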
