走进大数据丨 Flink第一个自定义程序wordcount
创建Maven工程,并在pom.xml中添加如下依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.test.yibo</groupId>
<artifactId>FlinkTest0919</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Compile for Java 8 (source and target bytecode level) -->
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<encoding>UTF-8</encoding>
<!-- Scala version; scala.binary.version (2.11) is the suffix used by the Flink artifact ids below -->
<scala.version>2.11.8</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<hadoop.version>2.7.1</hadoop.version>
<!-- All Flink artifacts below share this version -->
<flink.version>1.7.0</flink.version>
</properties>
<dependencies>
<!-- Scala runtime library, required by the Scala-suffixed Flink artifacts -->
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<!-- Flink Java DataSet API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Java DataStream API (used by the word-count example below) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Scala DataSet API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Scala DataStream API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Table API / SQL -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Needed to submit/execute jobs, including local execution from the IDE -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Kafka 0.10 source/sink connector (not used by this example, kept for later lessons) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- HDFS client, for reading/writing Hadoop file systems -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- JDBC driver for MySQL sinks/sources -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>
<!-- JSON parsing. NOTE(review): fastjson 1.2.22 is very old and has known
     deserialization vulnerabilities in this version range — consider upgrading
     before using it on untrusted input. -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.22</version>
</dependency>
</dependencies>
</project>
然后创建java类,类名为FlinkProject(实现WordCount单词计数功能)
package com.itstar.Flink;
public class FlinkProject {
//java程序的主要入口
public static void main(String [] args) throws Exception{
//设置本地开发环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 去连接远程 的服务 服务器地址 端口 自定义的
DataStream<String> text = env.socketTextStream("169.254.43.100",5353);
//flatMap就是算子
text.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String arg0, Collector<String> arg1) throws Exception {
// TODO Auto-generated method stub
String [] splited = arg0.split(" ");
for (String string : splited) {
arg1.collect(string);
}
}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
public Tuple2<String, Integer> map(String arg0) throws Exception {
// TODO Auto-generated method stub
return new Tuple2<String, Integer>(arg0,1);
}
//类似于合并
}).keyBy(0).timeWindow(Time.seconds(2),Time.seconds(1)).sum(1).print().setParallelism(1);
//因为是懒加载,所以得调用
env.execute();
}
}