Spark Streaming消费Kafka的数据进行统计
流处理平台:
这里是第四步的实现:
Spark Streaming整合Kafka采用的是Receiver-based方式;另一种方式是Direct Approach,稍作修改即可切换。
package spark
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Spark Streaming对接Kafka
*/
/**
 * Spark Streaming application that consumes messages from Kafka using the
 * Receiver-based approach (`KafkaUtils.createStream`) and prints the number
 * of records received in each 5-second batch.
 *
 * Usage: KafkaStreamingApp &lt;zkQuorum&gt; &lt;group&gt; &lt;topics&gt; &lt;numThreads&gt;
 *   zkQuorum   - ZooKeeper connection string (host:port[,host:port...])
 *   group      - Kafka consumer group id
 *   topics     - comma-separated list of topics to subscribe to
 *   numThreads - receiver threads per topic
 */
object KafkaStreamingApp {

  def main(args: Array[String]): Unit = {
    // Exit early on bad arguments; without the exit, execution would fall
    // through to the pattern match below and fail with a MatchError.
    if (args.length != 4) {
      System.err.println("Usage: KafkaStreamingApp <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    // local[2]: at least two threads are required locally — one for the
    // receiver, one for processing.
    val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount")
      .setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Map each topic to the number of receiver threads to use for it.
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // Receiver-based stream: elements are (key, message) pairs; we only
    // care about the message payload (_._2).
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}