SparkStreaming&Kafka——Direct方式
SparkStreaming&Kafka——Direct方式
Github地址
https://github.com/holdbelief/spark/tree/master/SparkStreaming/SparkStreamingExamples/SparkStreaming_Kafka/Direct
整体架构
Reciever模式下,SparkStreaming需要启动一个ReceiverTask每隔BatchInterval时间从Kafka读取消息,消息偏移量由ZK管理,而Direct模式下,SparkStreaming和Kafka直接连接,SparkStreaming去Kafka去pull数据,这个消费偏移量由SparkStreaming自己来维护(实际上通过checkpoint来管理的,checkpoint和job是异步的,总的来时SparkStreaming的事务机制并不是很完善),避免了数据的丢失(相对而言,不是绝对的)
由于SparkStreaming与Kafka直连,所以没有ReceiverTask,不需要额外的线程执行Receiver任务,所以setMaster("local[1]"),的时候最小可以设置为1.
并行度问题:
1、linesDStram里面封装到的是RDD,RDD里面有partition与这个topic的parititon数是一致的。
2、从kafka中读来的数据封装一个DStram里面,可以对这个DStream重分区 reaprtitions(numpartition)
代码解析
package com.bjsxt.java.spark.streaming;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
/**
* 并行度问题:
* 1、linesDStram里面封装到的是RDD, RDD里面有partition与这个topic的parititon数是一致的。
* 2、从kafka中读来的数据封装一个DStram里面,可以对这个DStream重分区 reaprtitions(numpartition)
* @author faith
*/
public class SparkStreamingOnKafkaDirected {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
/*
* Receiver模式下,需要使用一(数量可以设置,例如N个)个线程执行ReceiverTask,所以
* setMaster("local[2]")至少要是2个(N+1个)线程。
* 而Direct模式下,由于SparkStreaming直接从Kafka中取数据,没有ReceiverTask,所以最少可以设置1个线程
* 当在Eclipse中运行的时候,去掉setMaster("local[1]")的注释,在yarn或者Standalone中运行的时候,注释掉
*/
// .setMaster("local[1]")
.setAppName("SparkStreamingOnKafkaDirected");
JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(5));
/*
* 第一个泛型是Kafka消息偏移量的数据类型
* 第二个泛型是Kafka消息体的数据类型
*/
Map<String, String> kafkaParameters = new HashMap<String, String>();
/*
* metadata.broker.list,Kafka Broker的列表
*/
kafkaParameters.put("metadata.broker.list", "faith-openSUSE:9092,faith-Kylin:9092,faith-Mint:9092");
HashSet<String> topics = new HashSet<String>();
topics.add("Topic1");
JavaPairInputDStream<String,String> lines = KafkaUtils.createDirectStream(jsc,
String.class, // Kafka消息偏移量的数据类型
String.class, // Kafka消息体的数据类型
StringDecoder.class, // Kafka消息偏移量的反序列化处理类
StringDecoder.class, // Kafka消息的反序列化处理类
kafkaParameters,
topics);
JavaPairDStream<String,String> lines_repartition = lines.repartition(10);
JavaDStream<String> words = lines_repartition.flatMap(new FlatMapFunction<Tuple2<String,String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public Iterable<String> call(Tuple2<String, String> tuple) throws Exception {
return Arrays.asList(tuple._2.split(" "));
}
});
JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 7498759112557640377L;
@Override
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairDStream<String, Integer> wordsCount = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = -6334959299984344333L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
/*
* 将word_count的内容写入Mysql
*/
wordsCount.foreachRDD(new VoidFunction<JavaPairRDD<String, Integer>>() {
@Override
public void call(JavaPairRDD<String, Integer> pairRdd) throws Exception {
/*
* foreahPartition与foreach的不同是:
* foreach:每循环一次是一个数据
* foreachPartition:每循环一次是一个Partition,处理一个Partition里面的所有数据,
* 注意VoidFunction的参数是Iterator,也就是每隔Partition里面数据的迭代
* foreachPartition常常用于向数据库例如数据,当使用foreach时候,每次循环是一个数据,那么每个数据
* 就要创建一个数据链连接,foreachPartition是每一个Partition创建一个数据库链接。
*/
pairRdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String,Integer>>>() {
@Override
public void call(Iterator<Tuple2<String, Integer>> vs) throws Exception {
JDBCWrapper jdbcWrapper = JDBCWrapper.getJDBCInstance();
List<Object[]> insertParams = new ArrayList<Object[]>();
while ( vs.hasNext() ) {
Tuple2<String, Integer> next = vs.next();
insertParams.add(new Object[] {next._1, next._2});
}
if ( !insertParams.isEmpty() ) {
System.out.println(insertParams);
jdbcWrapper.doBatch("INSERT INTO wordcount VALUES(?, ?)", insertParams);
}
}
});
}
});
jsc.start();
jsc.awaitTermination();
}
}
服务器规划
faith-Fedora
faith-Ubuntu
faith-openSUSE
faith-Kylin
faith-Mint
faith-Fedora2
zk
zk1
√
zk2
√
zk3
√
HDFS
NN1
√
NN2
√
JN1
√
JN2
√
JN3
√
DN1
√
DN2
√
DN3
√
Yarn
RS1
√
RS2
√
NM1
√
NM2
√
NM3
√
Spark
client
√
Kafka
Broker1
√
Broker2
√
Broker3
√
-
启动集群
启动ZK集群
分别在faith-openSUSE、faith-Kylin、faith-Mint上执行zkServer.sh start。
启动HDFS集群
在faith-Fedora节点上执行start-dfs.sh
启动Yarn集群
在faith-Kylin节点上执行start-yarn.sh
在faith-Mint节点上执行yarn-daemon.sh start resourcemanager
启动Kafka集群
在faith-openSUSE、faith-Kylin、faith-Mint节点上分别执行下面命令,启动Kafka集群:
-
运行测试
创建名字为Topic1的Topic
在Console启动Kafka的Producer
启动Producer,并输入一些字符串
如果是在Eclipse中执行,可以直接在控制台看到结果
-
在Yarn中执行
Client方式提交
Cluster方式提交
数据库展示
填坑
由于本例中使用的Spark版本是1.6.2,与之对应的Scala版本是2.10,在Maven的POM文件中加入SparkStreaming依赖的时候,scala的版本要为2.10,不能是2.11或其他。例如:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.2</version>
</dependency>
如果写成2.11,有可能造成一些问题,例如本例中,2.11的时候,当使用yarn-client或者yarn-Client模式下,无法获取BlockManager的问题,程序一直卡在这里,等待获取BlockManager,不继续执行。或者还有可能造成其他各种问题。
总之Spark的版本要和Scala的版本对应上。
参考文章
https://www.iteblog.com/archives/1326.html