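Killing a Flink job on YARN from code, with an automatic savepoint
I. I have recently been building a platform, a project with a separate front end and back end. Put simply, it integrates various components, so operations such as job submission are all done on the platform.
II. The feature implemented here is simple: running this program stops the Flink job on YARN and automatically generates a savepoint.
III. The parameters we need to supply are:
1) The YARN application ID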
String appId = "application_1600222031782_0023";
2) The Flink job ID
String jobid = "c4d7e2ff6a35d402eaf54b9f9ca0f6c6";
3) The target directory for the savepoint
String savePoint = "hdfs://dev-ct6-dc-master01:8020/flink-savepoints5";
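Since all three values are plain strings, it can help to sanity-check them before contacting the cluster. The following is a minimal sketch using only the JDK; the class `StopJobParams` and its methods are hypothetical helpers, not part of Flink or YARN, and the assumed ID shapes (`application_<timestamp>_<sequence>` and 32 hexadecimal characters) are inferred from the sample values above.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not part of Flink or YARN) that sanity-checks the three inputs. */
final class StopJobParams {
    // Assumed shape of a YARN application ID: application_<clusterTimestamp>_<sequence>.
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");
    // Assumed shape of a Flink job ID: 32 hexadecimal characters.
    private static final Pattern JOB_ID = Pattern.compile("[0-9a-fA-F]{32}");

    static boolean isValidApplicationId(String appId) {
        return appId != null && APP_ID.matcher(appId).matches();
    }

    static boolean isValidJobId(String jobId) {
        return jobId != null && JOB_ID.matcher(jobId).matches();
    }

    // Only checks the scheme prefix; it does not verify that the directory exists.
    static boolean isHdfsUri(String dir) {
        return dir != null && dir.startsWith("hdfs://") && dir.length() > "hdfs://".length();
    }

    public static void main(String[] args) {
        System.out.println(isValidApplicationId("application_1600222031782_0023"));
        System.out.println(isValidJobId("c4d7e2ff6a35d402eaf54b9f9ca0f6c6"));
        System.out.println(isHdfsUri("hdfs://dev-ct6-dc-master01:8020/flink-savepoints5"));
    }
}
```

Failing fast on a malformed ID gives a clearer error than the exception raised later when the client tries to parse it.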
pom dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-yarn_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
If this does not work, set the HADOOP_HOME environment variable when running the program (what follows is for reference only).
IV. Code
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.util.FlinkException;
import org.apache.flink.yarn.YarnClusterClientFactory;
import org.apache.flink.yarn.YarnClusterDescriptor;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationId;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class StopYarnJob {
    public static void main(String[] args)
            throws FlinkException, CliArgsException, ExecutionException, InterruptedException {
        // HADOOP_HOME is an environment variable, so read it with getenv
        // (System.getProperty would only see a -DHADOOP_HOME=... JVM option).
        String hadoop_home = System.getenv("HADOOP_HOME");
        System.out.println("hadoop_home = " + hadoop_home);

        // Local directory holding the Flink/YARN configuration files
        String configurationDirectory = "G:/flink_working_tools/yarn-conf";
        String appId = "application_1600222031782_0023";
        String jobid = "c4d7e2ff6a35d402eaf54b9f9ca0f6c6";
        String savePoint = "hdfs://dev-ct6-dc-master01:8020/flink-savepoints5";

        // Load the Flink configuration
        Configuration flinkConfiguration = GlobalConfiguration.loadConfiguration(configurationDirectory);
        // Configuration flinkConfiguration = new Configuration();
        flinkConfiguration.set(YarnConfigOptions.APPLICATION_ID, appId);

        // Resolve the YARN application ID from the configuration
        YarnClusterClientFactory clusterClientFactory = new YarnClusterClientFactory();
        ApplicationId applicationId = clusterClientFactory.getClusterId(flinkConfiguration);
        if (applicationId == null) {
            throw new FlinkException(
                    "No cluster id was specified. Please specify a cluster to which you would like to connect.");
        }

        // Connect to the running Flink cluster on YARN
        YarnClusterDescriptor clusterDescriptor =
                clusterClientFactory.createClusterDescriptor(flinkConfiguration);
        ClusterClient<ApplicationId> clusterClient =
                clusterDescriptor.retrieve(applicationId).getClusterClient();

        // Stop the job and write a savepoint into the target directory.
        // The second argument (advanceToEndOfEventTime = true) emits a final
        // MAX_WATERMARK before stopping.
        JobID jobID = parseJobId(jobid);
        CompletableFuture<String> completableFuture =
                clusterClient.stopWithSavepoint(jobID, true, savePoint);

        // Block until the savepoint completes, then print its path
        String savepointPath = completableFuture.get();
        System.out.println(savepointPath);
    }

    // Convert the hexadecimal job ID string into a Flink JobID
    private static JobID parseJobId(String jobIdString) throws CliArgsException {
        if (jobIdString == null) {
            throw new CliArgsException("Missing JobId");
        }
        final JobID jobId;
        try {
            jobId = JobID.fromHexString(jobIdString);
        } catch (IllegalArgumentException e) {
            throw new CliArgsException(e.getMessage());
        }
        return jobId;
    }
}
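The code above waits for the savepoint with a plain `get()`, which blocks indefinitely if the savepoint never completes. On a platform backend it may be preferable to bound the wait. The following is a minimal sketch using only the JDK; the class `SavepointAwait` and the method `awaitSavepointPath` are hypothetical names, and the future stands in for the one returned by `stopWithSavepoint`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: waits for a savepoint path with an upper bound on the wait. */
final class SavepointAwait {
    /** Returns the savepoint path, or an empty Optional if the timeout elapses first. */
    static Optional<String> awaitSavepointPath(
            CompletableFuture<String> future, long timeout, TimeUnit unit) {
        try {
            return Optional.ofNullable(future.get(timeout, unit));
        } catch (TimeoutException e) {
            // The savepoint did not finish in time; the caller decides what to do next.
            return Optional.empty();
        } catch (ExecutionException e) {
            // The savepoint itself failed; surface the underlying cause.
            throw new IllegalStateException("Savepoint failed", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for savepoint", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the future returned by stopWithSavepoint; the path is a placeholder.
        CompletableFuture<String> done = CompletableFuture.completedFuture("hdfs://host/savepoints/sp");
        System.out.println(awaitSavepointPath(done, 60, TimeUnit.SECONDS));
    }
}
```

Note that a timeout here only stops the waiting; it does not cancel the stop request already sent to the cluster.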
V. Test results
1) First we take the example code shipped with Flink, package it, and submit it to the cluster
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
// Assumption: the Logger.getLogger(Class) call below is the log4j 1.x API
import org.apache.log4j.Logger;

public class SocketWindowWordCount {
    private static final Logger logger = Logger.getLogger(SocketWindowWordCount.class);

    public static void main(String[] args) throws Exception {
        // the host and the port to connect to
        final String hostname;
        final int port;
        try {
            final ParameterTool params = ParameterTool.fromArgs(args);
            hostname = params.has("hostname") ? params.get("hostname") : "localhost";
            port = params.getInt("port");
        } catch (Exception e) {
            System.err.println("No port specified. Please run 'SocketWindowWordCount "
                    + "--hostname <hostname> --port <port>', where hostname (localhost by default) "
                    + "and port is the address of the text server");
            System.err.println("To start a simple text server, run 'netcat -l <port>' and "
                    + "type the input text into the command line");
            return;
        }

        // get the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // get input data by connecting to the socket
        DataStream<String> text = env.socketTextStream(hostname, port, "\n");
        text.print("source");

        // parse the data, route every word to a single key, and keep a running total in keyed state
        DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
                    }
                })
                .keyBy(value -> "aaa")
                .process(new KeyedProcessFunction<String, WordWithCount, WordWithCount>() {
                    // running total; this is the state captured by the savepoint
                    private transient ValueState<Long> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        super.open(parameters);
                        ValueStateDescriptor<Long> valueStateDescriptor = new ValueStateDescriptor<>(
                                "totalAmountState", TypeInformation.of(new TypeHint<Long>() {}));
                        valueState = this.getRuntimeContext().getState(valueStateDescriptor);
                    }

                    @Override
                    public void processElement(WordWithCount wordWithCount, Context context,
                                               Collector<WordWithCount> out) throws Exception {
                        Long value = valueState.value();
                        Long counts = wordWithCount.count;
                        if (value != null) {
                            // print the stored total before adding the new count
                            System.out.println("state in memory = " + value);
                            logger.error("state in memory = " + value);
                            value = value + counts;
                            valueState.update(value);
                            out.collect(wordWithCount);
                        } else {
                            // first element for this key: initialise the state
                            value = counts;
                            valueState.update(value);
                        }
                    }
                });

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1);

        env.execute("Socket Window WordCount");
    }

    // ------------------------------------------------------------------------

    /**
     * Data type for words with count.
     */
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {}

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}
2) Script to run the job:
export HADOOP_CLASSPATH=/opt/cloudera/parcels/CDH/jars/*;./bin/flink run -m yarn-cluster \
-yD yarn.containers.vcores=2 \
./examples/streaming/SocketWindowWordCount.jar --hostname 192.168.6.31 --port 12345
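3) On the Linux node 192.168.6.31, run nc -l 12345
and type in some arbitrary data
4) Now look at the printed output of the YARN job:
Suppose the last line printed is state = 11
5) Run the main method to stop the job, then check HDFS:
The savepoint files have been generated.
6) Start the job again from the generated savepoint (for example by passing -s <savepointPath> to flink run), run nc -l 12345 again, and enter one record. The printed state starts at 12: the last value printed before the stop was 11, and it was updated to 12 before the savepoint was taken. This verifies that the savepoint works.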
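For the state to carry over, the new submission has to point at the savepoint path printed by the main method. On a platform this step would typically be automated as well. The following is a minimal sketch, using only the JDK, that assembles such a command line with the `-s` (from savepoint) option of `flink run`; the class `RestoreCommand` is a hypothetical helper, and the savepoint path and jar path are supplied by the caller.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: builds a "flink run" command that restores from a savepoint. */
final class RestoreCommand {
    static List<String> build(String savepointPath, String jarPath, String... programArgs) {
        List<String> cmd = new ArrayList<>(Arrays.asList(
                "./bin/flink", "run",
                "-m", "yarn-cluster",   // same submission mode as the script in step 2
                "-s", savepointPath,    // restore state from this savepoint
                jarPath));
        // Program arguments go after the jar, exactly as in a normal submission.
        cmd.addAll(Arrays.asList(programArgs));
        return cmd;
    }

    public static void main(String[] args) {
        // <savepointPath> is a placeholder for the path printed by StopYarnJob.
        List<String> cmd = build("<savepointPath>",
                "./examples/streaming/SocketWindowWordCount.jar",
                "--hostname", "192.168.6.31", "--port", "12345");
        System.out.println(String.join(" ", cmd));
    }
}
```

The resulting list could be handed to `ProcessBuilder`, with HADOOP_CLASSPATH set in the process environment as in the script from step 2.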