Flink Streaming 广播变量的实现

1. 问题思考：在 Spark 里，无论是离线任务还是实时任务，使用广播变量都比较简单，可以方便地广播一些静态变量、静态文件。那么在 Flink 里该怎么使用呢？

 

2. 参考网上的代码并在本地实践后发现，Flink 可以通过以下两种方式实现广播：

Flink Streaming 广播变量的实现

  1) 广播配置文件或静态变量（注意：下面的示例使用的是 DataSet 批处理 API，而不是 DataStream）

     这里使用的是 Flink 的分布式缓存（Distributed Cache），具体代码如下：

 

package com.coder.flink.core.cache

import java.io.{File, FileInputStream}
import java.util
import java.util.Properties

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.commons.io.FileUtils
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.api.scala.ExecutionEnvironment



import org.apache.flink.api.scala._

object CacheProperties {

  /**
   * Demo of Flink's distributed cache: registers a local properties file, then reads the
   * `cycle` property from it inside `RichMapFunction.open` of every parallel map instance.
   *
   * @param args optional; when non-empty, `args(0)` is used as the path of the properties
   *             file to register instead of the built-in default path.
   */
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Register the local file in the distributed cache under the name "too"; tasks look it
    // up by that name through their runtime context (see open() below).
    val cachedFilePath = args.headOption.getOrElse("D:/wxgz-local/resources_ceshi/too.properties")
    env.registerCachedFile(cachedFilePath, "too")

    val weight: DataSet[String] = env.fromElements("a", "b")

    weight.map(new RichMapFunction[String, String] {

      /** Loads the cached properties file once per parallel instance and prints `cycle`. */
      override def open(parameters: Configuration): Unit = {
        super.open(parameters)
        val file: File = getRuntimeContext.getDistributedCache.getFile("too")
        val prop = new Properties
        // Close the stream even if load() fails, so the file handle is never leaked.
        val in = new FileInputStream(file)
        try {
          prop.load(in)
        } finally {
          in.close()
        }

        val value = prop.getProperty("cycle")
        println(s"value  =  ${value}")
      }

      /** Ignores the input element and emits the constant "11" for each one. */
      override def map(value: String): String = {
        "11"
      }
    }).print()

    // No env.execute() here: print() on a DataSet already triggers job execution (per the
    // Flink DataSet API docs), so a second execute() would find no new sinks to run.
  }
}

 

 

2) 配置流实现：使用广播流（Broadcast Stream / Broadcast State）实现配置的动态更新。下面的代码是在网上找到的 demo 的简化版。个人觉得它没有 Spark 的广播变量那么简单好用：

 

package com.coder.flink.core.broadcast;

import com.coder.flink.core.utils.RedisUtils;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;
import redis.clients.jedis.Jedis;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Author: miao
 * Date: 2019-04-01 15:17
 * Description: demo of dynamically updating configuration with a broadcast stream.
 *
 * <p>A single-parallelism source emits a small configuration map every 3 seconds. The map is
 * broadcast to every parallel instance of a {@link BroadcastProcessFunction}. That function stores
 * it in broadcast state (descriptor {@code "config-keywords"}) and attaches the current
 * configuration to every message of the main data stream. Until the first configuration arrives,
 * each instance falls back to the Redis hash {@code flink_test3}, which is loaded once in
 * {@code open}.
 */
public class BroadcastStreamDemo_test {

    public static void main(String[] args) throws Exception {

        // Build the streaming environment.
        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        // Run operators with a default parallelism of 4.
        environment.setParallelism(4);

        // Descriptor of the broadcast state holding the latest configuration (key -> value).
        final MapStateDescriptor<String, String> CONFIG_KEYWORDS = new MapStateDescriptor<>(
                "config-keywords",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

        // Configuration source (parallelism 1), broadcast to all downstream instances.
        BroadcastStream<Map<String, String>> broadcastStream = environment.addSource(new RichSourceFunction<Map<String, String>>() {

            private volatile boolean isRunning = true;

            /**
             * Emits a new single-entry configuration map every 3 seconds. The entry (A, B or C)
             * is chosen from the wall-clock time modulo 3, sampled just before sleeping.
             */
            @Override
            public void run(SourceContext<Map<String, String>> ctx) throws Exception {
                while (isRunning) {
                    long now = System.currentTimeMillis();

                    TimeUnit.SECONDS.sleep(3);

                    // Build a fresh map per emission. Re-emitting one shared instance and then
                    // mutating it is unsafe if Flink hands the same object to a downstream
                    // operator without copying it (e.g. when object reuse is enabled).
                    Map<String, String> map = new HashMap<>(4);
                    // currentTimeMillis() is non-negative, so the remainder is always 0, 1 or 2.
                    switch ((int) (now % 3)) {
                        case 0:
                            map.put("A", "AAAA");
                            break;
                        case 1:
                            map.put("B", "BBBB");
                            break;
                        default:
                            map.put("C", "CCCC");
                            break;
                    }
                    ctx.collect(map);
                    System.out.println("发送的Map:" + map.toString());
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        }).setParallelism(1).broadcast(CONFIG_KEYWORDS);

        // Main data source (parallelism 1) that simulates incoming messages.
        DataStream<String> dataStream = environment.addSource(new RichSourceFunction<String>() {

            private volatile boolean isRunning = true;

            // Test messages to pick from.
            private String[] dataSet = new String[]{
                    "你喜欢武藤兰吗",
                    "永井流奈也还可以呢",
                    "青木真麻你了解过吗",
                    "坂本真綾也是很不错的呢",
                    "佐藤遥希呢,很漂亮呢"
            };

            /**
             * Emits one randomly chosen test message every 3 seconds.
             */
            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                int size = dataSet.length;
                while (isRunning) {
                    TimeUnit.SECONDS.sleep(3);
                    int seed = (int) (Math.random() * size);
                    ctx.collect(dataSet[seed]);
                    System.out.println("kafka接收数据:" + dataSet[seed]);
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }

        }).setParallelism(1);


        // Connect the data stream with the broadcast stream. Print each message together with
        // the configuration currently in effect.
        dataStream.connect(broadcastStream).process(new BroadcastProcessFunction<String, Map<String, String>, String>() {

            // Fallback configuration read from Redis in open(). Used while broadcast state is
            // still empty, i.e. before the first configuration has been received.
            private transient Map<String, String> defaultKeywords;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                System.out.println("2222 = " + 2222);
                // NOTE(review): assumes RedisUtils.getJedis() hands the caller its own
                // connection (e.g. borrowed from a pool), so closing it here releases it
                // instead of leaking it — TODO confirm against RedisUtils.
                Jedis jedis = RedisUtils.getJedis();
                try {
                    Map<String, String> resultMap = jedis.hgetAll("flink_test3");
                    System.out.println("resultMap.toString() = " + resultMap.toString());
                    defaultKeywords = resultMap;
                } finally {
                    jedis.close();
                }
            }

            /**
             * Replaces the whole stored configuration with the newly broadcast map. Keeping it
             * in broadcast state, rather than a plain field, lets Flink checkpoint and restore it.
             */
            @Override
            public void processBroadcastElement(Map<String, String> value, Context ctx, Collector<String> out) throws Exception {
                ctx.getBroadcastState(CONFIG_KEYWORDS).clear();
                ctx.getBroadcastState(CONFIG_KEYWORDS).putAll(value);
            }

            /**
             * Emits the message with the current configuration. If broadcast state is empty,
             * the Redis-loaded default configuration is used instead.
             */
            @Override
            public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                Map<String, String> keywords = new HashMap<>();
                for (Map.Entry<String, String> entry : ctx.getBroadcastState(CONFIG_KEYWORDS).immutableEntries()) {
                    keywords.put(entry.getKey(), entry.getValue());
                }
                if (keywords.isEmpty()) {
                    keywords = defaultKeywords;
                }
                out.collect("kafka的消息:" + value + ", Map的数据:" + keywords);
            }

        }).print();

        // Nothing runs until execute() is called.
        environment.execute();
    }

}