实时计算组件-flink本地开发环境搭建

本篇记录搭建flink本地开发环境的过程,使用的flink版本为1.5。

00 下载安装、启动

一、JDK版本要求

想要运行flink,JDK版本需要是JDK8+,如果版本低于8,需要进行升级噢。
实时计算组件-flink本地开发环境搭建

二、下载安装、启动

从下载页面下载一个二进制文件,根据自己的实际情况,选择Hadoop/Scala组合版本。如果是使用本地文件系统的话,选择任何Hadoop版本都可以。
1.点击flink版本查看flink版本
实时计算组件-flink本地开发环境搭建
2.点击flink下载地址进行下载
实时计算组件-flink本地开发环境搭建
3.进入到下载目录

cd ~/app

4.解压文件

tar -xzvf flink-1.5.4-bin-hadoop28-scala_2.11.tgz

实时计算组件-flink本地开发环境搭建
5.进入安装目录

cd flink-1.5.4

实时计算组件-flink本地开发环境搭建
6.启动本地flink集群
实时计算组件-flink本地开发环境搭建

./bin/start-cluster.sh

实时计算组件-flink本地开发环境搭建
7.查看日志文件
实时计算组件-flink本地开发环境搭建
8.在浏览器中输入http://localhost:8081,访问flink web ui
实时计算组件-flink本地开发环境搭建
以上说明,flink启动成功。

01 代码示例

1.添加依赖

<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_2.11</artifactId>
		<version>1.5.4</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-scala_2.11</artifactId>
		<version>1.5.4</version>
	</dependency>
</dependencies>

2.关键代码

/**
 * @author ccc
 */
public class SocketWindowWordCount {

    public static void main(String[] args) throws Exception {
        Host host = new Host();
        ParameterTool params;
        try {
            params = ParameterTool.fromArgs(args);
            host.setHostname(params.has("hostname") ? params.get("hostname") : "localhost");
            host.setPort(params.getInt("port"));
        } catch (Exception e) {
            System.err.println("未指定端口." +
                    "请执行'SocketWindowWordCount --hostname <hostname> --port <port>'," +
                    "主机名(默认localhost), " +
                    "端口是文本服务器的地址");
            System.err.println("请执行'netcat -l <port>'命令, 启动一个简单的文本服务器, 然后在命令行输入文本");
            return;
        }

        // 准备运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 获取输入的文本
        DataStream<String> text = env.socketTextStream(host.getHostname(), host.getPort(), "\n");

        // 执行解析数据, 分组, 窗口化, 聚合操作
        // 每5秒打印一次单词出现的次数
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        Arrays.asList(value.split("\\s")).forEach(word -> out.collect(new WordWithCount(word, 1L)));
                    }
                })
                .keyBy("word")
                .timeWindow(Time.seconds(5))
                .reduce((ReduceFunction<WordWithCount>) (a, b) -> new WordWithCount(a.getWord(), a.getCount() + b.getCount()));
        windowCounts.print().setParallelism(1);
        env.execute("Socket Window WordCount");
    }

}
/**
 * 主机
 *
 * @author ccc
 */
public class Host {

    // 主机名
    private String hostname;

    // 端口号
    private int port;

    public String getHostname() {
        return hostname;
    }

    public void setHostname(String hostname) {
        this.hostname = hostname;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    @Override
    public String toString() {
        return hostname + " : " + port;
    }

}
/**
 * @author ccc
 */
public class WordWithCount {

    // 单词
    private String word;

    // 单词数
    private long count;

    public WordWithCount() {}

    public WordWithCount(String word, long count) {
        this.word = word;
        this.count = count;
    }

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public long getCount() {
        return count;
    }

    public void setCount(long count) {
        this.count = count;
    }

    @Override
    public String toString() {
        return word + " : " + count;
    }

}

02 运行flink程序

1.使用netcat命令, 启动服务器

nc -l 8999

2.使用maven打包应用程序,提交flink作业

./bin/flink run -c com.ccc.flink.main.SocketWindowWordCount /Users/ccc/Documents/app/flink-1.0-SNAPSHOT.jar --port 8999

实时计算组件-flink本地开发环境搭建
3.观察flink web ui和日志输出文件
可以看到flink job正在执行, 这个时候flink程序已经连接上服务器, 等待输入文本。
实时计算组件-flink本地开发环境搭建
4.输入文本
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
hello world
a
b
b

b
erji
二级
英语
英语

英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语
英语

4.再次观察flink web ui和日志输出文件
实时计算组件-flink本地开发环境搭建
实时计算组件-flink本地开发环境搭建
可以看到每次窗口操作结束的时候,会打印结果
5.停止flink集群

./bin/stop-cluster.sh

实时计算组件-flink本地开发环境搭建

更多内容请关注微信公众号: “大数据开发与学习茶馆”
实时计算组件-flink本地开发环境搭建