实时计算组件-flink本地开发环境搭建
本篇记录搭建flink本地开发环境的过程,使用的flink版本为1.5。
00 下载安装、启动
一、JDK版本要求
想要运行flink,JDK版本需要是JDK8+,如果版本低于8,需要进行升级噢。
二、下载安装、启动
从下载页面下载一个二进制文件,根据自己的实际情况,选择Hadoop/Scala组合版本。如果是使用本地文件系统的话,选择任何Hadoop版本都可以。
1.点击flink版本查看flink版本
2.点击flink下载地址进行下载
3.进入到下载目录
cd ~/app
4.解压文件
tar -xzvf flink-1.5.4-bin-hadoop28-scala_2.11.tgz
5.进入安装目录
cd flink-1.5.4
6.启动本地flink集群
./bin/start-cluster.sh
7.查看日志文件
8.在浏览器中输入http://localhost:8081,访问flink web ui
以上说明,flink启动成功。
01 代码示例
1.添加依赖
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.5.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.11</artifactId>
<version>1.5.4</version>
</dependency>
</dependencies>
2.关键代码
/**
* @author ccc
*/
public class SocketWindowWordCount {
public static void main(String[] args) throws Exception {
Host host = new Host();
ParameterTool params;
try {
params = ParameterTool.fromArgs(args);
host.setHostname(params.has("hostname") ? params.get("hostname") : "localhost");
host.setPort(params.getInt("port"));
} catch (Exception e) {
System.err.println("未指定端口." +
"请执行'SocketWindowWordCount --hostname <hostname> --port <port>'," +
"主机名(默认localhost), " +
"端口是文本服务器的地址");
System.err.println("请执行'netcat -l <port>'命令, 启动一个简单的文本服务器, 然后在命令行输入文本");
return;
}
// 准备运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取输入的文本
DataStream<String> text = env.socketTextStream(host.getHostname(), host.getPort(), "\n");
// 执行解析数据, 分组, 窗口化, 聚合操作
// 每5秒打印一次单词出现的次数
DataStream<WordWithCount> windowCounts = text
.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) {
Arrays.asList(value.split("\\s")).forEach(word -> out.collect(new WordWithCount(word, 1L)));
}
})
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (a, b) -> new WordWithCount(a.getWord(), a.getCount() + b.getCount()));
windowCounts.print().setParallelism(1);
env.execute("Socket Window WordCount");
}
}
/**
* 主机
*
* @author ccc
*/
public class Host {
// 主机名
private String hostname;
// 端口号
private int port;
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
@Override
public String toString() {
return hostname + " : " + port;
}
}
/**
* @author ccc
*/
public class WordWithCount {
// 单词
private String word;
// 单词数
private long count;
public WordWithCount() {}
public WordWithCount(String word, long count) {
this.word = word;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public long getCount() {
return count;
}
public void setCount(long count) {
this.count = count;
}
@Override
public String toString() {
return word + " : " + count;
}
}
02 运行flink程序
1.使用netcat命令, 启动服务器
nc -l 8999
2.使用maven打包应用程序,提交flink作业
./bin/flink run -c com.ccc.flink.main.SocketWindowWordCount /Users/ccc/Documents/app/flink-1.0-SNAPSHOT.jar --port 8999
3.观察flink web ui和日志输出文件
可以看到flink job正在执行, 这个时候flink程序已经连接上服务器, 等待输入文本。
4.输入文本
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
a
b
b
b
erji
二级
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
4.再次观察flink web ui和日志输出文件
可以看到每次窗口操作结束的时候,会打印结果
5.停止flink集群
./bin/stop-cluster.sh
更多内容请关注微信公众号: “大数据开发与学习茶馆”