Spark Streaming + Kafka:如何从kafka消息检查主题名称

问题描述:

我使用Spark Streaming从Kafka主题列表中读取。 我正在关注此link的官方API。我正在使用的方法是:Spark Streaming + Kafka:如何从kafka消息检查主题名称

val kafkaParams = Map("metadata.broker.list" -> configuration.getKafkaBrokersList(), "auto.offset.reset" -> "largest") 
val topics = Set(configuration.getKafkaInputTopic()) 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics) 

我想知道执行器如何从主题列表中读取消息?他们的政策是什么?他们会阅读一个主题,然后当他们完成消息传递给其他主题?

最重要的是,我怎么能在调用这个方法后,检查RDD中的消息的主题是什么?

stream.foreachRDD(rdd => rdd.map(t => { 
     val key = t._1 
     val json = t._2 
     val topic = ??? 
}) 
+0

您可以使用地图像VAR记录= stream.map(记录=>(record.topic)) –

+0

@ israel.zinc我认为'stream'中的元素是'Tuple2 [String,String]'。有没有参数或方法称为主题 – salvob

我想知道如何将消息从主题的 列表读取执行?他们的政策是什么?他们会读取一个主题,然后当他们完成消息传递给其他主题?

在直接流式方法中,驱动程序负责读取要使用的卡夫卡主题的偏移量。它的作用是在主题,分区和需要阅读的偏移量之间创建一个映射。发生这种情况后,司机会将每个工人分配给范围以读入特定的Kafka主题。这意味着,如果一个工作人员可以同时运行2个任务(就这个例子而言,它通常可以运行更多),那么它可能会同时从两个不同的卡夫卡主题中读取。

如何在调用此方法后检查RDD中的消息 的主题是什么?

您可以使用createDirectStream超载,这需要MessageHandler[K, V]

val topicsToPartitions: Map[TopicAndPartition, Long] = ??? 

val stream: DStream[(String, String)] = 
    KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, 
     kafkaParams, 
     topicsToPartitions, 
     mam: MessageAndMetadata[String, String]) => (mam.topic(), mam.message()) 
+0

谢谢@Yuval,但仍然。如何在阅读卡夫卡时访问消息和主题。 'messageHandler'作为'createDirectStream'的参数,它看起来不能。 – salvob

+0

@salvob我的代码片段正是如此。输出将是一个'DStream [(String,String)]',其中第一个是主题名称。 –

+0

您的代码定义了一个可能包含每条记录的消息和主题的流。 但是,当我尝试打印元组的内容时(在我的问题中带有'println(key + topic + message)'的代码片段),没有任何事情发生,方法'rdd.count()'返回消息的数量正确虽然 – salvob