Integrating Flume with Spark Streaming

There are two ways Flume and Spark Streaming can communicate.
1. Push: Flume pushes data to Spark Streaming. The drawback of this approach is that if Spark cannot receive the data Flume pushes over, that data is lost.
Configuration file flume-push-streaming.conf:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
# The directory watched for new files
a1.sources.r1.spoolDir = /home/hadoop/log
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = avro
# The receiving side: the Spark Streaming application's receiver listens here, so this must be the IP of the machine running the Streaming application.
a1.sinks.k1.hostname = 10.8.160.30
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Java code
First, add the dependency to pom.xml:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.10</artifactId>
    <version>${spark.version}</version>
</dependency>
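Note that the _2.10 suffix is the Scala version the artifact was built against; if your project builds with Scala 2.11, the artifact name would be spark-streaming-flume_2.11.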

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import scala.Tuple2;

public class FlumePushWordCount {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("flume").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(5000));
        //Push mode: Flume sends data to this receiver. The IP is the address of the machine running this Spark application (here, the local development machine); it must match the avro sink's hostname.
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "10.8.160.30", 8888);
        JavaDStream<String> stringJavaDStream = flumeStream.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
            @Override
            public Iterator<String> call(SparkFlumeEvent sparkFlumeEvent) throws Exception {
                //The data in a Flume event is retrieved via event().getBody().array()
                String line = new String(sparkFlumeEvent.event().getBody().array());
                String[] split = line.split(" ");
                List<String> list = new ArrayList<>();
                for (String ss : split) {
                    list.add(ss);
                }
                return list.iterator();
            }
        });
        JavaPairDStream<String, Integer> stringIntegerJavaPairDStream = stringJavaDStream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                Tuple2<String, Integer> tuple2 = new Tuple2<>(s, 1);
                return tuple2;
            }
        });
        JavaPairDStream<String, Integer> stringIntegerJavaPairDStream1 = stringIntegerJavaPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        stringIntegerJavaPairDStream1.print();
        ssc.start();
        ssc.awaitTermination();
    }
}

Start the Flume agent on the Flume node with the following command. In push mode, start the Spark Streaming application first, so the avro sink has a receiver to connect to.

$ bin/flume-ng agent -n a1 -c conf -f conf/myconf/flume-push-streaming.conf
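To generate test data, note that the spooldir source only ingests files that are already complete when they appear in the watched directory. A quick sketch (the file name is arbitrary): write a file somewhere else, then move it in:

$ echo "hello spark hello flume" > /tmp/words.txt
$ mv /tmp/words.txt /home/hadoop/log/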

2. Poll: Spark Streaming pulls data from Flume. This is the more commonly used approach.
Before using it, three jars need to be downloaded and dropped into flume/lib; per the Spark Streaming + Flume integration guide, these are spark-streaming-flume-sink, scala-library, and commons-lang3.
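For reference, a sketch of their Maven coordinates (the version numbers are assumptions; match them to your own Spark and Scala versions):

org.apache.spark:spark-streaming-flume-sink_2.10:${spark.version}
org.scala-lang:scala-library:2.10.6
org.apache.commons:commons-lang3:3.5

The Flume configuration: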

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /export/data/flume
a1.sources.r1.fileHeader = true

# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
# This is the hostname of the node where the Flume agent runs.
a1.sinks.k1.hostname = mini4
a1.sinks.k1.port = 8888

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
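
In poll mode, start the Flume agent before the Streaming application; the SparkSink buffers events in the channel until Spark pulls them. The start command is analogous to the push example (the config file name here is an assumption):

$ bin/flume-ng agent -n a1 -c conf -f conf/myconf/flume-poll-streaming.conf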


/**
 * Spark Streaming pulls data from Flume
 */
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.FlumeUtils;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import scala.Tuple2;

public class FlumePollWCApp {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("wc").setMaster("local[2]");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(5000));
        //The hostname and port here are those of the Flume SparkSink configured above
        JavaReceiverInputDStream<SparkFlumeEvent> pollingStream = FlumeUtils.createPollingStream(
                ssc, "mini4", 8888, StorageLevel.MEMORY_AND_DISK());
        JavaDStream<String> stringJavaDStream = pollingStream.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
            @Override
            public Iterator<String> call(SparkFlumeEvent sparkFlumeEvent) throws Exception {
                String s = new String(sparkFlumeEvent.event().getBody().array());
                String[] arr = s.split(" ");
                List<String> list = new ArrayList<String>();
                for (String ss : arr) {
                    list.add(ss);
                }
                return list.iterator();
            }
        });
        JavaPairDStream<String, Integer> stringIntegerJavaPairDStream = stringJavaDStream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                Tuple2<String, Integer> tuple2 = new Tuple2<>(s, 1);
                return tuple2;
            }
        });
        JavaPairDStream<String, Integer> stringIntegerJavaPairDStream1 = stringIntegerJavaPairDStream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        stringIntegerJavaPairDStream1.print();
        ssc.start();
        ssc.awaitTermination();
    }
}
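
Compared with push, poll mode is more reliable: the SparkSink keeps events in the channel inside a Flume transaction, which commits only after Spark Streaming has received and replicated the data, so a slow or restarted Streaming application does not lose events.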
