走进大数据丨 Flink第一个自定义程序wordcount

创建 Maven 工程,并在 pom.xml 中添加如下依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.test.yibo</groupId>
    <artifactId>FlinkTest0919</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Compile for Java 8, which Flink 1.7 targets. -->
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <!-- Scala version used by the Flink _2.11 artifacts below; keep these two in sync. -->
        <scala.version>2.11.8</scala.version>
        <scala.binary.version>2.11</scala.binary.version>
        <hadoop.version>2.7.1</hadoop.version>
        <flink.version>1.7.0</flink.version>
    </properties>

    <dependencies>
        <!-- Scala runtime, required by the Scala-suffixed Flink artifacts. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <!-- Flink Java batch (DataSet) API. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Java streaming (DataStream) API — used by the WordCount example. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Scala batch API. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Scala streaming API. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Flink Table API (SQL-like relational API). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Client libraries needed to submit/run jobs from the IDE. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Kafka 0.10 connector (not used by this example, kept for later lessons). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Hadoop client, for HDFS access. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- MySQL JDBC driver. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
        <!-- JSON library. NOTE: versions before 1.2.83 have known deserialization
             RCE vulnerabilities (e.g. CVE-2022-25845); 1.2.83 is the patched 1.2.x release. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>

</project>

然后创建 Java 类,类名为 FlinkProject(实现 WordCount 单词计数功能)

package com.itstar.Flink;

public class FlinkProject {

//java程序的主要入口

public static void main(String [] args) throws Exception{

//设置本地开发环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 去连接远程 的服务          服务器地址 端口  自定义的

DataStream<String> text = env.socketTextStream("169.254.43.100",5353);

//flatMap就是算子

text.flatMap(new FlatMapFunction<String, String>() {

 

public void flatMap(String arg0, Collector<String> arg1) throws Exception {

// TODO Auto-generated method stub

String [] splited = arg0.split(" ");

for (String string : splited) {

arg1.collect(string);

}

}

}).map(new MapFunction<String, Tuple2<String, Integer>>() {

 

public Tuple2<String, Integer> map(String arg0) throws Exception {

// TODO Auto-generated method stub

return new Tuple2<String, Integer>(arg0,1);

}

//类似于合并

}).keyBy(0).timeWindow(Time.seconds(2),Time.seconds(1)).sum(1).print().setParallelism(1);

//因为是懒加载,所以得调用

env.execute();

}

}

走进大数据丨 Flink第一个自定义程序wordcount