Flink Streaming 广播变量的实现
1,问题思考,在 Spark 里面不管是离线还是实时,使用广播变量都比较简单,可以广播一些静态变量、静态文件,可是在 Flink 里面怎么使用呢?
2,在网上查找了一些代码并在本地实践,Flink 可以有两种广播方式:
1)广播配置文件 或者静态变量
我使用的是Flink的分布式缓存 具体代码如下
package com.coder.flink.core.cache

import java.io.{File, FileInputStream}
import java.util.Properties

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

/**
 * Demonstrates "broadcasting" a static properties file to every task manager
 * via Flink's distributed cache: the file is registered on the client and
 * fetched locally inside the rich function's open().
 */
object CacheProperties {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Register the local file in the distributed cache under the key "too";
    // Flink ships it to each task manager before the tasks start.
    env.registerCachedFile("D:/wxgz-local/resources_ceshi/too.properties", "too")

    val weight: DataSet[String] = env.fromElements("a", "b")

    weight.map(new RichMapFunction[String, String] {

      // open() runs once per task instance; read the cached file here rather
      // than once per record in map().
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val file: File = getRuntimeContext.getDistributedCache.getFile("too")
        val prop = new Properties
        // Close the stream explicitly — the original version leaked it.
        val in = new FileInputStream(file)
        try {
          prop.load(in)
        } finally {
          in.close()
        }
        val value = prop.getProperty("cycle")
        println(s"value = ${value}")
      }

      override def map(value: String): String = {
        "11"
      }
    }).print()
    // print() already triggers execution in the DataSet API, so no
    // explicit env.execute() call is needed here.
  }
}
2)配置流实现,这是网上找的 demo 并做了简化,代码如下,个人觉得没有 Spark 广播变量那么简单好用:
package com.coder.flink.core.broadcast;

import com.coder.flink.core.utils.RedisUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * author: miao
 * date: 2019-04-01 15:17
 * description: dynamic configuration update implemented with a broadcast stream.
 *
 * A low-parallelism "config" source periodically emits a small map which is
 * broadcast to every subtask of the keyed data stream; the process function
 * joins each data record with the latest config snapshot.
 */
public class BroadcastStreamDemo_test {

    public static void main(String[] args) throws Exception {
        // Build the streaming environment with a parallelism of 4.
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(4);

        // Descriptor for the broadcast state: String key -> String config value.
        final MapStateDescriptor<String, String> CONFIG_KEYWORDS = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

        // Config source (parallelism 1): every 3 seconds emit a one-entry map
        // chosen by the current timestamp modulo 3.
        BroadcastStream<Map<String, String>> broadcastStream = environment
                .addSource(new RichSourceFunction<Map<String, String>>() {

                    private volatile boolean isRunning = true;

                    @Override
                    public void run(SourceContext<Map<String, String>> ctx) throws Exception {
                        while (isRunning) {
                            long newTime = System.currentTimeMillis();
                            TimeUnit.SECONDS.sleep(3);
                            // Emit a fresh map on every iteration instead of mutating and
                            // re-sending one shared instance, which aliased the same object
                            // across emissions in the original version.
                            Map<String, String> map = new HashMap<>(1);
                            if (newTime % 3 == 0) {
                                map.put("A", "AAAA");
                            } else if (newTime % 3 == 1) {
                                map.put("B", "BBBB");
                            } else {
                                // newTime is positive, so newTime % 3 is only ever 0, 1 or 2;
                                // the original's fourth ("D") branch was unreachable dead code.
                                map.put("C", "CCCC");
                            }
                            ctx.collect(map);
                            System.out.println("发送的Map:" + map.toString());
                        }
                    }

                    @Override
                    public void cancel() {
                        isRunning = false;
                    }
                })
                .setParallelism(1)
                .broadcast(CONFIG_KEYWORDS);

        // Data source (parallelism 1): one random message from the fixed set
        // every 3 seconds, simulating a Kafka feed.
        DataStream<String> dataStream = environment.addSource(new RichSourceFunction<String>() {

            private volatile boolean isRunning = true;

            // Test data set.
            private String[] dataSet = new String[]{
                    "你喜欢武藤兰吗",
                    "永井流奈也还可以呢",
                    "青木真麻你了解过吗",
                    "坂本真綾也是很不错的呢",
                    "佐藤遥希呢,很漂亮呢"
            };

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                int size = dataSet.length;
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(3);
                    int seed = (int) (Math.random() * size);
                    ctx.collect(dataSet[seed]);
                    System.out.println("kafka接收数据:" + dataSet[seed]);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }).setParallelism(1);

        // Connect the data stream with the broadcast stream and print the join result.
        // NOTE(review): the config is kept in a transient member field rather than in
        // ctx.getBroadcastState(CONFIG_KEYWORDS); the idiomatic Flink pattern would store
        // it in broadcast state so it survives restarts — confirm before relying on this.
        dataStream.connect(broadcastStream).process(new BroadcastProcessFunction<String, Map<String, String>, String>() {

            // Latest config snapshot seen by this subtask.
            private volatile Map<String, String> keywords;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // Seed keywords from Redis so records arriving before the first
                // broadcast element still see a value.
                if (keywords == null) {
                    System.out.println("2222 = " + 2222);
                    // try-with-resources: the original leaked the Jedis connection.
                    try (Jedis jedis = RedisUtils.getJedis()) {
                        Map<String, String> resultMap = jedis.hgetAll("flink_test3");
                        System.out.println("resultMap.toString() = " + resultMap.toString());
                        keywords = resultMap;
                    }
                }
            }

            @Override
            public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<String> out) throws Exception {
                // Replace the snapshot with the newly broadcast map.
                keywords = value;
            }

            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                out.collect("kafka的消息:" + value + ", Map的数据:" + keywords);
            }
        }).print();

        // Lazy execution: nothing runs until execute() is called.
        environment.execute();
    }
}