Flink快速入门

第1章针对Flink的基本原理、架构和组件进行了分析,本章开始快速实现一个Flink的入门案例,这样可以加深对之前内容的理解。

2.1 Flink开发环境分析

2.1.1 开发工具推荐

在实战之前,需要先说明一下开发工具的问题。官方建议使用IntelliJ IDEA,因为它默认集成了Scala和Maven环境,使用更加方便,当然使用Eclipse也是可以的。

开发Flink程序时,可以使用Java或者Scala语言,个人建议使用Scala,因为使用Scala实现函数式编程会比较简洁。当然使用Java也可以,只不过实现起来代码逻辑比较笨重罢了。

在开发Flink程序的时候,建议使用Maven管理依赖。针对Maven仓库,建议使用国内镜像仓库地址,因为国外仓库下载较慢,可以使用国内阿里云的Maven仓库。

注意:如果发现依赖国内源无法下载的时候,记得切换回国外源。利用国内阿里云Maven仓库镜像进行相关配置时,需要修改$Maven_HOME/conf/settings.xml文件。

  1. <mirror>
  2. <id>aliMaven</id>
  3. <name>aliyun Maven</name>
  4. <url>http://Maven.aliyun.com/nexus/content/groups/public/</url>
  5. <mirrorOf>central</mirrorOf>
  6. </mirror>

2.1.2 Flink程序依赖配置

在使用Maven管理Flink程序相关依赖的时候,需要提前将它们配置好。对应的Maven项目创建完成以后,也需要在这个项目的pom.xml文件中进行相关配置。

使用Java语言开发Flink程序的时候需要添加以下配置。

注意:在这里使用的Flink版本是1.6.1。如果使用的是其他版本,需要到Maven仓库中查找对应版本的Maven配置。

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-java</artifactId>
  4. <version>1.6.1</version>
  5. <scope>provided</scope>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-streaming-java_2.11</artifactId>
  10. <version>1.6.1</version>
  11. <scope>provided</scope>
  12. </dependency>

使用Scala语言开发Flink程序的时候需要添加下面的配置。

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-scala_2.11</artifactId>
  4. <version>1.6.1</version>
  5. <scope>provided</scope>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-streaming-scala_2.11</artifactId>
  10. <version>1.6.1</version>
  11. <scope>provided</scope>
  12. </dependency>

注意:在IDEA等开发工具中运行代码的时候,需要把依赖配置中的scope属性注释掉。在编译打JAR包的时候,需要开启scope属性,这样最终的JAR包就不会把这些依赖包也包含进去,因为集群中本身是有Flink的相关依赖的。

2.2 Flink程序开发步骤

开发Flink程序有固定的流程。

(1)获得一个执行环境。

(2)加载/创建初始化数据。

(3)指定操作数据的Transaction算子。

(4)指定计算好的数据的存放位置。

(5)调用execute()触发执行程序。

注意:Flink程序是延迟计算的,只有最后调用execute()方法的时候才会真正触发执行程序。

延迟计算的好处:你可以开发复杂的程序,Flink会将这个复杂的程序转成一个Plan,并将Plan作为一个整体单元执行!

在这里,提前创建一个Flink的Maven项目,起名为FlinkExample,效果如图2.1所示。

 

Flink快速入门

图2.1 项目目录

后面的Java代码全部存放在src/main/Java目录下,Scala代码全部存放在src/main/Scala目录下,流计算相关的代码存放在对应的streaming目录下,批处理相关的代码则存放在对应的batch目录下。

2.3 Flink流处理(Streaming)案例开发

需求分析:通过Socket手工实时产生一些单词,使用Flink实时接收数据,对指定时间窗口内(如2s)的数据进行聚合统计,并且把时间窗口内计算的结果打印出来。

2.3.1 Java代码开发

首先添加Java代码对应的Maven依赖,参考2.1.2节的内容。注意,在下面的代码中,我们会创建一个WordWithCount类,这个类主要是为了方便统计每个单词出现的总次数。

需求:实现每隔1s对最近2s内的数据进行汇总计算。

分析:通过Socket模拟产生单词,使用Flink程序对数据进行汇总计算。

代码实现如下。

  1. package xuwei.tech.streaming;
  2.  
  3. import org.apache.Flink.api.common.functions.FlatMapFunction;
  4. import org.apache.Flink.api.Java.utils.ParameterTool;
  5. import org.apache.Flink.contrib.streaming.state.RocksDBStateBackend;
  6. import org.apache.Flink.runtime.state.filesystem.FsStateBackend;
  7. import org.apache.Flink.runtime.state.memory.MemoryStateBackend;
  8. import org.apache.Flink.streaming.api.DataStream.DataStream;
  9. import org.apache.Flink.streaming.api.DataStream.DataStreamSource;
  10. import org.apache.Flink.streaming.api.environment.StreamExecutionEnvironment;
  11. import org.apache.Flink.streaming.api.windowing.time.Time;
  12. import org.apache.Flink.util.Collector;
  13.  
  14. /**
  15. * 单词计数之滑动窗口计算
  16. *
  17. * Created by xuwei.tech
  18. */
  19. public class SocketWindowWordCountJava {
  20.  
  21. public static void main(String[] args) throws Exception{
  22. //获取需要的端口号
  23. int port;
  24. try {
  25. ParameterTool parameterTool = ParameterTool.fromArgs(args);
  26. port = parameterTool.getInt("port");
  27. }catch (Exception e){
  28. System.err.println("No port set. use default port 9000--Java");
  29. port = 9000;
  30. }
  31.  
  32. //获取Flink的运行环境
  33. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  34.  
  35. String hostname = "hadoop100";
  36. String delimiter = "\n";
  37. //连接Socket获取输入的数据
  38. DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter);
  39.  
  40. // a a c
  41.  
  42. // a 1
  43. // a 1
  44. // c 1
  45. DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction
  46. <String, WordWithCount>() {
  47. public void flatMap(String value, Collector<WordWithCount> out) throws
  48. Exception {
  49. String[] splits = value.split("\\\s");
  50. for (String word : splits) {
  51. out.collect(new WordWithCount(word, 1L));
  52. }
  53. }
  54. }).keyBy("word")
  55. .timeWindow(Time.seconds(2), Time.seconds(1))//指定时间窗口大小为2s,指定时间间隔为1s
  56. .sum("count");//在这里使用sum或者reduce都可以
  57. /*.reduce(new ReduceFunction<WordWithCount>() {
  58. public WordWithCount reduce(WordWithCount a,
  59. WordWithCount b) throws Exception {
  60.  
  61. return new WordWithCount(a.word,a.count+b.count);
  62. }
  63. })*/
  64. //把数据打印到控制台并且设置并行度
  65. windowCounts.print().setParallelism(1);
  66. //这一行代码一定要实现,否则程序不执行
  67. env.execute("Socket window count");
  68.  
  69. }
  70.  
  71. public static class WordWithCount{
  72. public String word;
  73. public long count;
  74. public WordWithCount(){}
  75. public WordWithCount(String word,long count){
  76. this.word = word;
  77. this.count = count;
  78. }
  79. @Override
  80. public String toString() {
  81. return "WordWithCount{" +
  82. "word='" + word + '\'' +
  83. ", count=" + count +
  84. '}';
  85. }
  86. }
  87. }

2.3.2 Scala代码开发

首先添加Scala代码对应的Maven依赖,参考2.1.2节的内容。在这里通过case class的方式在Scala中创建一个类。

需求:实现每隔1s对最近2s内的数据进行汇总计算。

分析:通过Socket模拟产生单词,使用Flink程序对数据进行汇总计算。

代码实现如下。

 
  1. package xuwei.tech.streaming
  2.  
  3. import org.apache.Flink.api.Java.utils.ParameterTool
  4. import org.apache.Flink.streaming.api.Scala.StreamExecutionEnvironment
  5. import org.apache.Flink.streaming.api.windowing.time.Time
  6.  
  7. /**
  8. * 单词计数之滑动窗口计算
  9. *
  10. * Created by xuwei.tech
  11. */
  12. object SocketWindowWordCountScala {
  13.  
  14. def main(args: Array[String]): Unit = {
  15.  
  16. //获取Socket端口号
  17. val port: Int = try {
  18. ParameterTool.fromArgs(args).getInt("port")
  19. }catch {
  20. case e: Exception => {
  21. System.err.println("No port set. use default port 9000--Scala")
  22. }
  23. 9000
  24. }
  25.  
  26. //获取运行环境
  27. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  28.  
  29. //连接Socket获取输入数据
  30. val text = env.socketTextStream("hadoop100",port,'\n')
  31.  
  32. //解析数据(把数据打平),分组,窗口计算,并且聚合求sum
  33.  
  34. //注意:必须要添加这一行隐式转行,否则下面的FlatMap方法执行会报错
  35. import org.apache.Flink.api.Scala._
  36.  
  37. val windowCounts = text.flatMap(line => line.split("\\\s"))//打平,把每一行单词都切开
  38. .map(w => WordWithCount(w,1))//把单词转成word , 1这种形式
  39. .keyBy("word")//分组
  40. .timeWindow(Time.seconds(2),Time.seconds(1))//指定窗口大小,指定间隔时间
  41. .sum("count");// sum或者reduce都可以
  42. //.reduce((a,b)=>WordWithCount(a.word,a.count+b.count))
  43.  
  44. //打印到控制台
  45. windowCounts.print().setParallelism(1);
  46.  
  47. //执行任务
  48. env.execute("Socket window count");
  49.  
  50. }
  51.  
  52. case class WordWithCount(word: String,count: Long)
  53.  
  54. }

2.3.3 执行程序

在前面的案例代码中指定hostname为hadoop100,port默认为9000,表示流处理程序默认监听这个主机的9000端口。因此在执行程序之前,需要先在hadoop100这个节点上面监听这个端口,通过执行下面命令实现。

  1. [[email protected] soft]# nc -l 9000
  2. a
  3. b
  4. a

然后在IDEA中运行编写完成的程序代码,结果如下。

  1. WordWithCount{word='a', count=1}
  2. WordWithCount{word='b', count=1}
  3. WordWithCount{word='a', count=2}
  4. WordWithCount{word='b', count=1}
  5. WordWithCount{word='a', count=1}

2.4 Flink批处理(Batch)案例开发

前面使用Flink实现了一个典型的流式计算案例,下面来看一下Flink的另一个应用场景——Batch离线批处理。

2.4.1 Java代码开发

需求:统计一个文件中的单词出现的总次数,并且把结果存储到文件中。

Java代码实现如下。

  1. package xuwei.tech.batch;
  2.  
  3. import org.apache.Flink.api.common.functions.FlatMapFunction;
  4. import org.apache.Flink.api.Java.DataSet;
  5. import org.apache.Flink.api.Java.ExecutionEnvironment;
  6. import org.apache.Flink.api.Java.operators.DataSource;
  7. import org.apache.Flink.api.Java.tuple.Tuple2;
  8. import org.apache.Flink.util.Collector;
  9.  
  10. /**
  11. *单词计数之离线计算
  12. *
  13. * Created by xuwei.tech
  14. */
  15. public class BatchWordCountJava {
  16.  
  17. public static void main(String[] args) throws Exception{
  18. String inputPath = "D:\\\data\\\file";
  19. String outPath = "D:\\\data\\\result";
  20.  
  21. //获取运行环境
  22. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  23. //获取文件中的内容
  24. DataSource<String> text = env.readTextFile(inputPath);
  25.  
  26. DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
  27. counts.writeAsCsv(outPath,"\n"," ").setParallelism(1);
  28. env.execute("batch word count");
  29.  
  30. }
  31.  
  32. public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,
  33. Integer>>{
  34. public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
  35. throws Exception {
  36. String[] tokens = value.toLowerCase().split("\\\W+");
  37. for (String token: tokens) {
  38. if(token.length()>0){
  39. out.collect(new Tuple2<String, Integer>(token,1));
  40. }
  41. }
  42. }
  43. }
  44. }

2.4.2 Scala代码开发

需求:统计一个文件中的单词出现的总次数,并且把结果存储到文件中。

Scala代码实现如下。

  1. package xuwei.tech.batch
  2.  
  3. import org.apache.Flink.api.Scala.ExecutionEnvironment
  4.  
  5. /**
  6. * 单词计数之离线计算
  7. * Created by xuwei.tech
  8. */
  9. object BatchWordCountScala {
  10.  
  11. def main(args: Array[String]): Unit = {
  12. val inputPath = "D:\\\data\\\file"
  13. val outPut = "D:\\\data\\\result"
  14.  
  15. val env = ExecutionEnvironment.getExecutionEnvironment
  16. val text = env.readTextFile(inputPath)
  17.  
  18. //引入隐式转换
  19. import org.apache.Flink.api.Scala._
  20.  
  21. val counts = text.flatMap(_.toLowerCase.split("\\\W+"))
  22. .filter(_.nonEmpty)
  23. .map((_,1))
  24. .groupBy(0)
  25. .sum(1)
  26. counts.writeAsCsv(outPut,"\n"," ").setParallelism(1)
  27. env.execute("batch word count")
  28. }
  29.  
  30. }

2.4.3 执行程序

首先,代码中指定的inputPath是D:\\\data\\\file目录,我们需要在这个目录下面创建一些文件,并在文件中输入一些单词。

  1. D:\data\file>dir
  2. 2018/03/20 09:01 24 a.txt
  3. D:\data\file>type a.txt
  4. hello a hello b
  5. hello a

然后,在IDEA中运行程序代码,产生的结果会被存储到outPut指定的D:\\\data\\\result文件中。

  1. D:\data>type result
  2. hello 3
  3. b 1
  4. a 2

本文摘自刚刚上架的《Flink入门与实战》徐葳 著

Flink快速入门

  • 这是一本Flink入门级图书,力求详细而完整地描述Flink基础理论与实际操作。
  • 采用Flink 1.6版本写作,案例丰富实用,做到学以致用。
  • 细节与案例兼顾,深入浅出展现Flink技术精髓。
  • 51CTO热门网课配套教材,可与网课结合学习,快速提升大数据开发技能。

本书旨在帮助读者从零开始快速掌握Flink的基本原理与核心功能。本书首先介绍了Flink的基本原理和安装部署,并对Flink中的一些核心API进行了详细分析。然后配套对应的案例分析,分别使用Java代码和Scala代码实现案例。最后通过两个项目演示了Flink在实际工作中的一些应用场景,帮助读者快速掌握Flink开发。
学习本书需要大家具备一些大数据的基础知识,比如Hadoop、Kafka、Redis、Elasticsearch等框架的基本安装和使用。本书也适合对大数据实时计算感兴趣的读者阅读。