Several different ways to submit a MapReduce job

After writing a few MapReduce programs I noticed that there are several different ways to submit a job, and a quick web search did not turn up any post comparing them, so I decided to write one myself. It is all fairly simple, nothing fancy.

All of these use the new API. The first style you usually encounter is the most basic one:

 

	public static void main(String[] args) throws Exception {

		BasicConfigurator.configure();
		Configuration conf = new Configuration();
		LOGGER.info(conf.toString());
		Job job = Job.getInstance(conf);

		job.setJarByClass(IdDataProcess.class);
		job.setMapperClass(IdDataProcess.DataMapper.class);
		job.setReducerClass(IdDataProcess.DataReducer.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);

		String uri = "hdfs://hadoop:[email protected]:9000/cf/cust";
		FileInputFormat.addInputPath(job, new Path(uri));
		Path out = new Path("./certno");
		FileOutputFormat.setOutputPath(job, out);
		FileSystem fileSystem = FileSystem.get(new URI(out.toString()), new Configuration());
		if (fileSystem.exists(out)) {
			fileSystem.delete(out, true);
		}
		boolean result = job.waitForCompletion(true); // submit the job and block until it finishes
		Path hdfsPath = new Path("./certno/part-r-00000");
		FSDataInputStream fsDataInputStream = fileSystem.open(hdfsPath);
		OutputStream outputStream = new FileOutputStream("./data/province/result");
		IOUtils.copyBytes(fsDataInputStream, outputStream, 4096, true);

		System.exit(result ? 0 : 1);
	}

Under the hood, job.waitForCompletion calls the Job.submit method to send the job to the cluster for execution.
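For comparison, here is a minimal sketch (reusing the job object from the code above) of the non-blocking variant: submit() returns immediately and we poll for completion ourselves, which is essentially what waitForCompletion does internally:

		// submit without blocking, then poll progress until the job finishes
		job.submit();
		while (!job.isComplete()) {
			LOGGER.info("map " + job.mapProgress() + " reduce " + job.reduceProgress());
			Thread.sleep(1000);
		}
		System.exit(job.isSuccessful() ? 0 : 1);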

Later I noticed that a lot of code takes the following form:

public class SortDataPreprocessor extends Configured implements Tool {
  
  @Override
  public int run(String[] args) throws Exception {
    Job job = JobBuilder.parseInputAndOutput(this, getConf(), args);
    if (job == null) {
      return -1;
    }

    job.setMapperClass(CleanerMapper.class);
    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    SequenceFileOutputFormat.setCompressOutput(job, true);
    SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    SequenceFileOutputFormat.setOutputCompressionType(job,
        CompressionType.BLOCK);

    return job.waitForCompletion(true) ? 0 : 1;
  }
  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new SortDataPreprocessor(), args);
    System.exit(exitCode);
  }
}
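One note on this example: JobBuilder is not a Hadoop class but a small helper from the sample code that accompanies Hadoop: The Definitive Guide. From memory, and only as a rough sketch rather than the exact source, its parseInputAndOutput checks the argument count, creates the Job, and wires up the input and output paths, roughly like this:

  // rough sketch of the book's helper (not part of Hadoop itself)
  public static Job parseInputAndOutput(Tool tool, Configuration conf, String[] args)
      throws IOException {
    if (args.length != 2) {
      System.err.printf("Usage: %s [genericOptions] <input> <output>%n",
          tool.getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return null;
    }
    Job job = Job.getInstance(conf);
    job.setJarByClass(tool.getClass());
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    return job;
  }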

Here, Tool is a utility interface provided by Hadoop:

/**
 * A tool interface that supports handling of generic command-line options.
 * 
 * <p><code>Tool</code>, is the standard for any Map-Reduce tool/application. 
 * The tool/application should delegate the handling of 
 * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/CommandsManual.html#Generic_Options">
 * standard command-line options</a> to {@link ToolRunner#run(Tool, String[])} 
 * and only handle its custom arguments.</p>
 * 
 * <p>Here is how a typical <code>Tool</code> is implemented:</p>
 * <p><blockquote><pre>
 *     public class MyApp extends Configured implements Tool {
 *     
 *       public int run(String[] args) throws Exception {
 *         // <code>Configuration</code> processed by <code>ToolRunner</code>
 *         Configuration conf = getConf();
 *         
 *         // Create a JobConf using the processed <code>conf</code>
 *         JobConf job = new JobConf(conf, MyApp.class);
 *         
 *         // Process custom command-line options
 *         Path in = new Path(args[1]);
 *         Path out = new Path(args[2]);
 *         
 *         // Specify various job-specific parameters     
 *         job.setJobName("my-app");
 *         job.setInputPath(in);
 *         job.setOutputPath(out);
 *         job.setMapperClass(MyMapper.class);
 *         job.setReducerClass(MyReducer.class);
 *
 *         // Submit the job, then poll for progress until the job is complete
 *         JobClient.runJob(job);
 *         return 0;
 *       }
 *       
 *       public static void main(String[] args) throws Exception {
 *         // Let <code>ToolRunner</code> handle generic command-line options 
 *         int res = ToolRunner.run(new Configuration(), new MyApp(), args);
 *         
 *         System.exit(res);
 *       }
 *     }
 * </pre></blockquote></p>
 * 
 * @see GenericOptionsParser
 * @see ToolRunner
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Tool extends Configurable {
  /**
   * Execute the command with the given arguments.
   * 
   * @param args command specific arguments.
   * @return exit code.
   * @throws Exception
   */
  int run(String [] args) throws Exception;
}

Because the Tool interface extends the Configurable interface, our own class has to provide implementations for the methods of both Configurable and Tool.

In the example above, the Configurable methods are taken care of by extending the Configured class. Tool declares only one method, run, which is implemented directly in the class. In the end the job is still submitted by the job.waitForCompletion(true) call inside run, so this looks just like the first, most plain-vanilla style. Why bother with the Tool interface at all, then?

Since Tool extends Configurable, a reasonable guess is that the Tool interface exists to pass configuration parameters through to the MapReduce job.

Next, let's look at the ToolRunner class, since the code above submits the job via ToolRunner.run:

    public static void main(String [] args) throws Exception {
        int exitCode = ToolRunner.run(new HadoopConfDisplay(), args);
        System.exit(exitCode);
    }
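(The body of HadoopConfDisplay is not shown here; since Configuration is Iterable over its key/value entries, its run method presumably looks something like the sketch below, which simply dumps the configuration that ToolRunner has already populated:)

    @Override
    public int run(String[] args) throws Exception {
        // getConf() returns the Configuration that ToolRunner filled in
        // from the generic command-line options before calling run()
        Configuration conf = getConf();
        for (Map.Entry<String, String> entry : conf) {
            System.out.printf("%s=%s%n", entry.getKey(), entry.getValue());
        }
        return 0;
    }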

Looking at the ToolRunner class, its run method takes three parameters: a Configuration, a Tool, and the argument array. (The two-argument overload used above simply delegates to this one, passing the tool's own configuration.)

public class ToolRunner {
 
  /**
   * Runs the given <code>Tool</code> by {@link Tool#run(String[])}, after 
   * parsing with the given generic arguments. Uses the given 
   * <code>Configuration</code>, or builds one if null.
   * 
   * Sets the <code>Tool</code>'s configuration with the possibly modified 
   * version of the <code>conf</code>.  
   * 
   * @param conf <code>Configuration</code> for the <code>Tool</code>.
   * @param tool <code>Tool</code> to run.
   * @param args command-line arguments to the tool.
   * @return exit code of the {@link Tool#run(String[])} method.
   */
  public static int run(Configuration conf, Tool tool, String[] args) 
    throws Exception{
    if(conf == null) {
      conf = new Configuration();
    }
    GenericOptionsParser parser = new GenericOptionsParser(conf, args);
    //set the configuration back, so that Tool can configure itself
    tool.setConf(conf);
    
    //get the args w/o generic hadoop args
    String[] toolArgs = parser.getRemainingArgs();
    return tool.run(toolArgs);
  }

This is where GenericOptionsParser comes in: it parses Hadoop configuration files and command-line options, which gives a packaged MapReduce program a lot of flexibility. The generic command-line options cover the following:

[Screenshot: table of the supported generic command-line options (-D, -conf, -fs, -jt, -files, -libjars, -archives), from Hadoop: The Definitive Guide, 4th edition]

Looking at the source, the processGeneralOptions method of GenericOptionsParser is what decides which options are supported. (The 3.1.1 code already supports more options than the screenshot above, which comes from the 4th edition of the Definitive Guide.)

  private void processGeneralOptions(Configuration conf,
      CommandLine line) throws IOException {
    if (line.hasOption("fs")) {
      FileSystem.setDefaultUri(conf, line.getOptionValue("fs"));
    }

    if (line.hasOption("jt")) {
      String optionValue = line.getOptionValue("jt");
      if (optionValue.equalsIgnoreCase("local")) {
        conf.set("mapreduce.framework.name", optionValue);
      }

      conf.set("yarn.resourcemanager.address", optionValue, 
          "from -jt command line option");
    }
    if (line.hasOption("conf")) {
      String[] values = line.getOptionValues("conf");
      for(String value : values) {
        conf.addResource(new Path(value));
      }
    }
    if (line.hasOption("libjars")) {
      conf.set("tmpjars", 
               validateFiles(line.getOptionValue("libjars"), conf),
               "from -libjars command line option");
      //setting libjars in client classpath
      URL[] libjars = getLibJars(conf);
      if(libjars!=null && libjars.length>0) {
        conf.setClassLoader(new URLClassLoader(libjars, conf.getClassLoader()));
        Thread.currentThread().setContextClassLoader(
            new URLClassLoader(libjars, 
                Thread.currentThread().getContextClassLoader()));
      }
    }
    if (line.hasOption("files")) {
      conf.set("tmpfiles", 
               validateFiles(line.getOptionValue("files"), conf),
               "from -files command line option");
    }
    if (line.hasOption("archives")) {
      conf.set("tmparchives", 
                validateFiles(line.getOptionValue("archives"), conf),
                "from -archives command line option");
    }
    if (line.hasOption('D')) {
      String[] property = line.getOptionValues('D');
      for(String prop : property) {
        String[] keyval = prop.split("=", 2);
        if (keyval.length == 2) {
          conf.set(keyval[0], keyval[1], "from command line");
        }
      }
    }
    conf.setBoolean("mapreduce.client.genericoptionsparser.used", true);
    
    // tokensFile
    if(line.hasOption("tokenCacheFile")) {
      String fileName = line.getOptionValue("tokenCacheFile");
      // check if the local file exists
      FileSystem localFs = FileSystem.getLocal(conf);
      Path p = localFs.makeQualified(new Path(fileName));
      if (!localFs.exists(p)) {
          throw new FileNotFoundException("File "+fileName+" does not exist.");
      }
      if(LOG.isDebugEnabled()) {
        LOG.debug("setting conf tokensFile: " + fileName);
      }
      UserGroupInformation.getCurrentUser().addCredentials(
          Credentials.readTokenStorageFile(p, conf));
      conf.set("mapreduce.job.credentials.json", p.toString(),
               "from -tokenCacheFile command line option");

    }
  }

The way I see it, writing the job this way turns it into a fairly flexible tool: the cluster address, assorted parameters, and the input/output paths can all be supplied on the command line instead of being hard-coded.
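For example, a job packaged as a Tool could be launched with something like hadoop jar app.jar MyApp -D my.job.threshold=100 input output (the property name my.job.threshold and the mapper below are made up purely for illustration), and the value is then visible inside the tasks through the job configuration, with no extra plumbing:

	// hypothetical mapper that reads a -D option out of the job configuration
	public static class ThresholdMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

		private int threshold;

		@Override
		protected void setup(Context context) {
			// falls back to 0 if -D my.job.threshold=... was not supplied
			threshold = context.getConfiguration().getInt("my.job.threshold", 0);
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			if (value.getLength() > threshold) {
				context.write(value, new IntWritable(1));
			}
		}
	}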

 

There is one more submission style, built on JobControl and ControlledJob. Any business requirement that is even slightly complex cannot be handled by a single MapReduce job and has to be split into several jobs, and once multiple jobs are involved you need to manage the dependencies between them. The most straightforward way to do that is JobControl:

	public static void main(String[] args) throws Exception {

		BasicConfigurator.configure();
		Configuration conf = new Configuration();
		LOGGER.info(conf.toString());

		FileSystem fileSystem;
		Path out;
		String uri;
		FSDataInputStream fsDataInputStream;
		OutputStream outputStream;
		String fileName;
		Path hdfsPath;

		// First job, extract repayment time slot
		Job job1 = Job.getInstance(conf);

		// Second job, partition data according to different time slots
		Job job2 = Job.getInstance(conf);

		// Third job, get details of premature data
		Job job3 = Job.getInstance(conf);

		// Fourth job, get details of careless data
		Job job4 = Job.getInstance(conf);

		// Fifth job, get details of overdue data
		Job job5 = Job.getInstance(conf);

		/**
		 * Job control
		 */
		ControlledJob cJob1 = new ControlledJob(conf);
		cJob1.setJob(job1);
		ControlledJob cJob2 = new ControlledJob(conf);
		cJob2.setJob(job2);
		ControlledJob cJob3 = new ControlledJob(conf);
		cJob3.setJob(job3);
		ControlledJob cJob4 = new ControlledJob(conf);
		cJob4.setJob(job4);
		ControlledJob cJob5 = new ControlledJob(conf);
		cJob5.setJob(job5);

		cJob2.addDependingJob(cJob1);
		cJob3.addDependingJob(cJob2);
		cJob4.addDependingJob(cJob3);
		cJob5.addDependingJob(cJob4);

		JobControl jobControl = new JobControl("Repayment");
		jobControl.addJob(cJob1);
		jobControl.addJob(cJob2);
		jobControl.addJob(cJob3);
		jobControl.addJob(cJob4);
		jobControl.addJob(cJob5);

		Thread thread = new Thread(jobControl);
		thread.start();
		while (true) {
			if (jobControl.allFinished()) {
				LOGGER.info(jobControl.getSuccessfulJobList().toString());
				jobControl.stop();
				break;
			}
			Thread.sleep(500); // sleep briefly so the loop does not spin at full CPU
		}
	}

Here, JobControl implements the Runnable interface. The submission logic lives in JobControl's run method, which for each job ultimately calls the Job class's submit method as well.
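One caveat: allFinished() also returns true when some of the jobs have failed, so it is worth checking the failed list before deciding the exit code. A minimal sketch, continuing from the loop above:

		// allFinished() does not imply success; inspect the failed list as well
		if (!jobControl.getFailedJobList().isEmpty()) {
			LOGGER.error("Failed jobs: " + jobControl.getFailedJobList());
			System.exit(1);
		}
		System.exit(0);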

This post draws on the article below; thanks to its author.

https://blog.csdn.net/yaoyaostep/article/details/12619997