WordCount with Spark Streaming's Direct Approach to Receiving Kafka Messages

1. Start the YARN cluster
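A minimal sketch, assuming a standard Hadoop layout with HADOOP_HOME set (the script paths are assumptions about your install):

$HADOOP_HOME/sbin/start-dfs.sh     # HDFS: NameNode and DataNodes (used for job staging)
$HADOOP_HOME/sbin/start-yarn.sh    # YARN: ResourceManager and NodeManagers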

2. Start the ZooKeeper cluster (required by Kafka)
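For example, on each ZooKeeper node (zkServer.sh assumes a standalone ZooKeeper install; Kafka also ships a bundled copy):

zkServer.sh start
# or, using the copy bundled with Kafka, from the Kafka directory:
bin/zookeeper-server-start.sh config/zookeeper.properties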

3. Start the Kafka broker, a producer, and a consumer (the producer feeds test data into Kafka; the consumer prints what arrives)

3.1 Start the Kafka broker

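From the Kafka installation directory, with the stock config (paths may differ in your install):

bin/kafka-server-start.sh config/server.properties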

3.2 Start a Kafka producer

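A console producer pointed at the broker address used in the submit script below (topic_name is a placeholder):

bin/kafka-console-producer.sh --broker-list 192.168.2.11:9092 --topic topic_name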

3.3 Start a Kafka consumer

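A console consumer; the Kafka 0.8-era client this code targets tracks offsets via ZooKeeper (the ZooKeeper address here is an assumption):

bin/kafka-console-consumer.sh --zookeeper 192.168.2.11:2181 --topic topic_name --from-beginning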

4. Adapt the official Spark demo

In your Spark installation directory, find spark-2.0.2-bin-hadoop2.6/examples/src/main/scala/org/apache/spark/examples/streaming


The code is as follows:

package streaming // matches --class streaming.DirectKafkaWordCount in the submit script below

import kafka.serializer.StringDecoder

import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

/**
  * Consumes messages from one or more topics in Kafka and does wordcount.
  * Usage: DirectKafkaWordCount <brokers> <topics>
  *   <brokers> is a list of one or more Kafka brokers
  *   <topics> is a list of one or more kafka topics to consume from
  *
  * Example:
  *    $ bin/run-example streaming.DirectKafkaWordCount broker1-host:port,broker2-host:port \
  *    topic1,topic2
  */
object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    // Guard against missing arguments; destructuring an empty array throws a MatchError
    if (args.length < 2) {
      System.err.println(
        s"""
           |Usage: DirectKafkaWordCount <brokers> <topics>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    // Split each line into words so we count words, not whole lines
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

5. Package the jar and upload it to the cluster (a packaging sketch is below), then write the following launch script and run it
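The jar-with-dependencies name suggests a Maven build with the assembly plugin; a hedged packaging sketch (the project name is taken from the jar in the script):

mvn clean package -DskipTests
# expect target/sparkTest-1.0-SNAPSHOT-jar-with-dependencies.jar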

/usr/local/src/spark-2.0.2-bin-hadoop2.6/bin/spark-submit \
        --class streaming.DirectKafkaWordCount \
        --master yarn \
        --deploy-mode cluster \
        --executor-memory 1G \
        --executor-cores 1 \
        ./sparkTest-1.0-SNAPSHOT-jar-with-dependencies.jar \
        192.168.2.11:9092 topic_name

6. Check the launched Application on the YARN cluster

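For example, from the command line (the ResourceManager web UI, on port 8088 by default, shows the same list):

yarn application -list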

7. Type data at the Kafka producer; the consumer prints it, and the word counts show up in the application logs on the YARN cluster.

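To pull the application logs and see the printed counts (the application ID is a placeholder; take the real one from the list above):

yarn logs -applicationId application_1234567890123_0001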