Killing a Flink job on YARN from code, with an automatic savepoint

1. Background: I have recently been building a platform, a front-end/back-end separated project that, simply put, integrates various components, so job submission and similar operations are all handled through the platform.

2. The feature implemented here is quite simple: from code, kill a Flink job running on YARN, and automatically generate a savepoint in the process.

3. The parameters we need to supply:

1) The YARN application ID

String appId = "application_1600222031782_0023";

2) The Flink job's JobID

String jobid = "c4d7e2ff6a35d402eaf54b9f9ca0f6c6";

3) The target savepoint directory

String savePoint = "hdfs://dev-ct6-dc-master01:8020/flink-savepoints5";

pom dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-yarn_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
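
Depending on how your project is packaged, you may also need flink-clients on the classpath (an assumption; flink-yarn normally pulls it in transitively):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>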

If it doesn't work, set the HADOOP_HOME (and/or HADOOP_CLASSPATH) environment variable before running the program; the snippet below is for reference only.
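
A minimal sketch, assuming a CDH-style installation; the paths are placeholders and must be adjusted to your own cluster:

# set before launching the JVM, so the Hadoop/YARN client classes can be found
export HADOOP_HOME=/opt/cloudera/parcels/CDH        # hypothetical path
export HADOOP_CONF_DIR=/etc/hadoop/conf             # hypothetical path
export HADOOP_CLASSPATH=`hadoop classpath`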


4. Code

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.util.FlinkException;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class StopYarnJob {
    public static void main(String[] args) throws FlinkException, CliArgsException, ExecutionException, InterruptedException {
        // HADOOP_HOME is an environment variable, so read it with getenv rather than getProperty
        String hadoopHome = System.getenv("HADOOP_HOME");
        System.out.println("hadoop_home = " + hadoopHome);
        String configurationDirectory = "G:/flink_working_tools/yarn-conf";
        String appId = "application_1600222031782_0023";
        String jobid = "c4d7e2ff6a35d402eaf54b9f9ca0f6c6";
        String savePoint = "hdfs://dev-ct6-dc-master01:8020/flink-savepoints5";
        // load the Flink configuration (flink-conf.yaml) from the given directory
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(
                configurationDirectory);
//        Configuration flinkConfiguration = new Configuration();
        // point the client at the already-running YARN application
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, appId);
        YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
        ApplicationId applicationId = clusterClientFactory.getClusterId(flinkConfiguration);
        if (applicationId == null) {
            throw new FlinkException(
                    "No cluster id was specified. Please specify a cluster to which you would like to connect.");
        }

        // retrieve a client for the existing cluster (do not deploy a new one)
        YarnClusterDescriptor clusterDescriptor = clusterClientFactory
                .createClusterDescriptor(
                        flinkConfiguration);
        ClusterClient<ApplicationId> clusterClient = clusterDescriptor.retrieve(
                applicationId).getClusterClient();

        JobID jobID = parseJobId(jobid);
        // take a savepoint and then stop the job; "true" injects a max watermark
        // to advance event time before stopping
        CompletableFuture<String> completableFuture = clusterClient.stopWithSavepoint(
                jobID,
                true,
                savePoint);

        // block until the savepoint path is returned
        String savepoint = completableFuture.get();
        System.out.println(savepoint);

        clusterClient.close();
        clusterDescriptor.close();
    }

    private static JobID parseJobId(String jobIdString) throws CliArgsException {
        if (jobIdString == null) {
            throw new CliArgsException("Missing JobId");
        }

        final JobID jobId;
        try {
            jobId = JobID.fromHexString(jobIdString);
        } catch (IllegalArgumentException e) {
            throw new CliArgsException(e.getMessage());
        }
        return jobId;
    }
}
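
Stopping with a savepoint is not the only option. ClusterClient also exposes triggerSavepoint (take a savepoint but keep the job running) and cancelWithSavepoint (hard-cancel after writing a savepoint). A minimal sketch, assuming the same clusterClient and jobID as above, before they are closed:

        // take a savepoint without stopping the job
        String path = clusterClient.triggerSavepoint(jobID, savePoint).get();

        // or cancel the job (no graceful stop) after writing a savepoint
        String path2 = clusterClient.cancelWithSavepoint(jobID, savePoint).get();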

5. Testing

1) First, we take the example code below, package it, and submit it to the cluster.

 

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.log4j.Logger;

public class SocketWindowWordCount {
    private static final Logger logger = Logger.getLogger(SocketWindowWordCount.class);
    public static void main(String[] args) throws Exception {

        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount " +
                    "--hostname <hostname> --port <port>', where hostname (localhost by default) " +
                    "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and " +
                    "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");
        text.print("source");
        // parse the data, group it, window it, and aggregate the counts
        DataStream<WordWithCount> windowCounts = text

                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })

                // route every record to a single key so all counts share one ValueState
                .keyBy(value -> "aaa")
                .process(new KeyedProcessFunction<String, WordWithCount, WordWithCount>() {
                    private transient ValueState<Long> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Long> valueStateDescriptor = new ValueStateDescriptor<>("totalAmountState",
                                TypeInformation.of(new TypeHint<Long>() {
                                }));
                        valueState = this.getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public void processElement(WordWithCount wordWithCount, Context context, Collector<WordWithCount> out) throws Exception {
                        Long value = valueState.value();
                        Long counts = wordWithCount.count;
                        if (value != null) {
                            System.out.println("current state = " + value);
                            logger.error("current state = " + value);
                            value = value + counts;
                            valueState.update(value);
                            out.collect(wordWithCount);
                        } else {
                            // first element for this key: initialize the state
                            value = counts;
                            valueState.update(value);
                        }
                    }
                });


        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {

        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

 

2) Script to run the job:

export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/*
./bin/flink run -m yarn-cluster \
-yD yarn.containers.vcores=2 \
./examples/streaming/SocketWindowWordCount.jar --hostname 192.168.6.31 --port 12345

 

3) On the Linux node 192.168.6.31, run nc -l 12345

and type some arbitrary data.


 

4) At this point, check the YARN task's printed output:

say the last printed value is state = 11.

 

5) Run the main method to kill the job, then check HDFS:

we find that the savepoint files have been generated.
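
You can verify this from the command line as well; the directory below is the savePoint path used above (Flink generates the savepoint subdirectory name itself):

hdfs dfs -ls hdfs://dev-ct6-dc-master01:8020/flink-savepoints5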


 

6) Restart the job from the savepoint (see the sketch below), run nc -l 12345 again, and type one line of data; the printed state continues from the previous run (starting at state = 12). The verification succeeds: the savepoint works.
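
To resume from the savepoint, pass it to flink run with -s; a sketch, where savepoint-xxx stands for whatever directory step 5) actually produced:

./bin/flink run -m yarn-cluster \
-s hdfs://dev-ct6-dc-master01:8020/flink-savepoints5/savepoint-xxx \
./examples/streaming/SocketWindowWordCount.jar --hostname 192.168.6.31 --port 12345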
