单节点群集在多节点群集上工作?

问题描述:

我已经为单机群集编写mapreduce java代码而不使用Tool,它可以在多节点群集上工作,还是需要进行更改? 下面的代码标记化的字符串并计算每个文本文件单节点群集在多节点群集上工作?

public class tr 
    { 
    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text,Text,IntWritable> 
    { 
     Text word=new Text(); 
     IntWritable one=new IntWritable(1); 
       String imptoken; 
     public static List<String> stopwords=new ArrayList<String>(); 
     public void map(LongWritable key,Text value,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException 
     { 
         addwords(); 
       String line=value.toString(); 
         line=line.replaceAll("[^A-Za-z]"," ").toLowerCase(); 
         StringTokenizer st=new StringTokenizer(line); 
      while(st.hasMoreTokens()) 
     { 
       imptoken=st.nextToken(); 
          if(stopwords.contains(imptoken)) 
       { 

       }    
       else 
       { 
        word.set(imptoken); 
        output.collect(word, one); 
       }        
        } 
    } 
      public void addwords() throws IOException 
     { 
    FileSystem fs = FileSystem.get(new Configuration()); 
    Path stop=new Path("/user/hduser/stopword.txt"); 
    BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(stop))); 
    String stopword=br.readLine(); 
    while(stopword!=null) 
    { 
     stopwords.add(stopword); 
     stopword=br.readLine(); 
    } 

     } 

} 
public static class Reduce extends MapReduceBase implements Reducer<Text,IntWritable, Text, IntWritable> 
{ 
    public void reduce(Text key,Iterator<IntWritable> value,OutputCollector<Text,IntWritable> output,Reporter reporter) throws IOException 
    { 
     int sum=0; 
     while(value.hasNext()) 
     { 
      sum=sum+value.next().get(); 
     } 
        /* Path paths=new Path("/user/hduser/input1/"); 
     FileSystem fs=FileSystem.get(new Configuration()); 
     FileStatus[] status = fs.listStatus(paths); 
        Path[] list = FileUtil.stat2Paths(status); 
        String keystr=key.toString(); 
        for(Path file : list) 
        { 
         BufferedReader br=new BufferedReader(new InputStreamReader(fs.open(file))); 
       String word=br.readLine(); 
       while(word!=null) 
       { 
       if(word.equals(keystr)) 
       { 
        sum=0; 
       } 
       word=br.readLine(); 
       } 

        }*/ 

        output.collect(key, new IntWritable(sum)); 
      }  
    } 

public static void main(String args[]) throws IOException 
{    
    FileSystem fs = FileSystem.get(new Configuration()); 
    Path[] paths = new Path[args.length]; 
    for (int i = 0; i < paths.length; i++) 
    { 
     paths[i] = new Path(args[i]); 
    } 

    FileStatus[] status = fs.listStatus(paths); 
      Path[] listedPaths = FileUtil.stat2Paths(status); 

    FSDataInputStream in = null; 
    for (Path p : listedPaths) 
    { 
     JobConf conf = new JobConf(tr.class); 
      conf.setJobName("tr"); 

      conf.setOutputKeyClass(Text.class); 
      conf.setOutputValueClass(IntWritable.class); 

      conf.setMapperClass(Map.class); 
      conf.setCombinerClass(Reduce.class); 
      conf.setReducerClass(Reduce.class); 

      conf.setInputFormat(TextInputFormat.class); 
        conf.setOutputFormat(TextOutputFormat.class); 

      String name=p.getName(); 
      String absolutepath=p.getParent().toString()+"/"+name; 

      FileInputFormat.setInputPaths(conf, new Path(absolutepath)); 
        FileOutputFormat.setOutputPath(conf, new Path(args[1])); 

        JobClient.runJob(conf); 

        Path local=new Path("/home/hduser/meproj/projectfiles/"); 
        Path source=new Path(args[1]+"/"+"part-00000"); 

        fs.copyToLocalFile(source, local); 

        File file=new File("/home/hduser/meproj/projectfiles/part-00000"); 
        file.renameTo(new File("/home/hduser/meproj/projectfiles/"+name)); 
        fs.delete(new Path(args[1]), true); 
    } 
} 


} 
+0

它应该这样做,除非你已经明确地编码它,否则它不会。为什么你使用旧的hadoop API有什么特别的原因吗? – Quetzalcoatl 2013-04-30 16:17:42

+0

我是hadoop的新手,我还没有正确理解MapReduce编程,我不知道如何在语料库中为每个文件运行MapReduce程序,因此可以告诉我在单节点和多节点集群上为每个文件实施MapReduce程序的替代方法 – user2200278 2013-04-30 16:32:26

词频当你写使用它会为所有群集设置工作,除非你是专门做一些突破,像在本地工作的Hadoop程序文件在一台机器上。

您正以独立于设置的方式(您应该这样做)在Mapper和Reducer中完成工作,因此它应该可以在任何地方工作。

这与您的问题无关,但您不应该循环遍历文件并在每个路径上运行独立的作业。真的,你应该对所有这些人进行一次工作。您可以将所有这些单独的路径放在同一个文件夹中,并将该文件夹指定为输入。或者你可以在多条路径上运行hadoop(参见answer