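1. Download the Flink distribution that matches your environment from https://flink.apache.org/downloads.html and extract it. Each Flink binary release is built against a specific Hadoop version and Scala version. This matters later when setting up YARN mode (the Hadoop installation here is version 2.7.4). I chose the latest Flink release built for Hadoop 2.7 and Scala 2.11. Extract it after downloading: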

cd /home/yitian/flink
tar -zxvf flink-1.7.2-bin-hadoop27-scala_2.11.tgz 

2. Once extracted, Flink can run in local mode right away. This mode is simple and a convenient way to learn Flink's basic usage. Start local mode with:

yitian@flink:~/flink/flink-1.7.2/bin$ ./start-cluster.sh 
Starting cluster.
Starting standalonesession daemon on host flink.
Starting taskexecutor daemon on host flink.

3. After startup, use the jps command to list the running processes:

yitian@flink:~$ jps
8035 TaskManagerRunner
7591 StandaloneSessionClusterEntrypoint
8378 Jps

4. Open the Web UI to verify that Flink started successfully; its default port is 8081. (Heads-up: this port clashes with the port Aurora uses.)
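Because 8081 can clash with another service (Aurora in my case), it may be worth checking whether the port is already taken before running start-cluster.sh. Below is a minimal JDK-only sketch of such a check. It is not part of Flink; the class name PortCheck and the 500 ms timeout are arbitrary choices for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    // Returns true if something accepts a TCP connection on host:port within timeoutMs.
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: treat the port as not in use.
            return false;
        }
    }

    public static void main(String[] args) {
        // 8081 is the default port of the Flink Web UI; another port can be passed as args[0].
        int port = args.length > 0 ? Integer.parseInt(args[0]) : 8081;
        if (isListening("localhost", port, 500)) {
            System.out.println("Port " + port + " is already in use (Flink itself, or another service such as Aurora).");
        } else {
            System.out.println("Port " + port + " is free.");
        }
    }
}
```

If the port is taken, the Web UI port should be configurable through `rest.port` in `conf/flink-conf.yaml`; treat this as a pointer and confirm it against the configuration docs for your Flink version.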




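Flink provides an official project called quickstart that serves as a template for its batch and streaming APIs (the BatchJob and StreamingJob classes). It can be downloaded with the command below. The result is a Maven project that can be imported directly into IntelliJ IDEA.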

yitian@flink:~/flink-idea$ curl https://flink.apache.org/q/quickstart.sh | bash
[INFO] Project created from Archetype in dir: /home/yitian/flink-idea/quickstart
[INFO] ------------------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 13:25 min
[INFO] Finished at: 2019-03-13T10:11:30-07:00
[INFO] Final Memory: 16M/151M
[INFO] ------------------------------------------------------------------------

	A sample quickstart Flink Job has been created.
	Switch into the directory using
		 cd quickstart
	Import the project there using your favorite IDE (Import it as a maven project)
	Build a jar inside the directory using
		 mvn clean package
	You will find the runnable jar in quickstart/target
	Consult our website if you have any troubles: http://flink.apache.org/community.html#mailing-lists


yitian@flink:~/flink-idea/quickstart$ tree .
├── pom.xml
└── src
    └── main
        ├── java
        │   └── org
        │       └── myorg
        │           └── quickstart
        │               ├── BatchJob.java
        │               └── StreamingJob.java
        └── resources
            └── log4j.properties

7 directories, 4 files

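Note that in Flink 1.7.2 (the latest release as of 2019-03-13), the quickstart project contains only these two template files. Unlike the quickstart in version 1.3, it no longer ships WordCount or other example programs. The example programs now live in the flink-examples directory of the Flink GitHub repository; copy the ones you want to explore into the quickstart project to use them.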






Next, walk through Flink's WordCount example to learn more about the DataStream API:

1. Create the execution environment with StreamExecutionEnvironment:

// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

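 2. Data source. If an input file is given in the program arguments (--input), that file is used as the input. Otherwise WordCount falls back to its built-in WordCountData, a block of text stored in a String array: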

public class WordCountData {

	public static final String[] WORDS = new String[] {
		"To be, or not to be,--that is the question:--",
		"Whether 'tis nobler in the mind to suffer",
		"The slings and arrows of outrageous fortune",
		"Or to take arms against a sea of troubles,",
		"And by opposing end them?--To die,--to sleep,--",
		"No more; and by a sleep to say we end",
		"The heartache, and the thousand natural shocks",
		"That flesh is heir to,--'tis a consummation",
		"Devoutly to be wish'd. To die,--to sleep;--",
		"To sleep! perchance to dream:--ay, there's the rub;",
		"For in that sleep of death what dreams may come,",
		"When we have shuffled off this mortal coil,",
		"Must give us pause: there's the respect",
		"That makes calamity of so long life;",
		"For who would bear the whips and scorns of time,",
		"The oppressor's wrong, the proud man's contumely,",
		"The pangs of despis'd love, the law's delay,",
		"The insolence of office, and the spurns",
		// ... (remaining lines omitted)
	};
}
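The input choice described in step 2 can be sketched in plain Java as follows. This only illustrates the control flow: parseArgs is a hypothetical stand-in for the parameter parsing the example performs on its params object (it handles only `--key value` pairs), and selectInput returns a list of lines rather than a Flink DataStream. The class and method names are made up for this sketch.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InputSelection {

    // Hypothetical, simplified argument parser: collects "--key value" pairs into a map.
    public static Map<String, String> parseArgs(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (int i = 0; i < args.length; i++) {
            if (args[i].startsWith("--")) {
                String key = args[i].substring(2);
                // In this sketch, a flag with no following value maps to the empty string.
                boolean hasValue = i + 1 < args.length && !args[i + 1].startsWith("--");
                params.put(key, hasValue ? args[++i] : "");
            }
        }
        return params;
    }

    // Mirrors the WordCount choice: read the --input file if given, else use the built-in lines.
    public static List<String> selectInput(Map<String, String> params, String[] defaultLines)
            throws IOException {
        if (params.containsKey("input")) {
            return Files.readAllLines(Paths.get(params.get("input")));
        }
        return Arrays.asList(defaultLines);
    }

    public static void main(String[] args) throws IOException {
        String[] defaults = {"To be, or not to be,--that is the question:--"};
        List<String> lines = selectInput(parseArgs(args), defaults);
        lines.forEach(System.out::println);
    }
}
```

Running it with no arguments prints the built-in line; running it with `--input somefile.txt` prints that file's lines instead.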

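 3. Processing logic. WordCount does two things: it splits text into words and counts how often each word occurs. In this Flink WordCount, a class implementing the FlatMapFunction interface supplies the flatMap logic: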

public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
	@Override
	public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
		// normalize and split the line
		String[] tokens = value.toLowerCase().split("\\W+");
		// emit the pairs
		for (String token : tokens) {
			if (token.length() > 0) {
				out.collect(new Tuple2<>(token, 1));
			}
		}
	}
}

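For each input sentence, this method lowercases it, splits it into words, and emits one initial key-value pair (word, 1) per word. The pairs are emitted through the Collector as Tuple2 records.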
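To see what Tokenizer emits without starting a cluster, the same splitting steps can be run in plain Java. This is a sketch, not Flink code: a `Map.Entry` stands in for `Tuple2`, and a returned `List` stands in for the `Collector`. The class name TokenizerSketch is hypothetical.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class TokenizerSketch {

    // Same steps as Tokenizer.flatMap: lowercase, split on runs of non-word characters,
    // drop empty tokens, and produce one (word, 1) pair per remaining token.
    public static List<Map.Entry<String, Integer>> tokenize(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : line.toLowerCase().split("\\W+")) {
            // A leading delimiter (e.g. the apostrophe in "'tis") yields an empty first token.
            if (token.length() > 0) {
                out.add(new SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(tokenize("To be, or not to be,--that is the question:--"));
        // prints [to=1, be=1, or=1, not=1, to=1, be=1, that=1, is=1, the=1, question=1]
    }
}
```

The length check matters: without it, lines starting with punctuation would produce a ("", 1) pair.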


DataStream<Tuple2<String, Integer>> counts =
	// split up the lines in pairs (2-tuples) containing: (word,1)
	text.flatMap(new Tokenizer())
	// group by the tuple field "0" and sum up tuple field "1"
	.keyBy(0).sum(1);

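 After flatMap, every record has the form (field0, field1). keyBy(0) groups the records by field0 (the word), and sum(1) adds up field1 within each group. The result is again a DataStream. Because the input is a stream, sum(1) is a rolling aggregation: every incoming record produces an updated (word, count) for its word rather than a single final total.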
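The rolling behavior of keyBy(0).sum(1) can be simulated in plain Java to make the output concrete. This only mirrors the observable result for one sequential input; it is not how Flink runs the operator (Flink keeps per-key state itself and processes keys in parallel). Here a HashMap plays the role of the per-key state, and RollingSumSketch is a hypothetical name.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RollingSumSketch {

    // Simulates keyBy(0).sum(1): keep a running total of field 1 per key and
    // emit the updated (word, total) for every incoming element.
    public static List<Map.Entry<String, Integer>> rollingSum(List<Map.Entry<String, Integer>> input) {
        Map<String, Integer> state = new HashMap<>(); // one running total per key
        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (Map.Entry<String, Integer> e : input) {
            int total = state.merge(e.getKey(), e.getValue(), Integer::sum);
            emitted.add(new SimpleEntry<>(e.getKey(), total));
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pairs = Arrays.<Map.Entry<String, Integer>>asList(
                new SimpleEntry<>("to", 1), new SimpleEntry<>("be", 1), new SimpleEntry<>("to", 1));
        System.out.println(rollingSum(pairs)); // prints [to=1, be=1, to=2]
    }
}
```

This is why the printed WordCount output shows the same word several times with increasing counts instead of one line per word.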


// emit result
if (params.has("output")) {
	counts.writeAsText(params.get("output"));
} else {
	System.out.println("Printing result to stdout. Use --output to specify output path.");
	counts.print();
}


// execute program
env.execute("Streaming WordCount");




yitian@flink:~/flink/flink-1.7.2/bin$ ./stop-cluster.sh 
Stopping taskexecutor daemon (pid: 8035) on host flink.
Stopping standalonesession daemon (pid: 7591) on host flink.