Spark Streaming

Spark Streaming

SparkStreaming概述

SparkStreaming概念

Spark Streaming类似于Apache Storm,用于流式数据的处理。

官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。

**解惑:流式大数据 **
流式大数据从这个角度看,可以把大数据分成两个:一个是批式大数据,另一个是流式大数据。
举个例子来说:我们把数据当成水库的话,水库里面存在的水就是批式大数据,进来的水是流式大数据。

Spark Streaming

Flume是Cloudera提供的一个高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的能力。
Amazon Kinesis 可让您轻松收集、处理和分析实时流数据,以便您及时获得见解并对新信息快速做出响应。
dashboard是一种实时连续桌面信息检索引擎,以窗口形式出现在桌面上,可以提供实时天气情况、股票报价、航班时间等信息.
dashboard是商业智能仪表盘(business intelligence dashboard,BI dashboard)的简称,它是一般商业智能都拥有的实现数据可视化的模块,是向企业展示度量信息和关键业务指标(KPI)现状的数据虚拟化工具 。
dashboard在一个简单屏幕上联合并整理数字、公制和绩效记分卡。它们调整适应特定角色并展示为单一视角或部门指定的度量。
dashboard关键的特征是从多种数据源获取实时数据,并且是定制化的交互式界面。
dashboard以丰富的,可交互的可视化界面为数据提供更好的使用体验。

为什么要学习Spark Streaming

易用 (ease of use)

容错 (Fault Tolerance)

易整合到Spark体系 (Spark Integration)

SparkStreaming原理

Spark Streaming

应用场景

股票交易、交通状况、广告推荐、阿里双11

SparkStreaming 微批处理(类似于电梯),它并不是纯的批处理
优点:吞吐量大,可以做复杂的业务逻辑(保证每个job的处理小于batch interval)
缺点:数据延迟度较高

公司中为什么选用SparkStreaming要多一些?
1.秒级别延迟,通常应用程序是可以接受的
2.可以应用机器学习,SparkSQL…可扩展性比较好,数据吞吐量较高

实时计算框架对比

storm spark streaming flink
实时性: 有延迟
吞吐量:
应用范围 只能进行实时计算 离线+实时 离线+实时
编程便利性 算子较少 算子比较丰富 算子比较丰富
是否支持ML 不支持 支持 不支持
图计算 不支持 支持 不支持

流处理、高吞吐、

流数据:实时的数据
批数据:历史的数据

app/brower – server服务器 – 日志服务器 – flume采集日志 – kafka消息中间键 – Spark Streaming(storm)实时处理 – 数据库

把数据分成批,唯批处理
divides the data into batches

DStream

什么是DStream?

Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:
Spark Streaming
spark流接收实时输入数据流,并将数据分成几个批次,然后由Spark engine进行处理,生成最终的结果流。
Spark Streaming

DStream相关操作

DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

官网: http://spark.apache.org/docs/latest/streaming-programming-guide.html
Transformations on DStreams
Similar to that of RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDD’s. Some of the common ones are as follows.

Transformation Meaning
map(func) Return a new DStream by passing each element of the source DStream through a function func.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items.
filter(func) Return a new DStream by selecting only the records of the source DStream on which func returns true.
repartition(numPartitions) Changes the level of parallelism in this DStream by creating more or fewer partitions.
union(otherStream) Return a new DStream that contains the union of the elements in the source DStream and otherDStream.
count() Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream.
reduce(func) Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative and commutative so that it can be computed in parallel.
countByValue() When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream.
reduceByKey(func, [numTasks]) When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
join(otherStream, [numTasks]) When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key.
cogroup(otherStream, [numTasks]) When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func) Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream.
updateStateByKey(func) Return a new “state” DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.

特殊的Transformations
1、UpdateStateByKeyOperation
UpdateStateByKey原语用于记录历史记录,上文中Word Count示例中就用到了该特性。若不用UpdateStateByKey来更新状态,那么每次数据进来后分析完成后,结果输出后将不再保存

2、TransformOperation
Transform原语允许DStream上执行任意的RDD-to-RDD函数。通过该函数可以方便的扩展Spark API。此外,MLlib(机器学习)以及Graphx也是通过本函数来进行结合的。

3、WindowOperations
Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。
Spark Streaming

Output Operations on DStreams
Output Operations可以将DStream的数据输出到外部的数据库或文件系统,当某个Output Operations原语被调用时(与RDD的Action相同),streaming程序才会开始真正的计算过程。

Some of the common window operations are as follows. All of these operations take the said two parameters - windowLength and slideInterval.

Transformation Meaning
window(windowLength, slideInterval) Return a new DStream which is computed based on windowed batches of the source DStream.
countByWindow(windowLength, slideInterval) Return a sliding window count of elements in the stream.
reduceByWindow(func, windowLength, slideInterval) Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark’s default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated incrementally using the reduce values of the previous window. This is done by reducing the new data that enters the sliding window, and “inverse reducing” the old data that leaves the window. An example would be that of “adding” and “subtracting” counts of keys as the window slides. However, it is applicable only to “invertible reduce functions”, that is, those reduce functions which have a corresponding “inverse reduce” function (taken as parameter invFunc). Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument. Note that checkpointing must be enabled for using this operation.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow, the number of reduce tasks is configurable through an optional argument.

DStreams操作

DStreams转换

DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

Transformation Meaning
map(func) 将源DStream中的每个元素通过一个函数func从而得到新的DStreams。
flatMap(func) 和map类似,但是每个输入的项可以被映射为0或更多项。
filter(func) 选择源DStream中函数func判为true的记录作为新DStreams
repartition(numPartitions) 通过创建更多或者更少的partition来改变此DStream的并行级别。
union(otherStream) 联合源DStreams和其他DStreams来得到新DStream
count() 统计源DStreams中每个RDD所含元素的个数得到单元素RDD的新DStreams。
reduce(func) 通过函数func(两个参数一个输出)来整合源DStreams中每个RDD元素得到单元素RDD的DStreams。这个函数需要关联从而可以被并行计算。
countByValue() 对于DStreams中元素类型为K调用此函数,得到包含(K,Long)对的新DStream,其中Long值表明相应的K在源DStream中每个RDD出现的频率。
reduceByKey(func,[numTasks]) 对(K,V)对的DStream调用此函数,返回同样(K,V)对的新DStream,但是新DStream中的对应V为使用reduce函数整合而来。Note:默认情况下,这个操作使用Spark默认数量的并行任务(本地模式为2,集群模式中的数量取决于配置参数spark.default.parallelism)。你也可以传入可选的参数numTaska来设置不同数量的任务。
join(otherStream,[numTasks]) 两DStream分别为(K,V)和(K,W)对,返回(K,(V,W))对的新DStream。
cogroup(otherStream,[numTasks]) 两DStream分别为(K,V)和(K,W)对,返回(K,(Seq[V],Seq[W])对新DStreams
transform(func) 将RDD到RDD映射的函数func作用于源DStream中每个RDD上得到新DStream。这个可用于在DStream的RDD上做任意操作。updateStateByKey(func)得到”状态”DStream,其中每个key状态的更新是通过将给定函数用于此key的上一个状态和新值而得到。这个可用于保存每个key值的任意状态数据。

DStream 的转化操作可以分为无状态(stateless)和有状态(stateful)两种。
• 在无状态转化操作中,每个批次的处理不依赖于之前批次的数据。常见的 RDD 转化操作,例如 map()、filter()、reduceByKey() 等,都是无状态转化操作。
• 相对地,有状态转化操作需要使用之前批次的数据或者是中间结果来计算当前批次的数据。有状态转化操作包括基于滑动窗口的转化操作和追踪状态变化的转化操作。

无状态转化操作

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。部分无状态转化操作列在了下表中。 注意,针对键值对的 DStream 转化操作(比如 reduceByKey())要添加import StreamingContext._ 才能在 Scala中使用。 ’
Spark Streaming
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。例如, reduceByKey() 会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
举个例子,在之前的wordcount程序中,我们只会统计1秒内接收到的数据的单词个数,而不会累加。
无状态转化操作也能在多个 DStream 间整合数据,不过也是在各个时间区间内。例如,键 值对 DStream 拥有和RDD 一样的与连接相关的转化操作,也就是 cogroup()、join()、 leftOuterJoin() 等。我们可以在 DStream 上使用这些操作,这样就对每个批次分别执行了对应的 RDD 操作。
我们还可以像在常规的 Spark 中一样使用 DStream 的 union() 操作将它和另一个 DStream 的内容合并起来,也可以使用 StreamingContext.union() 来合并多个流。

有状态转化操作

特殊的Transformations

追踪状态变化UpdateStateByKey
UpdateStateByKey原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加wordcount)。针
对这种情况,updateStateByKey() 为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件 更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。
updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,你需要做下面两步:

  1. 定义状态,状态可以是一个任意的数据类型。
  2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
    使用updateStateByKey需要对检查点目录进行配置,会使用检查点来保存状态。

Window Operations
Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。
基于窗口的操作会在一个比 StreamingContext 的批次间隔更长的时间范围内,通过整合多个批次的结果,计算出整个窗口的结果。
Spark Streaming
所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长,两者都必须是 StreamContext 的批次间隔的整数倍。窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次。如果有一个以 10 秒为批次间隔的源 DStream,要创建一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。如果源 DStream 批次间隔为 10 秒,并且我们只希望每两个批次计算一次窗口结果,就应该把滑动步长设置为 20 秒。
假设,你想拓展前例从而每隔十秒对持续30秒的数据生成word count。为做到这个,我们需要在持续30秒数据的(word,1)对DStream上应用reduceByKey。使用操作reduceByKeyAndWindow.

# reduce last 30 seconds of data, every 10 second
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)
Transformation Meaning
window(windowLength, slideInterval) 基于对源DStream窗化的批次进行计算返回一个新的DStream
countByWindow(windowLength,slideInterval) 返回一个滑动窗口计数流中的元素。
reduceByWindow(func,windowLength,slideInterval) 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流。
reduceByKeyAndWindow(func,windowLength,slideInterval,[numTasks]) 当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。Note:默认情况下,这个操作使用Spark的默认数量并行任务(本地是2),在集群模式中依据配置属性(spark.default.parallelism)来做grouping。你可以通过设置可选参数numTasks来设置不同数量的tasks。
reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numTasks]) 这个函数是上述函数的更高效版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。注意:为了使用这个操作,检查点必须可用。
countByValueAndWindow(windowLength,slideInterval,[numTasks]) 对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每个key的值是其在滑动窗口中频率。如上,可配置reduce任务数量。

reduceByWindow() 和 reduceByKeyAndWindow() 让我们可以对每个窗口更高效地进行归约操作。它们接收一个归约函数,在整个窗口上执行,比如 +。除此以外,它们还有一种特殊形式,通过只考虑新进入窗口的数据和离开窗 口的数据,让 Spark 增量计算归约结果。这种特殊形式需要提供归约函数的一个逆函数,比 如 + 对应的逆函数为 -。对于较大的窗口,提供逆函数可以大大提高执行效率.
Spark Streaming

countByWindow() 和 countByValueAndWindow() 作为对数据进行 计数操作的简写。countByWindow() 返回一个表示每个窗口中元素个数的 DStream,而 countByValueAndWindow() 返回的 DStream 则包含窗口中每个值的个数.

重要操作
updateStateByKey:得到”状态”DStream,其中每个key状态的更新是通过将给定函数用于此key的上一个状态和新值而得到。这个可用于保存每个key值的任意状态数据。可以按批次累加updateStateByKey。

Transform原语允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。此外,MLlib(机器学习)以及Graphx也是通过本函数来进行结合的。该函数每一批次调度一次。
比如下面的例子,在进行单词统计的时候,想要过滤掉spam的信息。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
	rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data
	cleaning
	...
}

其实也就是对DStream中的RDD应用转换。

窗口函数,Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态。可以调用窗口操作来计算数据的聚合

Join 操作:连接操作(leftOuterJoin, rightOuterJoin, fullOuterJoin也可以),可以连接Stream-Stream,windows-stream to windows-stream、stream-dataset

Stream-Stream Joins

val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)

Stream-dataset joins

val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }

DStreams输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream就都不会被求值。如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。

Output Operation Meaning
print() 在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫pprint()。
saveAsTextFiles(prefix,[suffix]) 以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”.
saveAsObjectFiles(prefix,[suffix]) 以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。
saveAsHadoopFiles(prefix,[suffix]) 将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python API Python中目前不可用。
foreachRDD(func) 这是最通用的输出操作,即将函数func用于产生于stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。注意:函数func在运行流应用的驱动中被执行,同时其中一般函数RDD操作从而强制其对于流RDD的运算。

通用的输出操作 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和transform() 有些类似,都可以让我们访问任意 RDD。在 foreachRDD() 中,可以重用我们在 Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。

需要注意的:

  1. 连接不能写在driver层面
  2. 如果写在foreach则每个RDD都创建,得不偿失
  3. 增加foreachPartition,在分区创建
  4. 可以考虑使用连接池优化

案例实战

netcat能通过TCP和UDP在网络中读写数据。
nc -lk 8888
在服务器 A 执行以上命令,将会把 nc 绑定到 8888 端口,并开始监听请求。 -l 代表 netcat 将以监听模式运行; -k表示 nc 在接收完一个请求后不会立即退出,而是会继续监听其他请求。 这时就可以请求该接口了, nc 会把请求报文输出到标准输出。
这里我们使用netcat进行传输数据。
1.

import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

object sparkStreamingWC {
  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("sparkStreamingWC").setMaster("local[2]")

    val sc = new SparkContext(sparkConf)

    //创建sparkstreaming入口对象
    //设置数据批次的时间间隔
    val ssc:StreamingContext = new StreamingContext(sc,Seconds(6))

    //监听服务的IP,端口
    val dstream= ssc.socketTextStream("hadoop01",1235)

    val res:DStream[(String,Int)] = dstream.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

    //打印结果数据,默认打印RDD中的前十个元素
    res.print()

    //提交任务到集群
    ssc.start()

    //线程等待下一批次的任务
    ssc.awaitTermination()
  }
}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

//实现批次数据的累加
object sparkStreamingWC2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparkStreamingWC2").setMaster("local[2]")
    val ssc = new StreamingContext(conf,Milliseconds(100000))

    //设置检查点,存已经处理的历史数据
    ssc.checkpoint("c://data/ck")

    //获取数据
    val dstream = ssc.socketTextStream("hadoop01",1234)
    val tup = dstream.flatMap(_.split(" ")).map((_,1))
    val res = tup.updateStateByKey(func,new HashPartitioner(ssc.sparkContext.defaultParallelism),true)

    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
    val func =  (it:Iterator[(String, Seq[Int], Option[Int])]) =>{
      it.map(x=>{
        (x._1,x._2.sum+x._3.getOrElse(0))//当前+历史数据
      })
    }
}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

//过滤广告黑名单
object transformDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("transformOP").setMaster("local[*]")

    val ssc = new StreamingContext(conf,Seconds(5))

    //定义一个黑名单
    val blackList = List(("tom",true),("jerry",true))
    val blackListRdd = ssc.sparkContext.parallelize(blackList)
    //收到的数据流
    val socketData = ssc.socketTextStream("hadoop01",2345)
    //解析出用户信息
    val user:DStream[(String,String)] = socketData.map(line=>(line.split(" ")(1),line))

    //join ???
    //(key,(收到的行信息,null或者true))
    val filterRDD = user.transform(u=>{
      val joinrdd = u.leftOuterJoin(blackListRdd)
      val tmp = joinrdd.filter(tuple=>{
        if(tuple._2._2.getOrElse(false)){
          false
        }else{
          true
        }
      })
      tmp.map(tuple=>tuple._2._1)
    })

    filterRDD.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object windowOpObj {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("windoeOpObj").setMaster("local[*]")
    val ssc = new StreamingContext(conf,Seconds(5))

    //存储不同(之前)批次的数据
    ssc.checkpoint("c://data/ck01")

    //获取数据
    val dstream = ssc.socketTextStream("hadoop01",2345)

    //调用窗口函数聚合多个批次的数据
    val tuple = dstream.flatMap(_.split(" ")).map((_,1))
    //设置窗口长度,以及滑动时间间隔,滑动时间应该是批处理的倍数
    val res = tuple.reduceByKeyAndWindow((x:Int,y:Int)=>{x+y},Seconds(15),Seconds(10))

    res.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

附:
pv,uv,vv:
1、PV即PageView,网站浏览量,指页面的浏览次数,用以衡量网站用户访问的网页数量。用户没打开一个页面便记录1次PV,多次打开同一页面则浏览量累计;
2、UV即UniqueVistor,独立访客数,指1天内访问某站点的人数,以cookie为依据。1天内同一访客的多次访问只计为1个访客;
3、VV即VisitView,访客的访问次数,用以记录所有访客1天内访问了多少次您的网站。当访客完成所有浏览并最终关掉该网站的所有页面时便完成了一次访问,同一访客1天内可能有多次访问行为,访问次数累计;
4、IP即独立IP数,指1天内使用不同IP地址的用户访问网站的数量,同一IP无论访问了几个页面,独立IP数均为1