Scala Flink 1.7 template: consume Kafka 2.x and store the data in HDFS
demo1 reads data from Kafka, performs a word count, and writes the results to HDFS.
demo2 writes data into the Dgraph graph database (both demos are implemented entirely in Scala).
Kafka, Flink, and other related dependencies, plus the Maven packaging configuration:
<dependencies>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-sdks-java-io-kafka</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.1.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-core-java</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.beam</groupId>
        <artifactId>beam-runners-flink_2.11</artifactId>
        <version>2.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime_2.11</artifactId>
        <version>1.5.2</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.5.2</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <!-- Scala DataStream API, required for the org.apache.flink.api.scala._ and
         org.apache.flink.streaming.api.scala imports used in the code below -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.5.2</version>
    </dependency>
    <!-- provides BucketingSink and StringWriter used to write to HDFS below -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.11</artifactId>
        <version>1.5.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-metrics-core</artifactId>
        <version>1.5.2</version>
        <!--<scope>provided</scope>-->
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
        <version>1.5.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.spotify/scio-core -->
    <dependency>
        <groupId>com.spotify</groupId>
        <artifactId>scio-core_2.11</artifactId>
        <version>0.6.1</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>6.5.4</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
        <version>6.5.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-rabbitmq_2.11</artifactId>
        <version>1.7.1</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.0.0</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <exclude>com.google.code.findbugs:jsr305</exclude>
                                <exclude>org.slf4j:*</exclude>
                                <exclude>log4j:*</exclude>
                            </excludes>
                        </artifactSet>
                        <transformers>
                            <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                <!-- the entry-point (main) class of your program -->
                                <mainClass>my.programs.main.clazz</mainClass>
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- compiles the Scala sources; maven-compiler-plugin alone only handles Java -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
    </plugins>
</build>
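With this shade configuration, `mvn clean package` produces a fat jar whose manifest Main-Class is taken from `<mainClass>`. Replace the `my.programs.main.clazz` placeholder with the fully qualified name of the `FlinkDemo` object below (including its package) before submitting the jar to a cluster with `flink run`.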
Here is the code:
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object FlinkDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.createLocalEnvironment()

    val properties = new Properties()
    // Kafka broker address (older Kafka versions were configured with the ZooKeeper address instead)
    properties.setProperty("bootstrap.servers", "192.168.5.166:9092")
    properties.setProperty("group.id", "test")
    // note: fs.default-scheme is a Flink configuration key, not a Kafka consumer property;
    // the sink below uses a full hdfs:// URI anyway
    properties.setProperty("fs.default-scheme", "hdfs://192.168.5.166:8020")

    // Kafka source: consumer topic and deserialization schema.
    // IMPORTANT: the processing below requires `import org.apache.flink.api.scala._`,
    // which provides the implicit TypeInformation conversions; without it the
    // DataStream operations will not compile.
    val text = env.addSource(new FlinkKafkaConsumer010[String]("youkia", new SimpleStringSchema(), properties))

    // HDFS sink (see the checkpointing note below this example)
    val sink = new BucketingSink[(String, Int)]("hdfs://192.168.5.166:9000/output")
    // writer, batch size and rollover interval for the bucket files
    sink.setWriter(new StringWriter()).setBatchSize(20).setBatchRolloverInterval(2000)

    // Word count over fixed 5-second windows, printed and written to HDFS.
    // Other window types work just as well -- see Flink's windowing concepts
    // and the sliding-window sketch below this example.
    val counts = text.flatMap(_.split(" ")).map((_, 1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1)
    counts.print()
    counts.addSink(sink)

    env.execute("WDCount Test")
  }
}
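One caveat the example does not show: BucketingSink only finalizes its part files when a checkpoint completes, so with checkpointing disabled the output files stay in the pending/in-progress state. A minimal sketch of enabling periodic checkpoints on the environment above (the 10-second interval is an arbitrary example value):

// enable periodic checkpointing (interval in ms) so BucketingSink can finalize its part files
env.enableCheckpointing(10000)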
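As the window comment above mentions, any other window type can be dropped in. A sketch of the same count over a sliding window instead of the fixed 5-second window, reusing the `text` stream from the example (the window size and slide are arbitrary example values):

// word count over a 10-second window that slides every 5 seconds
val slidingCounts = text
  .flatMap(_.split(" "))
  .map((_, 1))
  .keyBy(0)
  .timeWindow(Time.seconds(10), Time.seconds(5))
  .sum(1)
slidingCounts.print()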
When the job runs, the windowed word counts are printed to the console and written to the HDFS output path.
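If you are actually on Flink 1.7, as the title says, rather than the 1.5.2 used in the pom above, note that since Flink 1.6 BucketingSink has a successor, StreamingFileSink; like BucketingSink it only commits files when checkpoints complete, and on HDFS it requires Hadoop 2.7 or later. A minimal sketch of the equivalent row-format sink, using the same example path as above:

import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink

// row-format sink that writes each (word, count) tuple as a line of text;
// part files are committed when checkpoints complete
val fileSink = StreamingFileSink
  .forRowFormat(new Path("hdfs://192.168.5.166:9000/output"),
                new SimpleStringEncoder[(String, Int)]("UTF-8"))
  .build()
counts.addSink(fileSink)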
**For demo2, see the next post.**