SparkStreaming读取kafka数据的两种方式(receive与direct)对比

陈诉

大家都知道在spark1.3版本后,kafkautil里面提供了两个创建dstream的方法,一个是老版本中有的createStream方法,还有一个是后面新加的createDirectStream方法。总之,通过新方法创建出来的dstream的rdd partition和kafka的topic的partition是一一对应的,通过低阶API直接从kafka的topic消费消息,默认将偏移量保存在kafka内部。

对比

Receive

SparkStreaming读取kafka数据的两种方式(receive与direct)对比

过程:spark集群中的 worker节点中 exeutor线程里的 receiver接口会一直消费kafka中的数据,那么问题来了,假如我们定义5秒消费一次,如果spark集群定义了每个worker使用的cpu资源不足以消费完了这5秒的数据,那么就会出现数据的丢失,消费不了的那些数据就没了,并且streaming一经启动会一直循环消费拉取资源,如果出现上述问题,分配的cpu不足以消费5秒拉取的数据,那么丢失的数据便会越积越多

优点:Receive是使用的高级API,需要消费者连接Zookeeper来读取数据。是由Zookeeper来维护偏移量,不用我们来手动维护,这样的话就比较简单一些,减少了代码量。

缺点:
<1> 导致丢失数据。它是由Executor内的Receive来拉取数据并存放在内存中,再由Driver端提交的job来处理数据。这样的话,如果底层节点出现错误,就会发生数据丢失。
<2> 浪费资源。可以采取write ahead logs (AWL)方式将数据存到hdfs上面做备份,那么如果再发生错误,就可以从中再次读取数据。但是这样会导致同样的数据存储了两份,浪费了资源。
<3> 可能会导致重复读取数据。对于公司来说,一些数据宁可丢失了一小小部分也不能重复读取,但是这种由Zookeeper来记录偏移量的方式,可能会因为Spark和Zookeeper不同步,导致一份数据读取了两次。
<4> 效率低。因为是分批次执行的,它是接收数据,直到达到了设定的时间间隔,才进行计算。而且我们在KafkaUtils.createStream()中设定的partition数量,只会增加receive的数量,不能提高并行计算的效率,但我们可以设定不同的Group和Topic创建DStream,然后再用Union合并DStream,提高并行效率。

Direct

SparkStreaming读取kafka数据的两种方式(receive与direct)对比

过程:Direct方式就是使用executor直接连接kakfa节点,我们自定义偏移量的使用大小及存储备份方法。

优点:

<1> 高效性。当我们读取Topic下的数据时,它会自动对应Topic下的Partition生成相对应数量的RDD Partition,提高了计算时的并行度,提高了效率。
<2> 实现零数据丢失。当发生数据丢失,只要kafka上的数据进行了复制,就可以根据副本来进行数据重新拉取,不需要通过WAL来维持数据的完整性。
<3> 保证数据只被消费一次。因为我们可以将偏移量保存在一个地方,当我们读取数据时,可以从这里拿到数据的起始偏移量和读取偏移量确定读取范围,通过这些我们就可以读取数据,当读取完成后再更新偏移量,这就保证了数据只消费一次。

扩展

开发过程中SparkStreaming和kafka集成有两个版本:0.8及0.10+
0.8版本有Receiver和Direct模式(但是0.8版本生产环境问题较多,在Spark2.3之后不支持0.8版本了)
0.10以后只保留了direct模式(Reveiver模式不适合生产环境),并且API有变化(更加强大)

总之direct模式可以保证每次消费kafka数据都能从上一次保存的offset处开始消费数据

在接下来的博客中,我会向大家分别介绍三种kafka010采用Direct存储偏移量的方式。
1.存储于Mysql
2.存储于Zookeeper
3.存储于Redis