ES-Hadoop学习笔记-初识

ES-Hadoop是连接快速查询和大数据分析的桥梁,它能够无间隙的在Hadoop和ElasticSearch上移动数据。ES Hadoop索引Hadoop数据到Elasticsearch,充分利用其查询速度,大量聚合能力来使它比以往更快,同时可以使用HDFS作为Elasticsearch长期存档。ES-Hadoop可以本地集成Hadoop生态系统上的很多流行组件,比如Spark、Hive、Pig、Storm、MapReduce等。官方有张图可以很好说明

ES-Hadoop学习笔记-初识

下面直接看一个简单的ES与Hadoop之间数据移动的实例

项目依赖的jar包如下

[plain] view plain copy
  1. <dependency>  
  2.   <groupId>org.elasticsearch</groupId>  
  3.   <artifactId>elasticsearch-hadoop</artifactId>  
  4.   <version>2.3.2</version>  
  5. </dependency>  

ElasticSearch到Hadoop最简单的实例

[java] view plain copy
  1. import java.io.IOException;  
  2.   
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.mapreduce.Job;  
  7. import org.apache.hadoop.mapreduce.Mapper;  
  8. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  9. import org.apache.hadoop.util.GenericOptionsParser;  
  10. import org.elasticsearch.hadoop.mr.EsInputFormat;  
  11. import org.elasticsearch.hadoop.mr.LinkedMapWritable;  
  12. import org.slf4j.Logger;  
  13. import org.slf4j.LoggerFactory;  
  14.   
  15. public class E2HJob01 {  
  16.       
  17.     private static Logger LOG = LoggerFactory.getLogger(E2HJob01.class);  
  18.   
  19.     public static void main(String args[]) {  
  20.         try {  
  21.             Configuration conf = new Configuration();  
  22.             conf.setBoolean("mapreduce.map.speculative"false);   
  23.             conf.setBoolean("mapreduce.reduce.speculative"false);   
  24.             //ElasticSearch节点  
  25.             conf.set("es.nodes""centos.host1:9200");  
  26.             //ElaticSearch Index/Type  
  27.             conf.set("es.resource""job/51/");   
  28.             String[] oArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  29.             if (oArgs.length != 1) {  
  30.                 LOG.error("error");  
  31.                 System.exit(2);  
  32.             }  
  33.             Job job = Job.getInstance(conf, "51JOBE2H01");  
  34.             job.setJarByClass(E2HJob01.class);  
  35.             job.setInputFormatClass(EsInputFormat.class);  
  36.             job.setMapperClass(E2HMapper01.class);  
  37.             job.setMapOutputKeyClass(Text.class);  
  38.             job.setMapOutputValueClass(LinkedMapWritable.class);  
  39.               
  40.             FileOutputFormat.setOutputPath(job, new Path(oArgs[0]));  
  41.               
  42.             System.out.println(job.waitForCompletion(true));  
  43.         } catch (Exception e) {  
  44.             LOG.error(e.getMessage(), e);  
  45.         }  
  46.     }  
  47.       
  48. }  
  49.   
  50. class E2HMapper01 extends Mapper<Text, LinkedMapWritable, Text, LinkedMapWritable> {  
  51.   
  52.     private static final Logger LOG = LoggerFactory.getLogger(E2HMapper01.class);  
  53.       
  54.     @Override  
  55.     protected void setup(Context context) throws IOException, InterruptedException {  
  56.         super.setup(context);  
  57.     }  
  58.   
  59.     @Override  
  60.     protected void map(Text key, LinkedMapWritable value, Context context)  
  61.             throws IOException, InterruptedException {  
  62.         LOG.info("key {} value {}", key, value);  
  63.         context.write(key, value);  
  64.     }  
  65.       
  66.     @Override  
  67.     protected void cleanup(Context context) throws IOException, InterruptedException {  
  68.         super.cleanup(context);  
  69.     }  
  70.       
  71. }  

hadoop jar eshadoop.jar E2HJob01 /user/data/es/job/
从hadoop上的数据文件可以看到第一列是ES的doc id,第二列是doc data
也可以添加ES查询条件,实例如下
[java] view plain copy
  1. import java.io.IOException;  
  2. import java.util.HashMap;  
  3. import java.util.Map;  
  4. import java.util.Map.Entry;  
  5.   
  6. import org.apache.commons.lang.StringUtils;  
  7. import org.apache.hadoop.conf.Configuration;  
  8. import org.apache.hadoop.fs.Path;  
  9. import org.apache.hadoop.io.NullWritable;  
  10. import org.apache.hadoop.io.Text;  
  11. import org.apache.hadoop.io.Writable;  
  12. import org.apache.hadoop.mapreduce.Job;  
  13. import org.apache.hadoop.mapreduce.Mapper;  
  14. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  15. import org.apache.hadoop.util.GenericOptionsParser;  
  16. import org.elasticsearch.hadoop.mr.EsInputFormat;  
  17. import org.elasticsearch.hadoop.mr.LinkedMapWritable;  
  18. import org.platform.eshadoop.modules.examples.writable.JobWritable;  
  19. import org.slf4j.Logger;  
  20. import org.slf4j.LoggerFactory;  
  21.   
  22.   
  23. public class E2HJob02 {  
  24.       
  25.     private static Logger LOG = LoggerFactory.getLogger(E2HJob02.class);  
  26.   
  27.     public static void main(String args[]) {  
  28.         try {  
  29.             Configuration conf = new Configuration();  
  30.             conf.setBoolean("mapreduce.map.speculative"false);   
  31.             conf.setBoolean("mapreduce.reduce.speculative"false);   
  32.             conf.set("es.nodes""centos.host1:9200");  
  33.             conf.set("es.resource""job/51/");   
  34.             conf.set("es.query""?q=高*");    
  35.             String[] oArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  36.             if (oArgs.length != 1) {  
  37.                 LOG.error("error");  
  38.                 System.exit(2);  
  39.             }  
  40.             Job job = Job.getInstance(conf, "51JOBE2H02");  
  41.             job.setJarByClass(E2HJob02.class);  
  42.             job.setInputFormatClass(EsInputFormat.class);  
  43.             job.setMapperClass(E2HMapper02.class);  
  44.             job.setMapOutputKeyClass(NullWritable.class);  
  45.             job.setMapOutputValueClass(JobWritable.class);  
  46.               
  47.             FileOutputFormat.setOutputPath(job, new Path(oArgs[0]));  
  48.               
  49.             System.out.println(job.waitForCompletion(true));  
  50.         } catch (Exception e) {  
  51.             LOG.error(e.getMessage(), e);  
  52.         }  
  53.     }  
  54.       
  55. }  
  56.   
  57. class E2HMapper02 extends Mapper<Text, LinkedMapWritable, NullWritable, JobWritable> {  
  58.   
  59.     private static final Logger LOG = LoggerFactory.getLogger(E2HMapper02.class);  
  60.       
  61.     @Override  
  62.     protected void setup(Context context) throws IOException, InterruptedException {  
  63.         super.setup(context);  
  64.     }  
  65.   
  66.     @Override  
  67.     protected void map(Text key, LinkedMapWritable value, Context context)  
  68.             throws IOException, InterruptedException {  
  69.         JobWritable writable = new JobWritable();  
  70.         writable.setId(key);  
  71.         Map<String, String> map = new HashMap<String, String>();  
  72.         for (Entry<Writable, Writable> entry : value.entrySet()) {  
  73.             LOG.info("key {} value {}", entry.getKey(), entry.getValue());  
  74.             map.put(entry.getKey().toString(), entry.getValue().toString());  
  75.         }  
  76.         String jobName = map.get("jobName");  
  77.         if (StringUtils.isNotBlank(jobName)) {  
  78.             writable.setJobName(new Text(jobName));  
  79.         }  
  80.         String jobUrl = map.get("jobUrl");  
  81.         if (StringUtils.isNotBlank(jobUrl)) {  
  82.             writable.setJobUrl(new Text(jobUrl));  
  83.         }  
  84.         String companyName = map.get("companyName");  
  85.         if (StringUtils.isNotBlank(companyName)) {  
  86.             writable.setCompanyName(new Text(companyName));  
  87.         }  
  88.         String companyUrl = map.get("companyUrl");  
  89.         if (StringUtils.isNotBlank(companyUrl)) {  
  90.             writable.setCompanyUrl(new Text(companyUrl));  
  91.         }  
  92.         String salary = map.get("salary");  
  93.         if (StringUtils.isNotBlank(salary)) {  
  94.             writable.setSalary(new Text(salary));  
  95.         }  
  96.         String workPlace = map.get("workPlace");  
  97.         if (StringUtils.isNotBlank(workPlace)) {  
  98.             writable.setWorkPlace(new Text(workPlace));  
  99.         }  
  100.         String contact = map.get("contact");  
  101.         if (StringUtils.isNotBlank(contact)) {  
  102.             writable.setContact(new Text(contact));  
  103.         }  
  104.         String welfare = map.get("welfare");  
  105.         if (StringUtils.isNotBlank(welfare)) {  
  106.             writable.setWelfare(new Text(welfare));  
  107.         }  
  108.         context.write(NullWritable.get(), writable);  
  109.     }  
  110.       
  111.     @Override  
  112.     protected void cleanup(Context context) throws IOException, InterruptedException {  
  113.         super.cleanup(context);  
  114.     }  
  115.       
  116. }  
[java] view plain copy
  1. import java.io.DataInput;  
  2. import java.io.DataOutput;  
  3. import java.io.IOException;  
  4.   
  5. import org.apache.hadoop.io.Text;  
  6. import org.apache.hadoop.io.Writable;  
  7.   
  8. public class JobWritable implements Writable, Cloneable {  
  9.       
  10.     private Text id = null;  
  11.       
  12.     private Text jobName = null;  
  13.       
  14.     private Text jobUrl = null;  
  15.       
  16.     private Text companyName = null;  
  17.       
  18.     private Text companyUrl = null;  
  19.       
  20.     private Text salary = null;  
  21.   
  22.     private Text workPlace = null;  
  23.   
  24.     private Text contact = null;  
  25.       
  26.     private Text welfare = null;  
  27.       
  28.     public JobWritable() {  
  29.         id = new Text();  
  30.         jobName = new Text();  
  31.         jobUrl = new Text();  
  32.         companyName = new Text();  
  33.         companyUrl = new Text();  
  34.         salary = new Text();  
  35.         workPlace = new Text();  
  36.         contact = new Text();  
  37.         welfare = new Text();  
  38.     }  
  39.   
  40.     public void readFields(DataInput dataInput) throws IOException {  
  41.         id.readFields(dataInput);  
  42.         jobName.readFields(dataInput);  
  43.         jobUrl.readFields(dataInput);  
  44.         companyName.readFields(dataInput);  
  45.         companyUrl.readFields(dataInput);  
  46.         salary.readFields(dataInput);  
  47.         workPlace.readFields(dataInput);  
  48.         contact.readFields(dataInput);  
  49.         welfare.readFields(dataInput);  
  50.     }  
  51.   
  52.     public void write(DataOutput dataOutput) throws IOException {  
  53.         id.write(dataOutput);  
  54.         jobName.write(dataOutput);  
  55.         jobUrl.write(dataOutput);  
  56.         companyName.write(dataOutput);  
  57.         companyUrl.write(dataOutput);  
  58.         salary.write(dataOutput);  
  59.         workPlace.write(dataOutput);  
  60.         contact.write(dataOutput);  
  61.         welfare.write(dataOutput);  
  62.     }  
  63.   
  64.     public Text getId() {  
  65.         return id;  
  66.     }  
  67.   
  68.     public void setId(Text id) {  
  69.         this.id = id;  
  70.     }  
  71.   
  72.     public Text getJobName() {  
  73.         return jobName;  
  74.     }  
  75.   
  76.     public void setJobName(Text jobName) {  
  77.         this.jobName = jobName;  
  78.     }  
  79.   
  80.     public Text getJobUrl() {  
  81.         return jobUrl;  
  82.     }  
  83.   
  84.     public void setJobUrl(Text jobUrl) {  
  85.         this.jobUrl = jobUrl;  
  86.     }  
  87.   
  88.     public Text getCompanyName() {  
  89.         return companyName;  
  90.     }  
  91.   
  92.     public void setCompanyName(Text companyName) {  
  93.         this.companyName = companyName;  
  94.     }  
  95.   
  96.     public Text getCompanyUrl() {  
  97.         return companyUrl;  
  98.     }  
  99.   
  100.     public void setCompanyUrl(Text companyUrl) {  
  101.         this.companyUrl = companyUrl;  
  102.     }  
  103.   
  104.     public Text getSalary() {  
  105.         return salary;  
  106.     }  
  107.   
  108.     public void setSalary(Text salary) {  
  109.         this.salary = salary;  
  110.     }  
  111.   
  112.     public Text getWorkPlace() {  
  113.         return workPlace;  
  114.     }  
  115.   
  116.     public void setWorkPlace(Text workPlace) {  
  117.         this.workPlace = workPlace;  
  118.     }  
  119.   
  120.     public Text getContact() {  
  121.         return contact;  
  122.     }  
  123.   
  124.     public void setContact(Text contact) {  
  125.         this.contact = contact;  
  126.     }  
  127.   
  128.     public Text getWelfare() {  
  129.         return welfare;  
  130.     }  
  131.   
  132.     public void setWelfare(Text welfare) {  
  133.         this.welfare = welfare;  
  134.     }  
  135.       
  136.     @Override  
  137.     public String toString() {  
  138.         return id + ":" + jobName + ":" + jobUrl + ":" + companyName + ":" + companyUrl +  
  139.                  ":" + salary + ":" + workPlace + ":" + contact + ":" + welfare;  
  140.     }  
  141.   
  142. }  
下面这个实例是每行直接以json格式存储在hadoop上

[java] view plain copy
  1. import java.io.IOException;  
  2. import java.util.HashMap;  
  3. import java.util.Map;  
  4. import java.util.Map.Entry;  
  5.   
  6. import org.apache.hadoop.conf.Configuration;  
  7. import org.apache.hadoop.fs.Path;  
  8. import org.apache.hadoop.io.NullWritable;  
  9. import org.apache.hadoop.io.Text;  
  10. import org.apache.hadoop.io.Writable;  
  11. import org.apache.hadoop.mapreduce.Job;  
  12. import org.apache.hadoop.mapreduce.Mapper;  
  13. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
  14. import org.apache.hadoop.util.GenericOptionsParser;  
  15. import org.elasticsearch.hadoop.mr.EsInputFormat;  
  16. import org.elasticsearch.hadoop.mr.LinkedMapWritable;  
  17. import org.slf4j.Logger;  
  18. import org.slf4j.LoggerFactory;  
  19.   
  20. import com.google.gson.Gson;  
  21.   
  22. public class E2HJob03 {  
  23.       
  24.     private static Logger LOG = LoggerFactory.getLogger(E2HJob03.class);  
  25.   
  26.     public static void main(String args[]) {  
  27.         try {  
  28.             Configuration conf = new Configuration();  
  29.             conf.setBoolean("mapreduce.map.speculative"false);   
  30.             conf.setBoolean("mapreduce.reduce.speculative"false);   
  31.             conf.set("es.nodes""centos.host1:9200");  
  32.             conf.set("es.resource""job/51/");   
  33.             String[] oArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  34.             if (oArgs.length != 1) {  
  35.                 LOG.error("error");  
  36.                 System.exit(2);  
  37.             }  
  38.             Job job = Job.getInstance(conf, "51JOBE2H03");  
  39.             job.setJarByClass(E2HJob03.class);  
  40.             job.setInputFormatClass(EsInputFormat.class);  
  41.             job.setMapperClass(E2HMapper03.class);  
  42.             job.setMapOutputKeyClass(NullWritable.class);  
  43.             job.setMapOutputValueClass(Text.class);  
  44.               
  45.             FileOutputFormat.setOutputPath(job, new Path(oArgs[0]));  
  46.               
  47.             System.out.println(job.waitForCompletion(true));  
  48.         } catch (Exception e) {  
  49.             LOG.error(e.getMessage(), e);  
  50.         }  
  51.     }  
  52.       
  53. }  
  54.   
  55. class E2HMapper03 extends Mapper<Text, LinkedMapWritable, NullWritable, Text> {  
  56.   
  57.     private static final Logger LOG = LoggerFactory.getLogger(E2HMapper02.class);  
  58.       
  59.     private Gson gson = null;  
  60.       
  61.     @Override  
  62.     protected void setup(Context context) throws IOException, InterruptedException {  
  63.         super.setup(context);  
  64.         gson = new Gson();  
  65.     }  
  66.   
  67.     @Override  
  68.     protected void map(Text key, LinkedMapWritable value, Context context)  
  69.             throws IOException, InterruptedException {  
  70.         JobInfo jobInfo = new JobInfo();  
  71.         jobInfo.setId(key.toString());  
  72.         Map<String, String> map = new HashMap<String, String>();  
  73.         for (Entry<Writable, Writable> entry : value.entrySet()) {  
  74.             LOG.info("key {} value {}", entry.getKey(), entry.getValue());  
  75.             map.put(entry.getKey().toString(), entry.getValue().toString());  
  76.         }  
  77.         jobInfo.setJobName(map.get("jobName"));  
  78.         jobInfo.setJobUrl(map.get("jobUrl"));  
  79.         jobInfo.setCompanyName(map.get("companyName"));  
  80.         jobInfo.setCompanyUrl(map.get("companyUrl"));  
  81.         jobInfo.setSalary(map.get("salary"));  
  82.         jobInfo.setWorkPlace(map.get("workPlace"));  
  83.         jobInfo.setContact(map.get("contact"));  
  84.         jobInfo.setWelfare(map.get("welfare"));  
  85.         context.write(NullWritable.get(), new Text(gson.toJson(jobInfo)));  
  86.     }  
  87.       
  88.     @Override  
  89.     protected void cleanup(Context context) throws IOException, InterruptedException {  
  90.         super.cleanup(context);  
  91.     }  
  92.       
  93. }  
  94.   
  95. class JobInfo {  
  96.       
  97.     private String id = null;  
  98.       
  99.     private String jobName = null;  
  100.       
  101.     private String jobUrl = null;  
  102.       
  103.     private String companyName = null;  
  104.       
  105.     private String companyUrl = null;  
  106.       
  107.     private String salary = null;  
  108.   
  109.     private String workPlace = null;  
  110.   
  111.     private String contact = null;  
  112.       
  113.     private String welfare = null;  
  114.   
  115.     public String getId() {  
  116.         return id;  
  117.     }  
  118.   
  119.     public void setId(String id) {  
  120.         this.id = id;  
  121.     }  
  122.   
  123.     public String getJobName() {  
  124.         return jobName;  
  125.     }  
  126.   
  127.     public void setJobName(String jobName) {  
  128.         this.jobName = jobName;  
  129.     }  
  130.   
  131.     public String getJobUrl() {  
  132.         return jobUrl;  
  133.     }  
  134.   
  135.     public void setJobUrl(String jobUrl) {  
  136.         this.jobUrl = jobUrl;  
  137.     }  
  138.   
  139.     public String getCompanyName() {  
  140.         return companyName;  
  141.     }  
  142.   
  143.     public void setCompanyName(String companyName) {  
  144.         this.companyName = companyName;  
  145.     }  
  146.   
  147.     public String getCompanyUrl() {  
  148.         return companyUrl;  
  149.     }  
  150.   
  151.     public void setCompanyUrl(String companyUrl) {  
  152.         this.companyUrl = companyUrl;  
  153.     }  
  154.   
  155.     public String getSalary() {  
  156.         return salary;  
  157.     }  
  158.   
  159.     public void setSalary(String salary) {  
  160.         this.salary = salary;  
  161.     }  
  162.   
  163.     public String getWorkPlace() {  
  164.         return workPlace;  
  165.     }  
  166.   
  167.     public void setWorkPlace(String workPlace) {  
  168.         this.workPlace = workPlace;  
  169.     }  
  170.   
  171.     public String getContact() {  
  172.         return contact;  
  173.     }  
  174.   
  175.     public void setContact(String contact) {  
  176.         this.contact = contact;  
  177.     }  
  178.   
  179.     public String getWelfare() {  
  180.         return welfare;  
  181.     }  
  182.   
  183.     public void setWelfare(String welfare) {  
  184.         this.welfare = welfare;  
  185.     }  
  186.   
  187. }  

接下来的实例是将hadoop上的数据移动到ElasticSearch上索引,这里直接用上面存储的JSON数据试验

[java] view plain copy
  1. import java.io.IOException;  
  2.   
  3. import org.apache.hadoop.conf.Configuration;  
  4. import org.apache.hadoop.fs.Path;  
  5. import org.apache.hadoop.io.LongWritable;  
  6. import org.apache.hadoop.io.NullWritable;  
  7. import org.apache.hadoop.io.Text;  
  8. import org.apache.hadoop.io.Writable;  
  9. import org.apache.hadoop.mapreduce.Job;  
  10. import org.apache.hadoop.mapreduce.Mapper;  
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
  12. import org.apache.hadoop.util.GenericOptionsParser;  
  13. import org.elasticsearch.hadoop.mr.EsOutputFormat;  
  14. import org.slf4j.Logger;  
  15. import org.slf4j.LoggerFactory;  
  16.   
  17. public class H2EJob {  
  18.       
  19.     private static Logger LOG = LoggerFactory.getLogger(H2EJob.class);  
  20.   
  21.     public static void main(String args[]) {  
  22.         try {  
  23.             Configuration conf = new Configuration();  
  24.             conf.setBoolean("mapreduce.map.speculative"false);   
  25.             conf.setBoolean("mapreduce.reduce.speculative"false);   
  26.             conf.set("es.nodes""centos.host1:9200");  
  27.             conf.set("es.resource""job1/51");   
  28.             //Hadoop上的数据格式为JSON,可以直接导入  
  29.             conf.set("es.input.json""yes");  
  30.             String[] oArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  
  31.             if (oArgs.length != 1) {  
  32.                 LOG.error("error");  
  33.                 System.exit(2);  
  34.             }  
  35.             Job job = Job.getInstance(conf, "51JOBH2E");  
  36.             job.setJarByClass(H2EJob.class);  
  37.             job.setMapperClass(H2EMapper.class);  
  38.             job.setMapOutputKeyClass(NullWritable.class);  
  39.             job.setMapOutputValueClass(Text.class);  
  40.             job.setOutputFormatClass(EsOutputFormat.class);  
  41.               
  42.             FileInputFormat.addInputPath(job, new Path(oArgs[0]));  
  43.               
  44.             System.out.println(job.waitForCompletion(true));  
  45.         } catch (Exception e) {  
  46.             LOG.error(e.getMessage(), e);  
  47.         }  
  48.     }  
  49.       
  50. }  
  51.   
  52. class H2EMapper extends Mapper<LongWritable, Text, NullWritable, Text> {  
  53.   
  54.     @Override  
  55.     protected void setup(Context context) throws IOException, InterruptedException {  
  56.         super.setup(context);  
  57.     }  
  58.   
  59.     @Override  
  60.     public void run(Context context) throws IOException, InterruptedException {  
  61.         super.run(context);  
  62.     }  
  63.   
  64.     @Override  
  65.     protected void map(LongWritable key, Text value, Context context)  
  66.             throws IOException, InterruptedException {  
  67.         context.write(NullWritable.get(), value);  
  68.     }  
  69.   
  70.     @Override  
  71.     protected void cleanup(Context context) throws IOException,InterruptedException {  
  72.         super.cleanup(context);  
  73.     }  
  74.       
  75. }  

执行hadoop jar eshadoop.jar H2EJob /user/data/es/job后,可以在ES上看到数据已经索引过来。