Complete code for integrating Flume, Kafka, and Spark Streaming, with stateful updates via updateStateByKey

Complete code for the Flume + Kafka + Spark Streaming integration:

These are my personal study notes; they do not explain the basics.

What this article implements:

Monitor /home/hadoop/logs/flume.log on the hadoop01 node; whenever content is appended to that file, the appended content is sent to port 44444 on hadoop02.

When hadoop02 receives messages on port 44444, it pushes them to the flume-kafka topic on the Kafka cluster.

Spark Streaming then performs a streaming word count over those messages.
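
To illustrate the "stateful update" in the title: updateStateByKey merges each batch's values for a key with that key's previously accumulated total. A minimal standalone sketch of the update function used by the job below (plain Scala, not part of the pipeline code):

object UpdateStateExample {
    // Same shape as the update function in the job: sum this batch's values
    // for a key and add the previously accumulated count, if any.
    def updateFunc(values: Seq[Int], state: Option[Int]): Option[Int] =
        Some(values.sum + state.getOrElse(0))

    def main(args: Array[String]): Unit = {
        println(updateFunc(Seq(1, 1), None))   // batch 1: "aa" seen twice, no prior state -> Some(2)
        println(updateFunc(Seq(1), Some(2)))   // batch 2: "aa" seen once, prior total 2   -> Some(3)
    }
}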

Note:

      Adjust the parameters to match your own environment!!!

Code:

package com.jtv.sparkStreaming_kafka

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Consumes messages from one or more topics in Kafka.
  *
  * <checkPointDir> is the Spark Streaming checkpoint directory.
  * <brokers> is the broker list used for bootstrapping and metadata lookup.
  * <topics> is a comma-separated list of one or more Kafka topics to consume from.
  * <batchTime> is the Spark Streaming batch duration in seconds.
  */
object sparkStreaming_kafka_direct_10_HA {

    def main(args: Array[String]) {
        System.setProperty("HADOOP_USER_NAME", "hadoop")

        val ssc = createContext(args)

        // Start the Streaming system.
        ssc.start()
        ssc.awaitTermination()
    }

    def createContext(args: Array[String]): StreamingContext = {

        if (args.length != 4) {
            System.err.println("Usage: DstreamKafkaCount <checkPointDir> <brokers> <topics> <batchTime>")
            System.exit(1)
        }

        val Array(checkPointDir, brokers, topics, batchTime) = args

        // Create the Streaming startup environment.

        // Cluster mode
        //val sparkConf = new SparkConf().setAppName("sparkStreaming_kafka_direct_10_HA")

        // Local mode
        val sparkConf = new SparkConf().setAppName("sparkStreaming_kafka_direct_10_HA").setMaster("local[2]")

        val ssc = new StreamingContext(sparkConf, Seconds(batchTime.toLong))
        ssc.sparkContext.setLogLevel("WARN")

        // Configure the checkpoint directory for the streaming job.
        // This is mandatory for stateful operations such as updateStateByKey.
        ssc.checkpoint(checkPointDir)

        // Get the list of Kafka topics to subscribe to.
        val topicArr = topics.split(",")
        val topicSet = topicArr.toSet

        val kafkaParams = Map[String, Object](
            "bootstrap.servers" -> brokers,
            "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
            "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
            "group.id" -> "DemoConsumer",
            "auto.offset.reset" -> "latest",
            "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val locationStrategy = LocationStrategies.PreferConsistent
        val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicArr, kafkaParams)

        // Create a direct Kafka stream from the given brokers and topics.
        // Receive data from Kafka and generate the corresponding DStream.
        val stream = KafkaUtils.createDirectStream[String, String](ssc, locationStrategy, consumerStrategy)

        //stream.map(record => (record.key, record.value))
        val kafkaStreams: DStream[String] = stream.map(_.value())
        val resultDStream: DStream[(String, Int)] = kafkaStreams
            .flatMap(_.split(" "))
            .map((_, 1))
            .updateStateByKey(updateFunc)

        resultDStream.print()

        ssc
    }

    def updateFunc(values: Seq[Int], state: Option[Int]): Option[Int] =
        Some(values.sum + state.getOrElse(0))
}
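
A note on the "HA" in the object name: as written, main always builds a fresh context, so the checkpoint directory is only used for the updateStateByKey state. If recovery of the driver from the checkpoint is also desired, a hedged variant of the main method above (an assumption on my part, not part of the original code) could use StreamingContext.getOrCreate:

    def main(args: Array[String]) {
        System.setProperty("HADOOP_USER_NAME", "hadoop")
        // args(0) is <checkPointDir>; recover the StreamingContext from the
        // checkpoint if one exists, otherwise build it with createContext.
        val ssc = StreamingContext.getOrCreate(args(0), () => createContext(args))
        ssc.start()
        ssc.awaitTermination()
    }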

 

Execution steps:

1. First, create a file on the hadoop01 node: /home/hadoop/logs/flume.log

2. Start Spark, Kafka, ZooKeeper, and HDFS.

3. On the hadoop02 node, run (the avro-kafka.conf file is shown later in this article):

flume-ng agent --conf conf --conf-file /home/hadoop/apps/flume/agentConf/avro-kafka.conf --name agent2 -Dflume.root.logger=INFO,console

4. On the hadoop01 node, run (the exec-avro.conf file is shown later in this article):

flume-ng agent --conf conf --conf-file /home/hadoop/apps/flume/agentConf/exec-avro.conf --name agent1 -Dflume.root.logger=INFO,console

5. On any node of the Kafka cluster (for example hadoop03), start a Kafka console consumer (the flume-kafka topic is assumed to exist; see the sketch after this list):

kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --from-beginning --topic flume-kafka

6. Run the program (either locally or by submitting a jar).

7. For the jar approach, a few dependency jars are required; download them yourself and place them in the /home/hadoop/lib/ directory.

8. In the /home/hadoop/logs directory, run: echo aa bb c >> flume.log
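
The steps above assume the flume-kafka topic already exists (or that the brokers auto-create topics). If it has to be created explicitly, one option, sketched here with assumed partition and replication counts, is the Kafka AdminClient from the kafka-clients jar that already appears in the spark-submit --jars list:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateFlumeKafkaTopic {
    def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
            "hadoop01:9092,hadoop02:9092,hadoop03:9092")
        val admin = AdminClient.create(props)
        try {
            // 3 partitions, replication factor 3: assumed values, adjust to the cluster.
            val topic = new NewTopic("flume-kafka", 3, 3.toShort)
            admin.createTopics(Collections.singleton(topic)).all().get()
        } finally {
            admin.close()
        }
    }
}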

 

 

1. Local execution:


Program arguments:

"/sparkStreaming/direct_kafka_10_HA/" "hadoop01:9092,hadoop02:9092,hadoop03:9092" "flume-kafka" 4

 

 

2. Package a jar and run it on the cluster:

spark-submit command:

spark-submit \
--class com.jtv.sparkStreaming_kafka.sparkStreaming_kafka_direct_10_HA \
--master spark://hadoop02:7077,hadoop03:7077 \
--driver-memory 512m \
--total-executor-cores 3 \
--executor-memory 512m \
--supervise \
--jars /home/hadoop/lib/spark-streaming-kafka-0-10_2.11-2.3.2.jar,\
/home/hadoop/lib/kafka-clients-2.1.1.jar,\
/home/hadoop/lib/metrics-core-2.2.0.jar,\
/home/hadoop/lib/spark-streaming_2.11-2.3.2.jar,\
/home/hadoop/lib/zkclient-0.3.jar \
/home/hadoop/localFile/original-SparkCore-1.0-SNAPSHOT.jar \
/sparkStreaming/direct_kafka_10_HA/ \
hadoop01:9092,hadoop02:9092,hadoop03:9092 \
flume-kafka \
4

 

Result:

(screenshot of the streaming word-count output)

 

avro-kafka.conf

# The configuration file needs to define the sources,

# the channels and the sinks.

# Sources, channels and sinks are defined per agent,

# in this case called 'agent'

 

agent2.sources = r2

agent2.channels = c2

agent2.sinks = k2

 

#define sources

agent2.sources.r2.type = avro

agent2.sources.r2.bind = hadoop02

agent2.sources.r2.port = 44444

 

#define channels

agent2.channels.c2.type = memory

agent2.channels.c2.capacity = 1000

agent2.channels.c2.transactionCapacity = 100

 

#define sink

agent2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink

agent2.sinks.k2.brokerList = hadoop01:9092,hadoop02:9092,hadoop03:9092

agent2.sinks.k2.topic = flume-kafka

agent2.sinks.k2.batchSize = 4

agent2.sinks.k2.requiredAcks = 1

 

#bind sources and sink to channel

agent2.sources.r2.channels = c2

agent2.sinks.k2.channel = c2

 

exec-avro.conf

# The configuration file needs to define the sources,

# the channels and the sinks.

# Sources, channels and sinks are defined per agent,

# in this case called 'agent'

 

agent1.sources = r1

agent1.channels = c1

agent1.sinks = k1

 

#define sources

agent1.sources.r1.type = exec

agent1.sources.r1.command = tail -F /home/hadoop/logs/flume.log

 

#define channels

agent1.channels.c1.type = memory

agent1.channels.c1.capacity = 1000

agent1.channels.c1.transactionCapacity = 100

 

#define sink

agent1.sinks.k1.type = avro

agent1.sinks.k1.hostname = hadoop02

agent1.sinks.k1.port = 44444

 

#bind sources and sink to channel

agent1.sources.r1.channels = c1

agent1.sinks.k1.channel = c1

pom.xml (all the jars used for Spark in this project; trim as needed):

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
        
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jtv.spark</groupId>
    <artifactId>SparkCore</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.2</spark.version>
        <hive.version>2.3.3</hive.version>
        <hadoop.version>2.7.6</hadoop.version>
        <mysql.connect>5.1.46</mysql.connect>
        <storm.version>1.2.2</storm.version>
        <streaming.kafka.version>2.3.2</streaming.kafka.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <!-- Spark Streaming and Kafka integration -->
        <!--<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>-->

        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <!--<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>-->

       
<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${streaming.kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.connect}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
       
<dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/junit/junit -->
       
<dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <!--<dependency>-->
            <!--<groupId>org.apache.storm</groupId>-->
            <!--<artifactId>storm-core</artifactId>-->
            <!--<version>${storm.version}</version>-->
        <!--</dependency>-->

        <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
       
<dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        
<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
            <type>pom</type>
        </dependency>

    </dependencies>

    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>