spark streaming与kafka整合

Spark 2.2和2.3支持0.8和0.10两种Kafka API,而且0.8版本的API可以和Kafka Broker 0.9 及0.10兼容,但0.10版本的API不能与Kakka 0.8兼容。两个版本Kafka API的功能对比如下:

spark streaming与kafka整合

由此可知,相对于0.8版本,Kafka API 0.10版本的主要变化在于:

1)只支持Direct DStream 的连接模式,基于Receiver的⽅式Kafka连接⽅式不再⽀持(但Spark Streaming Receiver仍然⽀持)。

2)支持Offset Commit API,因而不再需要完全通过手工编写代码的方式来进行 Offset 管理。

3)暂不支持 Python。

另外,CDH Spark2 中支持 Kafka 0.9 和 0.10 两个版本。在 CDH 中调用Kafka,需要在 CM手工指定 Kafka 的版本,如下图所示。

spark streaming与kafka整合

注:接下来的例子都是使用spark-kafka010的版本,且使用的java,如果要查看scala如何实现,请查看参考链接。

创建一个Direct Stream

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topicA", "topicB");

final JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );

stream.transform(rdd -> extractRecordOffset(rdd))
        .flatMapToPair(msg -> prepropress(msg))
        .mapWithState(StateSpec.function(mapStateFunction)
                .timeout(Durations.minutes(60)))
        .stateSnapshots()
        .foreachRDD(rdd -> {
            writeKLinesToKudu(rdd);
            ((CanCommitOffsets)stream.inputDStream()).commitAsync(offsetRanges.get());
        });

对于创建一个Direct Stream需要传入三个参数:

  1. StreamingContext : spark streaming的上下文环境。
  2. LocationStrategies

新版的kafka consumer API需要提前将消息加载到buffers里。因此,出于性能原因,Spark集成在excutor上保存消费者缓存(而不是为每个批次重新创建它们),同时也可以更好的调度消费者所在主机上的分区。

在大多数情况下,我们应该使用LocationStrategies.PreferConsistent。这将把分区均匀的分配在可用的executor执行器之间。如果我们的执行程序与Kafka代理所在的主机相同,请使用PreferBrokers,这将优先会从kafka leader节点上去调度分区。最后,如果我们的分区之间的负载有明显偏差,这时候可以考虑使用PreferFixed。这允许指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。

总结一下:

LocationStrategies.PreferConsistent :均匀地将各个分区分配到各个执行器上面;

LocationStrategies. PreferBrokers:优先会从kafka leader节点上去调度分区;

LocationStrategies. PreferFixed:允许指定分区到主机的显式映射;

消费者的缓存的默认最大大小为64,如果您希望处理超过(64 *个执行程序数)Kafka分区,则可以通过以下方式更改此设置:spark.streaming.kafka.consumer.cache.maxCapacity

缓存由topic partition和group.id键入,因此对每个调用使用一个单独 group.id的createDirectStream。

ConsumerStrategies

新的Kafka consumer API有许多不同的方式来指定topics,其中一些需要相当多的后对象实例化设置。 ConsumerStrategies提供了一种抽象,允许Spark即使在从检查点重新启动后也能获得正确配置的消费者。

ConsumerStrategies.Subscribe,如上所示,允许我们订阅固定的topics集合。SubscribePattern允许您使用正则表达式来指定感兴趣的topics。注意,与0.8集成不同,在运行期间使用Subscribe或SubscribePattern应该响应添加分区。最后,Assign允许我们指定固定的分区集合。所有三个策略都有重载的构造函数,允许我们指定特定分区的起始偏移量。

如果您具有上述选项不满足的特定用户设置需求,则ConsumerStrategy是可以扩展的公共类。

新建一个Topic集合

Collection<String> topics = Arrays.asList({“test1”,“test2”});

注:extractRecordOffset()见下面,prepropress()数据预处理函数,mapStateFunction()状态处理函数(见spark streaming状态处理)。writeKLinesToKudu()(写kudu函数)将结果写入kudu。

创建RDD

如果我们有一个更适合批处理的用例,则可以为定义的偏移量范围创建RDD。

OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};

JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);

获取Offsets

public static JavaRDD<ConsumerRecord<String,String>> extractRecordOffset(
        JavaRDD<ConsumerRecord<String,String>> rdd) {
    OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    offsetRanges.set(offsets);
    rdd.foreachPartition(messages -> {
        TaskContext ctx=TaskContext.get();
        String execId=SparkEnv.get().executorId();

        OffsetRange o = offsets[ctx.partitionId()];
        logger.info("Start to process offset range of {}/{} at Executor {}: {} - {}",
                o.topic(),o.partition(),execId,o.fromOffset(),o.untilOffset());
    });
    return rdd;
}

Kafka有一个偏移提交API,将偏移存储在特殊的Kafka的topic中。默认情况下,新消费者将定期自动提交偏移量。有时候我们并不像让kafka定期提交存储偏移量,因为会导致有可能消费者已经轮询了那条消息,但是spark streaming并没有成功的输出那些消息,这就会导致如果重启,会丢失那些消息。这就是为什么上面的流示例将“enable.auto.commit”设置为false的原因。但是,我们可以在使用commitAsyncAPI 存储了输出后,向Kafka提交偏移量。

((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);

 

参考:

http://spark.apache.org/docs/2.1.0/streaming-kafka-0-10-integration.html