Spark Streaming + Kafka:如何从kafka消息检查主题名称
问题描述:
我使用Spark Streaming从Kafka主题列表中读取。 我正在关注此link的官方API。我正在使用的方法是:Spark Streaming + Kafka:如何从kafka消息检查主题名称
val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "largest")
val topics = Set(configuration.getKafkaInputTopic())
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topics)
我想知道执行器如何从主题列表中读取消息?他们的政策是什么?他们会阅读一个主题,然后当他们完成消息传递给其他主题?
最重要的是,我怎么能在调用这个方法后,检查RDD中的消息的主题是什么?
stream.foreachRDD(rdd => rdd.map(t => {
val key = t._1
val json = t._2
val topic = ???
})
答
我想知道如何将消息从主题的 列表读取执行?他们的政策是什么?他们会读取一个主题,然后当他们完成消息传递给其他主题?
在直接流式方法中,驱动程序负责读取要使用的卡夫卡主题的偏移量。它的作用是在主题,分区和需要阅读的偏移量之间创建一个映射。发生这种情况后,司机会将每个工人分配给范围以读入特定的Kafka主题。这意味着,如果一个工作人员可以同时运行2个任务(就这个例子而言,它通常可以运行更多),那么它可能会同时从两个不同的卡夫卡主题中读取。
如何在调用此方法后检查RDD中的消息 的主题是什么?
您可以使用createDirectStream
超载,这需要MessageHandler[K, V]
:
val topicsToPartitions: Map[TopicAndPartition, Long] = ???
val stream: DStream[(String, String)] =
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
topicsToPartitions,
mam: MessageAndMetadata[String, String]) => (mam.topic(), mam.message())
您可以使用地图像VAR记录= stream.map(记录=>(record.topic)) –
@ israel.zinc我认为'stream'中的元素是'Tuple2 [String,String]'。有没有参数或方法称为主题 – salvob