用Hadoop MapReduce将JSON导入到Elasticsearch
1.把json上传到hdfs中
2.maven依赖
<dependency><groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop</artifactId>
<version>5.5.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop-mr</artifactId>
<version>5.5.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop-hive</artifactId>
<version>5.5.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-hadoop-pig</artifactId>
<version>5.5.2</version>
</dependency>
<!--Lucene分词模块-->
<dependency>
<groupId>org.apache.lucene</groupId>
<artifactId>lucene-analyzers-common</artifactId>
<version>6.0.0</version>
</dependency>
<!--IK分词 -->
<dependency>
<groupId>cn.bestwu</groupId>
<artifactId>ik-analyzers</artifactId>
<version>5.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.5.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/commons-httpclient/commons-httpclient -->
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
3.写代码(我的Es版本是5.5.2)
public class EsHadoop {
public static class MyMapper extends Mapper<Object, Text, NullWritable, Text> {
private Text line = new Text();
public void map(Object key, Text value, Mapper<Object, Text, NullWritable, Text>.Context context) throws IOException, InterruptedException {
if(value.getLength()>0){
line.set(value);
context.write(NullWritable.get(), line);
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
conf.setBoolean("mapred.map.tasks.speculative.execution", false);
conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
conf.set("es.nodes", "192.168.1.158:9200");
conf.set("es.resource", "blog/****");//index/type
conf.set("es.input.json", "yes");
Job job = Job.getInstance(conf, "hadoop es write test");
job.setJarByClass(EsHadoop.class);
job.setMapperClass(EsHadoop.MyMapper.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(EsOutputFormat.class);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
// 设置输入路径
FileInputFormat.addInputPath(job, new Path
("hdfs://192.168.1.120:9000/data/json"));//json路径
job.waitForCompletion(true);
}
4.打jar包
因为用到了第三方jar包,所以打包的时候需要在jar下新建一个lib目录,把第三方jar放进去,否则运行时会报
java.lang.NoClassDefFoundError: org/elasticsearch/hadoop/mr/EsOutputFormat 这个错误
选中你的那个类
在jar下面新建一个lib文件夹,把第三方包加进入,就可以了
选择build执行,在out文件夹下找到你的jar包,放入到hadoop目录下
5.执行命令 hadoop jar /usr/local/hadoop-2.8.2/comspark.jar