flume kafka sparkstreaming整合后集群报错org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti
简介
整个项目架构是在CDH中,flume采集数据到kafka,然后sparkstreaming消费(flume1.7版本,kafka0.10版本,spark 2.1版本)。本来local本地模式测试已经没有问题,但是部署到集群上就报错如下:
Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
at org.apache.spark.streaming.kafka010.Subscribe.onStart(ConsumerStrategy.scala:84)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.consumer(DirectKafkaInputDStream.scala:75)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.start(DirectKafkaInputDStream.scala:243)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at org.apache.spark.streaming.DStreamGraph$$anonfun$start$5.apply(DStreamGraph.scala:49)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach_quick(ParArray.scala:143)
at scala.collection.parallel.mutable.ParArray$ParArrayIterator.foreach(ParArray.scala:136)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
17/08/15 20:09:30 ERROR yarn.ApplicationMaster: RECEIVED SIGNAL TERM
原因分析
其实这个在官方文档中有介绍。地址如下:
简单说,就是kafka集成spark2,需要在CDH中进行设置。官网介绍了2中方法。这里我采用了第二种,在CDH中进行修改配置的方法。步骤如下:
进入CDH的spark2配置界面,在搜索框中输入SPARK_KAFKA_VERSION,出现如下图,然后选择对应版本,这里我应该选择的是0.10,然后保存配置,重启生效。重新跑sparkstreaming任务,问题解决。