flume整合streaming
这是flume和streaming的两种通信方式
1.push:flume将数据推到streaming。这种方式的缺点是：如果flume推送过来的数据spark接收不了，就会导致数据丢失
配置文件flume-push.sh
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source: watch a directory for new files (spooling directory source)
a1.sources.r1.type = spooldir
# Directory being monitored for incoming files
a1.sources.r1.spoolDir = /home/hadoop/log
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = avro
# Receiver address: in push mode the Spark Streaming receiver binds to this
# host/port, so this must be the node where the streaming job runs.
a1.sinks.k1.hostname = 10.8.160.30
a1.sinks.k1.port = 8888
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
java代码
首先在pom文件中添加如下依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
/**
 * Word count over events that Flume PUSHES to Spark Streaming (Flume avro sink
 * -> Spark receiver). The host/port passed to {@code FlumeUtils.createStream}
 * must match {@code a1.sinks.k1.hostname/port} in the Flume config, and must be
 * an address of the machine this job runs on (the receiver binds to it).
 */
public class FlumePushWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("flume").setMaster("local[2]");
        // 5-second micro-batch interval.
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(5000));
        // Push mode: Flume sends data to Spark; this receiver listens on 8888.
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                FlumeUtils.createStream(ssc, "10.8.160.30", 8888);
        // Split each event body into words. The payload is obtained via
        // event().getBody().array(); decode with an explicit charset — the
        // original relied on the platform default, which is not portable.
        JavaDStream<String> words = flumeStream.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
            @Override
            public Iterator<String> call(SparkFlumeEvent sparkFlumeEvent) throws Exception {
                String line = new String(sparkFlumeEvent.event().getBody().array(),
                        java.nio.charset.StandardCharsets.UTF_8);
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        // Pair each word with an initial count of 1.
        JavaPairDStream<String, Integer> wordOnes = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        // Sum the counts per word within each batch.
        JavaPairDStream<String, Integer> wordCounts = wordOnes.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
用如下的命令在flume节点上启动flume agent。
[[email protected] /usr/local/flume]$bin/flume-ng agent -n a1 -c conf -f conf/myconf/flume-push-streaming.conf
2.poll:streaming通过poll的方式从flume中拉取数据，这是常用的方式。
使用之前需要下载下面三个jar包（spark-streaming-flume-sink_2.10-x.x.x.jar、scala-library-x.x.x.jar、commons-lang3-3.x.jar），然后放到flume/lib下面
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source: watch a directory for new files (spooling directory source)
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true
# Describe the sink: SparkSink buffers events until Spark Streaming polls them
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
# Hostname of the node Flume runs on; Spark's createPollingStream connects to
# this host/port to pull data. (The original line started with "//", which is
# NOT a comment in Java-properties syntax — Flume would read it as a bogus
# property line. Properties comments must start with '#'.)
a1.sinks.k1.hostname = mini4
a1.sinks.k1.port = 8888
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
/**
 * Word count over events that Spark Streaming PULLS from Flume (poll mode).
 * Flume must use the SparkSink ({@code org.apache.spark.streaming.flume.sink.SparkSink});
 * {@code createPollingStream} connects to the Flume node's host/port and pulls
 * buffered events, which is more reliable than push mode.
 */
public class FlumePollWCApp {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[2]");
        // 5-second micro-batch interval.
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(5000));
        // Host/port of the Flume SparkSink (must match a1.sinks.k1.hostname/port).
        JavaReceiverInputDStream<SparkFlumeEvent> pollingStream = FlumeUtils.createPollingStream(
                ssc, "mini4", 8888, StorageLevel.MEMORY_AND_DISK());
        // Split each event body into words; decode with an explicit charset —
        // the original relied on the platform default, which is not portable.
        JavaDStream<String> words = pollingStream.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
            @Override
            public Iterator<String> call(SparkFlumeEvent sparkFlumeEvent) throws Exception {
                String s = new String(sparkFlumeEvent.event().getBody().array(),
                        java.nio.charset.StandardCharsets.UTF_8);
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // Pair each word with an initial count of 1.
        JavaPairDStream<String, Integer> wordOnes = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        // Sum the counts per word within each batch.
        JavaPairDStream<String, Integer> wordCounts = wordOnes.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
/**
 * Spark Streaming pulls (polls) data from Flume and performs a word count.
 *
 * NOTE(review): this class is an exact byte-for-byte duplicate of the
 * FlumePollWCApp defined immediately above — likely a copy-paste error in
 * these notes. Two identical public classes with the same name cannot
 * coexist in one compilation unit; one of the two copies should be removed.
 */
public class FlumePollWCApp {
public static void main(String[] args) throws InterruptedException {
SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[2]");
// 5-second micro-batch interval.
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(5000));
// Host/port of the Flume node running the SparkSink that we poll from.
JavaReceiverInputDStream<SparkFlumeEvent> pollingStream = FlumeUtils.createPollingStream(
ssc, "mini4", 8888, StorageLevel.MEMORY_AND_DISK());
// Split each event body into individual words.
JavaDStream<String> stringJavaDStream = pollingStream.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
@Override
public Iterator<String> call(SparkFlumeEvent sparkFlumeEvent) throws Exception {
// NOTE(review): decodes with the platform-default charset; an explicit
// charset (e.g. UTF-8) would be more portable.
String s = new String(sparkFlumeEvent.event().getBody().array());
String[] arr = s.split(" ");
List<String> list = new ArrayList<String>();
for (String ss : arr) {
list.add(ss);
}
return list.iterator();
}
});
// Pair each word with an initial count of 1.
JavaPairDStream<String, Integer> stringIntegerJavaPairDStream = stringJavaDStream.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
Tuple2<String, Integer> tuple2 = new Tuple2<>(s, 1);
return tuple2;
}
});
// Sum the counts per word within each batch.
JavaPairDStream<String, Integer> stringIntegerJavaPairDStream1 = stringIntegerJavaPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer + integer2;
}
});
stringIntegerJavaPairDStream1.print();
ssc.start();
ssc.awaitTermination();
}
}