Scala Flink 1.7 template: consuming Kafka 2.x and writing the data to HDFS

Demo 1 reads data from Kafka, runs a word count, and writes the results to HDFS.
Demo 2 writes the data into the Dgraph graph database (both demos are implemented entirely in Scala).

Kafka, Flink, and related dependencies, plus the Maven packaging setup:
<dependencies>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-kafka</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-core-java</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-runners-flink_2.11</artifactId>
            <version>2.4.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime_2.11</artifactId>
            <version>1.5.2</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.5.2</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-metrics-core</artifactId>
            <version>1.5.2</version>
            <!--<scope>provided</scope>-->
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.5.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.spotify/scio-core -->
        <dependency>
            <groupId>com.spotify</groupId>
            <artifactId>scio-core_2.11</artifactId>
            <version>0.6.1</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>6.5.4</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>transport</artifactId>
            <version>6.5.4</version>
        </dependency>

        <!-- RabbitMQ connector (not used by the demo below); only the Scala 2.11 build is kept,
             since mixing it with a _2.10 artifact causes Scala version conflicts -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-rabbitmq_2.11</artifactId>
            <version>1.7.1</version>
        </dependency>

</dependencies>
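
Note that the list above mixes Flink 1.5.2 artifacts with a 1.7.1 connector and a Kafka 0.10 client, while the title targets Flink 1.7 and Kafka 2.x. One way to keep things consistent, sketched below with assumed version numbers, is to centralize the versions in Maven properties and use Flink 1.7's universal Kafka connector, which works with Kafka 0.11+/2.x brokers:

<properties>
    <flink.version>1.7.1</flink.version>
    <scala.binary.version>2.11</scala.binary.version>
</properties>

<!-- then reference the properties from the dependencies, e.g. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <!-- universal Kafka connector, available since Flink 1.7 -->
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>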

<build>
<plugins>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <artifactSet>
                        <excludes>
                            <exclude>com.google.code.findbugs:jsr305</exclude>
                            <exclude>org.slf4j:*</exclude>
                            <exclude>log4j:*</exclude>
                        </excludes>
                    </artifactSet>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <!-- Program entry class -->
                            <mainClass>my.programs.main.clazz</mainClass>
                        </transformer>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
            <!-- Flink requires Java 8 -->
            <source>1.8</source>
            <target>1.8</target>
        </configuration>
    </plugin>
</plugins>
</build>
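
The mainClass above (my.programs.main.clazz) is a placeholder; replace it with the fully qualified name of the actual entry class, which for the demo below is the FlinkDemo object. Packaging and submitting the fat jar then looks roughly like this (the jar name is just an example and depends on your artifactId and version):

mvn clean package
flink run -c FlinkDemo target/flink-kafka-demo-1.0-SNAPSHOT.jar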

On to the code:

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.fs.StringWriter
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object FlinkDemo {
  def main(args: Array[String]): Unit = {
    // Local execution environment; use StreamExecutionEnvironment.getExecutionEnvironment
    // instead when submitting the job to a cluster
    val env = StreamExecutionEnvironment.createLocalEnvironment()

    val properties = new Properties()
    // Kafka broker address (older Kafka versions were configured with the ZooKeeper address instead)
    properties.setProperty("bootstrap.servers", "192.168.5.166:9092")
    properties.setProperty("group.id", "test")
    properties.setProperty("fs.default-scheme", "hdfs://192.168.5.166:8020")


    // Add the Kafka consumer source with the topic and the deserialization schema.
    // NOTE: the processing below requires `import org.apache.flink.api.scala._`,
    // which provides the implicit conversions needed to operate on the DataStream.
    val text = env.addSource(new FlinkKafkaConsumer010[String]("youkia", new SimpleStringSchema(), properties))

    // HDFS sink: BucketingSink writes records into time-based bucket directories
    val sink = new BucketingSink[(String, Int)]("hdfs://192.168.5.166:9000/output")
    // StringWriter serializes each record via toString; roll part files after 20 bytes or 2000 ms
    sink.setWriter(new StringWriter()).setBatchSize(20).setBatchRolloverInterval(2000)

    // Word count over a tumbling 5-second window, printed to stdout and written to HDFS.
    // Other window types work as well (see the Flink windowing docs and the sliding-window sketch after this code).
    val counts = text.flatMap(_.split(" ")).map((_, 1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1)

    counts.print()

    counts.addSink(sink)

    env.execute("WDCount Test")
  }
}
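
As mentioned in the window comment, the tumbling 5-second window is only one option. A minimal sliding-window sketch, assuming the same text stream and sink from the template above:

    // 10-second windows that slide every 5 seconds, so each element
    // is counted in two overlapping windows instead of exactly one
    val slidingCounts = text
      .flatMap(_.split(" "))
      .map((_, 1))
      .keyBy(0)
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)

    slidingCounts.print()
    slidingCounts.addSink(sink)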

The result: the word counts are printed to the console and the part files land under hdfs://192.168.5.166:9000/output.
**Demo 2 is covered in the next post.**