SparkStreaming(三)离散流(DStream)

3、离散流(DStream)

SparkStreaming使用”微批次”的架构,把流式计算当做一系列连续的小规模批处理来对待。SparkStreaming从各种输入源中读取数据,并把数据分组为小的批次。新的批次按均匀的时间间隔创建出来。在每个时间区间开始的时候,一个新的批次就创建出来,在该区间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次停止增长。时间区间的大小是由批处理间隔这个参数决定的。批次间隔一般设在500毫秒到几秒之间,由应用开发者配置。每个输入批次都形成一个RDD,以Spark作业的方式处理并生成其他的RDD。处理的结果可以以批处理的方式传给外部系统。

         SparkStreaming(三)离散流(DStream)

                                     图1:SparkStreaming的高层次架构

3.1、简介及流程

DiscretizedStreamDStream 是Spark Streaming对流式数据的基本抽象。它表示连续的数据流,这些连续的数据流可以是从数据源接收的输入数据流,也可以是通过对输入数据流执行转换操作而生成的经处理的数据流。它是一个RDD序列,每个RDD代表数据流中的一个时间片内的数据。如下图所示,DStream中的每个RDD都包含一定时间间隔内的数据。

SparkStreaming(三)离散流(DStream)

                            图2:DStream是一个持续的RDD序列

我们可以从外部输入源创建DStream,也可以对其他DStream应用进行转化操作得到新的DStream。DStream支持许多Spark的RDDD支持的转化操作。另外,DStream还有”有状态”的转化操作,可以用来聚合不同时间片内的数据。

         SparkStreaming(三)离散流(DStream)

                                     图3:案例二中DStream转化关系

注意:

    在本地运行Spark Streaming程序时,请勿使用“local”或“local [1]”作为主URL 这两种方法都意味着只有一个线程将用于本地运行任务。 如果您正在使用基于接收器的输入DStream(例如套接字,Kafka,Flume等),那么将使用单个线程来运行接收器,而不留下用于处理接收数据的线程。 因此,在本地运行时,始终使用“local [n]”作为主URL,其中n>要运行的接收器数量(有关如何设置主服务器的信息,请参阅Spark属性)。

将逻辑扩展到在集群上运行,分配给Spark Streaming应用程序的核心数必须大于接收器数。 否则系统将接收数据,但无法处理数据。

3.2、Basic Source:

我们已经在快速示例中查看了ssc.socketTextStream(...),它通过TCP套接字连接从文本数据创建DStream。 除了套接字之外,StreamingContext API还提供了从文件创建DStream作为输入源的方法。

3.3、File Streams:

对于从与HDFS API兼容的任何文件系统(即HDFS,S3,NFS)上的文件读取数据,可以通过StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]创建DStream。

文件流不需要运行接收器,因此不需要分配任何内核来接受文件数据。

对于简单的文本文件,最简单的方法是StreamingContext.textFileStream(dataDirectory)

如何监控目录?

  1. Spark Streaming将监视目录dataDirectory并处理在该目录中创建的所有文件。
  2. 可以监视一个简单的目录,例如“hdfs// namenode8040 / logs /”。直接在这种路径下的所有文件将在发现时进行处理。
  3. 可以提供POSIX glob模式,例如“hdfs// namenode8040 / logs / 2017 / *”。这里,DStream将包含与模式匹配的目录中的所有文件。那就是:它是目录的模式,而不是目录中的文件。
  4. 所有文件必须采用相同的数据格式。
  5. 根据文件的修改时间而不是创建时间,文件被视为时间段的一部分。
  6. 处理完毕后,对当前窗口中文件的更改不会导致重新读取文件。即:忽略更新。
  7. 目录下的文件越多,扫描更改所需的时间就越长 - 即使没有修改过任何文件。
  8. 如果使用通配符来标识目录,例如“hdfs// namenode8040 / logs / 2016- *”,则重命名整个目录以匹配路径将把目录添加到受监视目录列表中。只有修改时间在当前窗口内的目录中的文件才会包含在流中。
  9. 调用FileSystem.setTimes()来修复时间戳是一种在稍后的窗口中拾取文件的方法,即使其内容未更改。

3.4、使用对象存储作为数据源

“完整”文件系统(如HDFS)会在创建输出流后立即在其文件上设置修改时间。打开文件时,即使在完全写入数据之前,它也可能包含在DStream - 之后将忽略对同一窗口中文件的更新。即:可能会遗漏更改,并从流中省略数据。

要保证在窗口中选择更改,请将文件写入不受监视的目录,然后在关闭输出流后立即将其重命名为目标目录。如果重命名的文件在其创建窗口期间出现在扫描的目标目录中,则将拾取新数据。

         相比之下,Amazon S3Azure Storage等对象存储通常具有较慢的重命名操作,因为实际上是复制了数据。此外,重命名的对象可能具有rename()操作的时间作为其修改时间,因此可能不被视为原始创建时间所暗示的窗口的一部分。

         需要对目标对象存储进行仔细测试,以验证存储的时间戳行为是否与Spark Streaming所期望的一致。可能是直接写入目标目录是通过所选对象库流式传输数据的适当策略。

3.5、基于自定义接收器的流

         可以使用通过自定义接收器接收的数据流创建DStream。

3.6、RDD作为流的队列

为了使用测试数据测试Spark Streaming应用程序,还可以使用streamingContext.queueStream(queueOfRDDs)基于RDD队列创建DStream。 推入队列的每个RDD将被视为DStream中的一批数据,并像流一样处理。