Spark 流数据处理简介

一、Spark 基础知识

1.1 Spark 简介

       Spark是专为大规模数据处理而设计的快速通用的计算引擎,可用它来完成各种各样的运算,包括 SQL 查询、文本处理、机器学习等。

1.2 核心概念介绍

Spark常用术语

Spark 流数据处理简介

Spark编程模型

Spark应用程序可分两部分:Driver部分和Executor部分

Spark 流数据处理简介

Driver部分主要是对SparkContext进行配置、初始化以及关闭。初始化SparkContext是为了构建Spark应用程序的运行环境,在初始化SparkContext,要先导入一些Spark的类和隐式转换;在Executor部分运行完毕后,需要将SparkContext关闭。

Executor部分负责对Task 的执行运算。

共享变量

       在Spark运行时,一个函数传递给RDD内的patition操作时,该函数所用到的变量在每个运算节点上都复制并维护了一份,并且各个节点之间不会相互影响。但是在Spark Application中,可能需要共享一些变量,提供Task或驱动程序使用。Spark提供了两种共享变量:

广播变量(Broadcast Variables):可以缓存到各个节点的共享变量,通常为只读

  • 广播变量缓存到各个节点的内存中,而不是每个 Task
  • 广播变量被创建后,能在集群中运行的任何函数调用
  • 广播变量是只读的,不能在被广播后修改
  • 对于大数据集的广播, Spark 尝试使用高效的广播算法来降低通信成本

累计器:只支持加法操作的变量,可以实现计数器和变量求和。用户可以调用SparkContext.accumulator(v)创建一个初始值为v的累加器,而运行在集群上的Task可以使用“+=”操作,但这些任务却不能读取;只有驱动程序才能获取累加器的值。

RDD

      Resilient Distributed Datasets,弹性分布式数据集,是分布式内存的一个抽象概念,可以被抽象地理解为一个大的数组(Array object),但是这个数组是分布在集群上的。RDD是Spark的核心数据结构,通过RDD的依赖关系形成Spark的调度顺序。通过对RDD的操作形成整个Spark程序。

RDD主要属性:

  • 一组分片(partition),即数据集的基本组成单位
  • 一个计算每个分片的函数
  • 对parent RDD的依赖,这个依赖描述了RDD之间的 lineage
  • 对于key-value的RDD,一个Partitioner,这是可选择的
  • 一个列表,存储存取每个partition的preferred位置
     

RDD可以有两种计算操作算子:Transformation(变换)与Action(行动)

1)Transformation(变换)

    Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操作不是马上执行,需要等到有Actions操作时,才真正触发运算。

 2)Action(行动)

      Action算子会触发Spark提交作业(Job),并将数据输出到Spark系统。

RDD运行原理

Spark 流数据处理简介

RDD在Spark架构中运行,主要分为三步:

1、Spark 应用程序进行各种转换(transformation)操作,通过行动(action)操作触发作业运行,提交之后,根据RDD 之间的依赖关系构建DAG图;

2、DAGScheduler 把DAG 拆分成相互依赖的调度阶段(stage),拆分调度阶段是以 RDD 的依赖关系(宽/窄 依赖)作为依据。每个调度阶段包含一个或者多个任务,这些任务形成任务集(TaskSet),提交给底层的TaskScheduler 进行调度执行。DAGScheduler 监控整个运行调度过程,如果出现失败,则重新提交调度阶段

3、TaskScheduler 接受发送过来的 任务集(TaskSet),然后以任务的形式分发给集群中的worker 节点中的Execuor中去执行。

1.3 Spark 2.x时代

Spark 流数据处理简介

Spark 1.x 时代里,以 SparkContext(及 RDD API)为基础,在 structured data 场景衍生出了 SQLContext, HiveContext,在 streaming 场景衍生出了 StreamingContext,很是琳琅满目。

Spark 流数据处理简介

Spark 2.x 则咔咔咔精简到只保留一个 SparkSession 作为主程序入口,以 Dataset/DataFrame 为主要的用户 API,同时满足 structured data, streaming data, machine learning, graph 等应用场景,大大减少使用者需要学习的内容,爽爽地又重新实现了一把当年的 "one stack to rule them all" 的理想。

Spark 流数据处理简介

Spark 2.x 的 Dataset/DataFrame 与 Spark 1.x 的 RDD 的不同:

  • Spark 1.x 的 RDD 更多意义上是一个一维、只有行概念的数据集,比如 RDD[Person],那么一行就是一个 Person,存在内存里也是把 Person 作为一个整体(序列化前的 java object,或序列化后的 bytes)。
  • Spark 2.x 里,一个 Person 的 Dataset 或 DataFrame,是二维行+列的数据集,比如一行一个 Person,有 name:Stringage:Intheight:Double 三列;在内存里的物理结构,也会显式区分列边界。
  • Dataset/DataFrame 存储方式无区别:两者在内存中的存储方式是完全一样的、是按照二维行列(UnsafeRow)来存的,所以在没必要区分 Dataset 或 DataFrame 在 API 层面的差别时,我们统一写作 Dataset/DataFrame

⚠️ 其实 Spark 1.x 就有了 DataFrame 的概念,但还仅是 SparkSQL 模块的主要 API ;到了 2.0 时则 Dataset/DataFrame 不局限在 SparkSQL、而成为 Spark 全局的主要 API。

⚠️ DataFrame也可以叫Dataset[Row],每一行的类型是Row。

 

二、Spark 流式计算编程模型

2.1、Spark Streaming简介

       Spark Streaming在内部的处理机制是,接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处理这些批数据,最终得到处理后的一批批结果数据。对应批数据,在Spark内核对应一个RDD实例,因此,对应流数据的DStream可以看成是一组RDDs,即RDD的一个序列。通俗点理解的话,在流数据分成一批一批后,通过一个先进先出的队列,然后 Spark Engine从该队列中依次取出一个个批数据,把批数据封装成一个RDD,然后进行处理。

术语定义

离散流(discretized stream)或DStream:这是Spark Streaming对内部持续的实时数据流的抽象描述,即我们处理的一个实时数据流,在Spark Streaming中对应于一个DStream 实例。

批数据(batch data):这是化整为零的第一步,将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就形成了对应的结果数据流了。

时间片或批处理时间间隔( batch interval):这是人为地对流数据进行定量的标准,以时间片作为我们拆分流数据的依据。一个时间片的数据对应一个RDD实例。

窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数,

滑动时间间隔:前一个窗口到后一个窗口所经过的时间长度。必须是批处理时间间隔的倍数

Input DStream :一个input DStream是一个特殊的DStream,将Spark Streaming连接到一个外部数据源来读取数据。

 

 计算流程

        Spark Streaming是将流式计算分解成一系列短小的批处理作业。把Spark Streaming的输入数据按照batch size(如1秒)分成一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD,然后将Spark Streaming中对DStream的Transformation操作变为针对Spark中对RDD的Transformation操作,将RDD经过操作变成中间结果保存在内存中。整个流式计算根据业务的需求可以对中间的结果进行叠加,或者存储到外部设备。

Spark 流数据处理简介     Spark 流数据处理简介

DStream 是 RDD 的模板,而且 DStream 和 RDD 具有相同的 transformation 操作,比如 map(), filter(), reduce() ……等等(正是这些相同的 transformation 使得 DStreamGraph 能够忠实记录 RDD DAG 的计算逻辑),DStream 维护了对每个产出的 RDD 实例的引用,DStream 去掉 batch 维度就是 RDD。

在 Spark Streaming 程序的入口,我们都会定义一个 batchDuration,就是需要每隔多长时间就比照静态的 DStreamGraph 来动态生成一个 RDD DAG 实例。在 Spark Streaming 里,总体负责动态作业调度的具体类是 JobScheduler,在 Spark Streaming 程序开始运行的时候,会生成一个 JobScheduler 的实例,并被 start() 运行起来。

2.2、Structured Streaming

import org.apache.spark._
import org.apache.spark.streaming._
  
// 首先配置一下本 word example 将跑在本机,app name 是 NetworkWordCount
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
// batchDuration 设置为 1 秒,然后创建一个 streaming 入口
val ssc = new StreamingContext(conf, Seconds(1))
  
// ssc.socketTextStream() 将创建一个 SocketInputDStream;这个 InputDStream 的 SocketReceiver 将监听本机 9999 端口
val lines = ssc.socketTextStream("localhost", 9999)
  
val words = lines.flatMap(_.split(" "))      // DStream transformation
val pairs = words.map(word => (word, 1))     // DStream transformation
val wordCounts = pairs.reduceByKey(_ + _)    // DStream transformation
wordCounts.print()                           // DStream output
// 上面 4 行利用 DStream transformation 构造出了 lines -> words -> pairs -> wordCounts -> .print() 这样一个 DStreamGraph
// 但注意,到目前是定义好了产生数据的 SocketReceiver,以及一个 DStreamGraph,这些都是静态的
  
// 下面这行 start() 将在幕后启动 JobScheduler, 进而启动 JobGenerator 和 ReceiverTracker
// ssc.start()
//    -> JobScheduler.start()
//        -> JobGenerator.start();    开始不断生成一个一个 batch
//        -> ReceiverTracker.start(); 开始往 executor 上分布 ReceiverSupervisor 了,也会进一步创建和启动 Receiver
ssc.start()
  
// 然后用户 code 主线程就 block 在下面这行代码了
// block 的后果就是,后台的 JobScheduler 线程周而复始的产生一个一个 batch 而不停息
// 也就是在这里,我们前面静态定义的 DStreamGraph 的 print(),才一次一次被在 RDD 实例上调用,一次一次打印出当前 batch 的结果
ssc.awaitTermination()

       Spark2.0新增了Structured Streaming,它是基于SparkSQL构建的可扩展和容错的流式数据处理引擎,使得实时流式数据计算可以和离线计算采用相同的处理方式(DataFrame&SQL)。Structured Streaming顾名思义,它将数据源和计算结果都映射成一张”结构化”的表,在计算的时候以结构化的方式去操作数据流,大大方便和提高了数据开发的效率。

Structured Streaming将数据源和计算结果都看做是无限大的表,数据源中每个批次的数据,经过计算,都添加到结果表中作为行。

Spark 流数据处理简介

 

在每一个周期时,新的内容将会增加到表尾,查询的结果将会更新到结果表中。一旦结果表被更新,就需要将改变后的表内容输出到外部的sink中。

structured streaming支持三种输出模式:

  • Complete mode: 整个更新的结果表都会被输出。
  • Append mode: 只有新增加到结果表的数据会被输出。
  • Updated mode: 只有被更新的结果表会输出。


word count example

Spark 流数据处理简介

 

处理 Late Data (迟到数据)和 Watermarking (水印)

Spark 流数据处理简介

maximum event time tracked (引擎跟踪的最大事件时间)是 蓝色虚线,watermark 设置为 (max event time - '10 mins') 在每个触发的开始处是红线。例如,当引擎观察数据 (12:14, dog) 时,它为下一个触发器设置 watermark 为 12:04 。该 watermark 允许 engine 保持 intermediate state (中间状态)另外 10 分钟以允许延迟 late data to be counted (要计数的数据)。例如,数据 (12:09, cat) 是 out of order and late (不正常的,而且延迟了),它落在了 windows 12:05 - 12:15 和 12:10 - 12:20 。因为它仍然在 watermark 12:04 之前的触发器,引擎仍然将 intermediate counts (中间计数)保持为状态并正确 updates the counts of the related windows (更新相关窗口的计数)。然而,当 watermark 更新为 12:11 时,window (12:00 - 12:10) 的中间状态被清除,所有 subsequent data (后续数据)(例如 (12:04, donkey) )被认为是 “too late” ,因此被忽视。

三、流式计算+Kafka 编程实例

3.1 Spark Streaming 读取kafka 数据的两种方式

Spark 流数据处理简介       Spark 流数据处理简介

 

基于Receiver的方式:这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于所有的接收器,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据

直接读取方式(Direct):在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch

两种方式的对比:

  • 简化的并行:在Receiver的方式中创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
  • 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而Direct方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
  • offset 管理:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据可能和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而Direct方式,直接使用了简单的低阶Kafka API,所消费的 offset 需要自己记录下来,记录offset 的方式 一般是checkpoint 保留 相关的meta信息或者写回zookeeper。

3.2 Spark Streaming 读取kafka 实例

本例使用Direct 的方式 消费kafka 数据,执行简单的sql 处理逻辑,将计算结果回写到文件中。(py spark 实现)

##################################################################################
1、定义运行参数

# streaming 执行间隔
duration = 30
# zk 连接地址
zk_host = "10.0.*.*:2181"
# kafka broker 节点
brokers = "10.0.*.*:9091"
# 待消费topic 主题
topic = "topic.name"
kafkaParams = {"metadata.broker.list": brokers}

# 消费 客户端 组id
group_id = "kafkaToSparkStreamingDemo"

# 控制消费速率,每秒从每个partition 取的message条数
maxRatePerSecFromPartition = 100


##################################################################################

2、初始化spark 入口

# 初始化spark入口
conf = SparkConf().setAppName("kafkaStreamingTest")\
      .set("spark.streaming.kafka.maxRatePerPartition",maxRatePerSecFromPartition)

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, duration)

# 初始化 sql 上下文
sqlContext = SQLContext(sc)


##################################################################################

3、计算获取kafka 的起始 offset(消费起点)

# 获取kafka上可用的 offset 范围
consumer = KafkaConsumer(topic,bootstrap_servers=brokers,group_id=group_id)
partitions = consumer.partitions_for_topic(topic)
topicPartitions = list()
for partition in partitions:
    topicPartitions.append(TopicPartition(topic,partition))

# 可用的offset 最小值
earliest_offsets = consumer.beginning_offsets(topicPartitions)
# 可用的offset 最小值
last_offset = consumer.end_offsets(topicPartitions)

# 获取zk 上存储的 offset 范围
offset_ranges = kafkaInfo.get_offset_ranges(group_id, topic)

fromOffsets = dict()
for partition in offset_ranges:
    from_offset = long(offset_ranges[partition])

    # 如果zk 上存储的开始值 比 kakfa 可用的最大值还大(尾部越界 一般是因为删除并重建过 kakfa topic,zk上的记录未更新)
 # 将消费的开始 点修正为 kafka 可用的最大值
 if from_offset > last_offset[TopicPartition(topic=topic, partition=partition)]:
        from_offset = last_offset[TopicPartition(topic=topic, partition=partition)]

    # 如果zk 上存储的开始值 比 kakfa 可用的最小值还小(头部越界 消息过期,太久没消费,kafka已经清理了相关的message)
 # 将消费的开始 点修正为 kafka 可用的最小值
 if from_offset < earliest_offsets[TopicPartition(topic=topic, partition=partition)]:
        from_offset = earliest_offsets[TopicPartition(topic=topic, partition=partition)]

    fromOffsets[TopicAndPartition(topic,partition)] = long(from_offset)

##################################################################################

4、使用Direct开始消费 kafka 数据

kafka_streams = KafkaUtils.createDirectStream(ssc, [topic], \
                        kafkaParams=kafkaParams, fromOffsets=fromOffsets)

##################################################################################
5、对 获取的kafka Dstream 开始业务处理流程

#定义数据处理链
process_chinnel(kafka_streams)
def process_sql(format_streams):
    def process(time, rdd):
        if not rdd.isEmpty():
            try:
                # 得到spark 实例
  spark = getSparkSessionInstance(rdd.context.getConf())
                sqlContext = SQLContext(spark)

                # 创建DF,注册临时表
  df = spark.createDataFrame(rdd)
                df.createOrReplaceTempView("ids_tmp_table")

                # 执行sql 业务
  sql_str = """
 SELECT count(1) as conn_count,max(src_port) as max_sport,
  min(src_port) as min_sport,max(dest_port) as max_dport,
  min(dest_port) as min_dport,collect_set(flow_id) as flow_ids_list ,
   min(timestamp) as start_time,max(timestamp) as end_time,
 src_ip,dest_ip,app_proto,event_type
 FROM ids_tmp_table
 GROUP BY src_ip,dest_ip,app_proto,event_type
 """
  result_rdd  = sqlContext.sql(sql_str).toJSON()

                # 处理计算后的结果,本例保存到文件中
  temp_file_path = "/tmp/%s" % str(uuid.uuid1())
                def insert_all_line(lines):
                    for line in lines:
                        with open(temp_file_path,'aw+') as f:
                            f.write(line)
                            f.write('\n')
                result_rdd.foreachPartition(insert_all_line)

            except:
                traceback.print_exc()

    format_streams.foreachRDD(process)

##################################################################################
6、启动程序,等待程序运行
    ssc.start()
    ssc.awaitTermination()

3.3  Structured Streaming 读取kafka 实例

目的:使用 structured streaming 读取kafka 数据,进行一些聚合操作,将结果写入到 mysql 中。

//定义配置参数
val kafkaServerList = "10.0.*.*:9091"
val zkList = "10.0.*.*:2181"
val topicSet = "test.topicname"
val consumerGroup = "StructuredStreamin"
// 定义json 的数据结构
val schema: StructType = StructType(
  Seq(StructField("ts", DoubleType,true),
 StructField("A", IntegerType,true),
 StructField("smac", StringType,true),
 StructField("dmac",StringType,true),
 StructField("sip", StringType,true),
 StructField("sport",IntegerType,true),
 StructField("dip", StringType,true),
 StructField("dport",IntegerType,true),
 StructField("host", StringType,true),
 StructField("protocol",StringType,true),
 StructField("worker", StringType,true),
 StructField("is_alive",BooleanType,true),
 StructField("sensor_ip",StringType,true),
 StructField("sensor_id", StringType,true)
)
)
//初始化spark 运行上下文
val spark = SparkSession
  .builder
 .appName("StructuredStreamingExample")
  .getOrCreate
 
import spark.implicits._
 
 
// 设定kafka 消费属性
val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaServerList)
  .option("zookeeper.connect", zkList)
  .option("startingOffsets", "latest")
  .option("max.poll.records", 10)
  .option("subscribe", topicSet)
  .option("group.id", consumerGroup)
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) AS json")
  .select(from_json($"json".cast(StringType),schema).as("data"))
 
lines.select("data.*").createOrReplaceTempView("asset_tmp_tab")
 
//执行sql 数据聚合
val sqlStr =
  """
 | SELECT sip,sport,dip,dport,protocol,
 | CAST(from_unixtime(ts DIV 1000000) as timestamp) as time
 | FROM asset_tmp_tab
 
 """.stripMargin
 
 
val filterData = spark.sql(sqlStr)
 
// 使用Watermarking 聚合 5分钟内的数据
val windowedCounts = filterData
  .withWatermark("time","5 minutes")
  .groupBy(window($"time", "1 minutes", "1 minutes"),$"sip",$"dip")
  .count()
  .select($"sip",$"dip",$"window.start",$"window.end",$"count")
 
//配置query 输出
val writer = new JDBCSink()
val query = windowedCounts
  .writeStream
  .foreach(writer)
  .outputMode("update")
  .option("checkpointLocation","/checkpoint/")
  .start()
 
query.awaitTermination()
spark.stop()
 
 
//自定义jdbc输出方式
class JDBCSink() extends ForeachWriter[Row]{
  val driver = "com.mysql.jdbc.Driver"
    var connection:Connection = _
  var statement:Statement = _
 
  def open(partitionId: Long,version: Long): Boolean = {
    Class.forName(driver)
    connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark_test", "root", "[email protected]#d")
    statement = connection.createStatement
    true
     }
  def process(value: Row): Unit = {
    statement.executeUpdate("replace into structured_test(sip,dip,start_time,end_time,count) values("
     + "'" + value.getString(0) + "'" + ","//sip
     + "'" + value.getString(1) + "'" + ","//dip
     + "'" + value.getTimestamp(2) + "'" + "," //start_time
     + "'" + value.getTimestamp(3) + "'" + "," //end_time
     + value.getLong(4) //count
     + ")")
  }
 
  def close(errorOrNull: Throwable): Unit = {
    connection.close
  }
}

参考学习资料:

    spark 入门实战:http://www.cnblogs.com/shishanyuan/p/4699644.html

    深入理解spark之架构与原理:https://juejin.im/post/5a73c8386fb9a0635e3cafaa?utm_source=gold_browser_extension

    Spark 资源集合:https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20%E8%B5%84%E6%BA%90%E9%9B%86%E5%90%88

    Spark性能优化指南——基础篇:https://tech.meituan.com/spark-tuning-basic.html 

    Spark性能优化指南——高级篇 https://tech.meituan.com/spark-tuning-pro.html