Complete code for integrating Flume, Kafka, and Spark Streaming:
These are my personal study notes; basic concepts are not explained here.
What this post implements:
/home/hadoop/logs/flume.log on node hadoop01 is monitored; whenever content is appended to the file, the new content is sent to port 44444 on hadoop02.
When hadoop02 receives messages on port 44444, it pushes them to the topic flume-kafka on the Kafka cluster.
Spark Streaming then computes a streaming word count.
Note:
Adjust the parameters to your own environment!
Code:
package com.jtv.sparkStreaming_kafka

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Consumes messages from one or more topics in Kafka.
 * <checkPointDir> is the Spark Streaming checkpoint directory.
 * <brokers> is for bootstrapping; the consumer only uses it to fetch metadata.
 * <topics> is a list of one or more Kafka topics to consume from.
 * <batchTime> is the Spark Streaming batch duration in seconds.
 */
object sparkStreaming_kafka_direct_10_HA {

  def main(args: Array[String]) {
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    val ssc = createContext(args)
    // Start the streaming application.
    ssc.start()
    ssc.awaitTermination()
  }

  def createContext(args: Array[String]): StreamingContext = {
    if (args.length != 4) {
      System.err.println("Usage: sparkStreaming_kafka_direct_10_HA <checkPointDir> <brokers> <topics> <batchTime>")
      System.exit(1)
    }
    val Array(checkPointDir, brokers, topics, batchTime) = args

    // Create the streaming environment.
    // Cluster mode:
    //val sparkConf = new SparkConf().setAppName("sparkStreaming_kafka_direct_10_HA")
    // Local mode:
    val sparkConf = new SparkConf().setAppName("sparkStreaming_kafka_direct_10_HA").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(batchTime.toLong))
    ssc.sparkContext.setLogLevel("WARN")

    // Configure the checkpoint directory for the streaming context.
    // This is mandatory because updateStateByKey is a stateful operation.
    ssc.checkpoint(checkPointDir)

    // Get the list of Kafka topics to read from.
    val topicArr = topics.split(",")
    val topicSet = topicArr.toSet

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "group.id" -> "DemoConsumer",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    val locationStrategy = LocationStrategies.PreferConsistent
    val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicArr, kafkaParams)

    // Create a direct Kafka stream with the brokers and topics:
    // receive data from Kafka and generate the corresponding DStream.
    val stream = KafkaUtils.createDirectStream[String, String](ssc, locationStrategy, consumerStrategy)

    //stream.map(record => (record.key, record.value))
    val kafkaStreams: DStream[String] = stream.map(_.value())
    val resultDStream: DStream[(String, Int)] = kafkaStreams
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(updateFunc)
    resultDStream.print()
    ssc
  }

  // Add this batch's counts for a word to its running total.
  def updateFunc(values: Seq[Int], state: Option[Int]): Option[Int] =
    Some(values.sum + state.getOrElse(0))
}
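One caveat on the "HA" in the class name: main above always builds a fresh StreamingContext, so the accumulated word counts are lost if the driver restarts. For actual recovery from the checkpoint directory, StreamingContext.getOrCreate can be used instead; a minimal sketch, reusing the createContext above:

  // Sketch: load the context (including updateStateByKey state and Kafka
  // offsets) from the checkpoint if it exists, otherwise create it fresh.
  def main(args: Array[String]) {
    System.setProperty("HADOOP_USER_NAME", "hadoop")
    val checkPointDir = args(0)
    val ssc = StreamingContext.getOrCreate(checkPointDir, () => createContext(args))
    ssc.start()
    ssc.awaitTermination()
  }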
Execution steps:
1. First, create a file on node hadoop01: /home/hadoop/logs/flume.log
2. Start Spark, Kafka, ZooKeeper, and HDFS (the flume-kafka topic must also exist; see the note right after these steps).
3. On node hadoop02, run (the avro-kafka.conf file is listed below):
flume-ng agent --conf conf --conf-file /home/hadoop/apps/flume/agentConf/avro-kafka.conf --name agent2 -Dflume.root.logger=INFO,console
4. On node hadoop01, run (the exec-avro.conf file is listed below):
flume-ng agent --conf conf --conf-file /home/hadoop/apps/flume/agentConf/exec-avro.conf --name agent1 -Dflume.root.logger=INFO,console
5. On any node of the Kafka cluster (for example, hadoop03), start a Kafka console consumer:
kafka-console-consumer.sh --bootstrap-server hadoop01:9092,hadoop02:9092,hadoop03:9092 --from-beginning --topic flume-kafka
6. Run the program (locally or as a packaged jar; both are described below).
7. The jar approach needs some dependency jars; download them yourself and place them under /home/hadoop/lib/.
8. In the /home/hadoop/logs directory, append test input: echo aa bb c >> flume.log
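Note on step 2: if the brokers do not auto-create topics, create flume-kafka before starting the agents. A sketch, assuming ZooKeeper runs on the same three hosts and a pre-2.2 Kafka (partition and replication counts are arbitrary; adjust to your cluster):
kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic flume-kafka --partitions 3 --replication-factor 2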
1. Run locally:
Program arguments:
"/sparkStreaming/direct_kafka_10_HA/" "hadoop01:9092,hadoop02:9092,hadoop03:9092" "flume-kafka" 4
2. Run the jar on the cluster:
spark-submit command:
spark-submit \
--class com.jtv.sparkStreaming_kafka.sparkStreaming_kafka_direct_10_HA \
--master spark://hadoop02:7077,hadoop03:7077 \
--driver-memory 512m \
--total-executor-cores 3 \
--executor-memory 512m \
--supervise \
--jars /home/hadoop/lib/spark-streaming-kafka-0-10_2.11-2.3.2.jar,\
/home/hadoop/lib/kafka-clients-2.1.1.jar,\
/home/hadoop/lib/metrics-core-2.2.0.jar,\
/home/hadoop/lib/spark-streaming_2.11-2.3.2.jar,\
/home/hadoop/lib/zkclient-0.3.jar \
/home/hadoop/localFile/original-SparkCore-1.0-SNAPSHOT.jar \
/sparkStreaming/direct_kafka_10_HA/ \
hadoop01:9092,hadoop02:9092,hadoop03:9092 \
flume-kafka \
4
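The first argument is an HDFS checkpoint path (which is why the code sets HADOOP_USER_NAME to hadoop). If the directory does not exist yet, it can be created up front; a sketch using the path from the arguments above:
hdfs dfs -mkdir -p /sparkStreaming/direct_kafka_10_HA/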
Result: the console consumer prints the appended log lines, and the Spark Streaming job prints the running word counts.
avro-kafka.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent2'
agent2.sources = r2
agent2.channels = c2
agent2.sinks = k2
#define sources
agent2.sources.r2.type = avro
agent2.sources.r2.bind = hadoop02
agent2.sources.r2.port = 44444
#define channels
agent2.channels.c2.type = memory
agent2.channels.c2.capacity = 1000
agent2.channels.c2.transactionCapacity = 100
#define sink
agent2.sinks.k2.type = org.apache.flume.sink.kafka.KafkaSink
agent2.sinks.k2.brokerList = hadoop01:9092,hadoop02:9092,hadoop03:9092
agent2.sinks.k2.topic = flume-kafka
agent2.sinks.k2.batchSize = 4
agent2.sinks.k2.requiredAcks = 1
#bind sources and sink to channel
agent2.sources.r2.channels = c2
agent2.sinks.k2.channel = c2
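Note that brokerList, batchSize, and requiredAcks are the old (pre-1.7) Kafka sink property names. If you run Flume 1.7 or later, the equivalent settings should be the following (hedged; check the docs for your Flume version):
agent2.sinks.k2.kafka.bootstrap.servers = hadoop01:9092,hadoop02:9092,hadoop03:9092
agent2.sinks.k2.kafka.topic = flume-kafka
agent2.sinks.k2.flumeBatchSize = 4
agent2.sinks.k2.kafka.producer.acks = 1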
exec-avro.conf
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent1'
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1
#define sources
agent1.sources.r1.type = exec
agent1.sources.r1.command = tail -F /home/hadoop/logs/flume.log
#define channels
agent1.channels.c1.type = memory
agent1.channels.c1.capacity = 1000
agent1.channels.c1.transactionCapacity = 100
#define sink
agent1.sinks.k1.type = avro
agent1.sinks.k1.hostname = hadoop02
agent1.sinks.k1.port = 44444
#bind sources and sink to channel
agent1.sources.r1.channels = c1
agent1.sinks.k1.channel = c1
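One caveat: the exec source with tail -F keeps no record of how far it has read, so data can be lost or re-sent if agent1 restarts. On Flume 1.7+ the TAILDIR source tracks file positions; a sketch of the equivalent source definition (the positionFile path is an assumption):
agent1.sources.r1.type = TAILDIR
agent1.sources.r1.filegroups = f1
agent1.sources.r1.filegroups.f1 = /home/hadoop/logs/flume.log
agent1.sources.r1.positionFile = /home/hadoop/apps/flume/taildir_position.json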
pom.xml (all the jars used for Spark here; trim the list as you see fit):
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.jtv.spark</groupId>
    <artifactId>SparkCore</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <spark.version>2.3.2</spark.version>
        <hive.version>2.3.3</hive.version>
        <hadoop.version>2.7.6</hadoop.version>
        <mysql.connect>5.1.46</mysql.connect>
        <storm.version>1.2.2</storm.version>
        <streaming.kafka.version>2.3.2</streaming.kafka.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Spark Streaming and Kafka integration -->
        <!--<dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka_2.11</artifactId>
            <version>1.6.3</version>
        </dependency>-->
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
        <!--<dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>-->
        <!-- The code uses the org.apache.spark.streaming.kafka010 API,
             so the 0-10 (not 0-8) integration artifact is required. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${streaming.kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-flume_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.connect}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-jdbc</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.storm/storm-core -->
        <!--<dependency>-->
            <!--<groupId>org.apache.storm</groupId>-->
            <!--<artifactId>storm-core</artifactId>-->
            <!--<version>${storm.version}</version>-->
        <!--</dependency>-->
        <!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.3</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.10</version>
            <type>pom</type>
        </dependency>
    </dependencies>
    <build>
        <pluginManagement>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                </plugin>
            </plugins>
        </pluginManagement>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>scala-test-compile</id>
                        <phase>process-test-resources</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <executions>
                    <execution>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>