Elasticsearch Document API
删除文档(delete)
可以看到id为1的文档已经被删除了
使用脚本更新(script)之前,需要修改Elasticsearch的配置文件elasticsearch.yml,开启内联脚本更新:script.engine.groovy.inline.update: on
slave1、slave2也做同样的修改,这里就不赘述了。
修改完成后记得重启Elasticsearch才能生效,重启方法这里就不多说了。
运行程序
这个函数(upsert)的意思是:如果文档存在就更新,不存在就创建
第一次执行的结果(文档还不存在,所以被创建)
第二次执行的结果(因为文档已经存在,所以只更新其中的内容)
这个是批量查询(Multi Get API),一次获取多个文档
批量操作(Bulk API):添加两个文档,删除一个文档
1 public void test13() throws IOException, InterruptedException, 2 ExecutionException { 3 4 BulkProcessor bulkProcessor = BulkProcessor.builder( 5 client, 6 new BulkProcessor.Listener() { 7 8 public void beforeBulk(long executionId, BulkRequest request) { 9 // TODO Auto-generated method stub 10 System.out.println(request.numberOfActions()); 11 } 12 13 public void afterBulk(long executionId, BulkRequest request, 14 Throwable failure) { 15 // TODO Auto-generated method stub 16 System.out.println(failure.getMessage()); 17 } 18 19 public void afterBulk(long executionId, BulkRequest request, 20 BulkResponse response) { 21 // TODO Auto-generated method stub 22 System.out.println(response.hasFailures()); 23 } 24 }) 25 .setBulkActions(1000) // 每个批次的最大数量 26 .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 每个批次的最大字节数 27 .setFlushInterval(TimeValue.timeValueSeconds(5))// 每批提交时间间隔 28 .setConcurrentRequests(1) //设置多少个并发处理线程 29 //可以允许用户自定义当一个或者多个bulk请求失败后,该执行如何操作 30 .setBackoffPolicy( 31 BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 32 .build(); 33 String json = "{" + 34 "\"user\":\"kimchy\"," + 35 "\"postDate\":\"2013-01-30\"," + 36 "\"message\":\"trying out Elasticsearch\"" + 37 "}"; 38 39 for (int i = 0; i < 1000; i++) { 40 bulkProcessor.add(new IndexRequest("djt6", "user").source(json)); 41 } 42 //阻塞至所有的请求线程处理完毕后,断开连接资源 43 bulkProcessor.awaitClose(3, TimeUnit.MINUTES); 44 client.close(); 45 } 46 /** 47 * SearchType使用方式 48 * @throws Exception 49 */ 50 @Test 51 public void test14() throws Exception { 52 SearchResponse response = client.prepareSearch("djt") 53 .setTypes("user") 54 //.setSearchType(SearchType.DFS_QUERY_THEN_FETCH) 55 .setSearchType(SearchType.QUERY_AND_FETCH) 56 .execute() 57 .actionGet(); 58 SearchHits hits = response.getHits(); 59 System.out.println(hits.getTotalHits()); 60 } 61 }
这个是使用BulkProcessor进行批量插入
这里一共插入了1000条文档,就不一一数了
参考代码:ESTestDocumentAPI.java
1 package com.dajiangtai.djt_spider.elasticsearch; 2 3 import java.io.IOException; 4 import java.net.InetAddress; 5 import java.net.UnknownHostException; 6 import java.util.Date; 7 import java.util.HashMap; 8 import java.util.Iterator; 9 import java.util.List; 10 import java.util.Map; 11 import java.util.concurrent.ExecutionException; 12 import java.util.concurrent.TimeUnit; 13 import static org.elasticsearch.node.NodeBuilder.*; 14 import static org.elasticsearch.common.xcontent.XContentFactory.*; 15 import org.elasticsearch.action.bulk.BackoffPolicy; 16 import org.elasticsearch.action.bulk.BulkProcessor; 17 import org.elasticsearch.common.unit.ByteSizeUnit; 18 import org.elasticsearch.common.unit.ByteSizeValue; 19 import org.elasticsearch.common.unit.TimeValue; 20 import org.codehaus.jackson.map.ObjectMapper; 21 import org.elasticsearch.action.bulk.BulkItemResponse; 22 import org.elasticsearch.action.bulk.BulkRequest; 23 import org.elasticsearch.action.bulk.BulkRequestBuilder; 24 import org.elasticsearch.action.bulk.BulkResponse; 25 import org.elasticsearch.action.delete.DeleteRequestBuilder; 26 import org.elasticsearch.action.delete.DeleteResponse; 27 import org.elasticsearch.action.get.GetResponse; 28 import org.elasticsearch.action.get.MultiGetItemResponse; 29 import org.elasticsearch.action.get.MultiGetResponse; 30 import org.elasticsearch.action.index.IndexRequest; 31 import org.elasticsearch.action.index.IndexRequestBuilder; 32 import org.elasticsearch.action.index.IndexResponse; 33 import org.elasticsearch.action.search.SearchResponse; 34 import org.elasticsearch.action.search.SearchType; 35 import org.elasticsearch.action.update.UpdateRequest; 36 import org.elasticsearch.client.Client; 37 import org.elasticsearch.client.transport.TransportClient; 38 import org.elasticsearch.cluster.node.DiscoveryNode; 39 import org.elasticsearch.common.settings.Settings; 40 import org.elasticsearch.common.transport.InetSocketTransportAddress; 41 import 
org.elasticsearch.index.query.QueryBuilders; 42 import org.elasticsearch.node.Node; 43 import org.elasticsearch.script.Script; 44 import org.elasticsearch.script.ScriptService; 45 import org.elasticsearch.search.SearchHits; 46 import org.junit.Before; 47 import org.junit.Test; 48 49 /** 50 * Document API 操作 51 * 52 * @author 大讲台 53 * 54 */ 55 public class ESTestDocumentAPI { 56 private TransportClient client; 57 58 @Before 59 public void test0() throws UnknownHostException { 60 61 // 开启client.transport.sniff功能,探测集群所有节点 62 Settings settings = Settings.settingsBuilder() 63 .put("cluster.name", "escluster") 64 .put("client.transport.sniff", true).build(); 65 // on startup 66 // 获取TransportClient 67 client = TransportClient 68 .builder() 69 .settings(settings) 70 .build() 71 .addTransportAddress( 72 new InetSocketTransportAddress(InetAddress 73 .getByName("master"), 9300)) 74 .addTransportAddress( 75 new InetSocketTransportAddress(InetAddress 76 .getByName("slave1"), 9300)) 77 .addTransportAddress( 78 new InetSocketTransportAddress(InetAddress 79 .getByName("slave2"), 9300)); 80 } 81 82 /** 83 * 创建索引:use ElasticSearch helpers 84 * 85 * @throws IOException 86 */ 87 @Test 88 public void test1() throws IOException { 89 IndexResponse response = client 90 .prepareIndex("twitter", "tweet", "1") 91 .setSource( 92 jsonBuilder().startObject().field("user", "kimchy") 93 .field("postDate", new Date()) 94 .field("message", "trying out Elasticsearch") 95 .endObject()).get(); 96 System.out.println(response.getId()); 97 client.close(); 98 } 99 100 /** 101 * 创建索引:do it yourself 102 * 103 * @throws IOException 104 */ 105 @Test 106 public void test2() throws IOException { 107 String json = "{" + "\"user\":\"kimchy\"," 108 + "\"postDate\":\"2013-01-30\"," 109 + "\"message\":\"trying out Elasticsearch\"" + "}"; 110 IndexResponse response = client.prepareIndex("twitter", "tweet") 111 .setSource(json).get(); 112 System.out.println(response.getId()); 113 client.close(); 114 } 115 116 /** 117 
* 创建索引:use map 118 * 119 * @throws IOException 120 */ 121 @Test 122 public void test3() throws IOException { 123 Map<String, Object> json = new HashMap<String, Object>(); 124 json.put("user", "kimchy"); 125 json.put("postDate", new Date()); 126 json.put("message", "trying out Elasticsearch"); 127 128 IndexResponse response = client.prepareIndex("twitter", "tweet") 129 .setSource(json).get(); 130 System.out.println(response.getId()); 131 client.close(); 132 } 133 134 /** 135 * 创建索引:serialize your beans 136 * 137 * @throws IOException 138 */ 139 @Test 140 public void test4() throws IOException { 141 User user = new User(); 142 user.setUser("kimchy"); 143 user.setPostDate(new Date()); 144 user.setMessage("trying out Elasticsearch"); 145 146 // instance a json mapper 147 ObjectMapper mapper = new ObjectMapper(); // create once, reuse 148 149 // generate json 150 byte[] json = mapper.writeValueAsBytes(user); 151 152 IndexResponse response = client.prepareIndex("twitter", "tweet") 153 .setSource(json).get(); 154 System.out.println(response.getId()); 155 client.close(); 156 } 157 158 /** 159 * 查询索引:get 160 * 161 * @throws IOException 162 */ 163 @Test 164 public void test5() throws IOException { 165 GetResponse response = client.prepareGet("twitter", "tweet", "1").get(); 166 System.out.println(response.getSourceAsString()); 167 168 client.close(); 169 } 170 171 /** 172 * 删除索引:delete 173 * 174 * @throws IOException 175 */ 176 @Test 177 public void test6() throws IOException { 178 client.prepareDelete("twitter", "tweet", "1").get(); 179 client.close(); 180 } 181 182 /** 183 * 更新索引:Update API-UpdateRequest 184 * 185 * @throws IOException 186 * @throws ExecutionException 187 * @throws InterruptedException 188 */ 189 @Test 190 public void test7() throws IOException, InterruptedException, 191 ExecutionException { 192 UpdateRequest updateRequest = new UpdateRequest(); 193 updateRequest.index("twitter"); 194 updateRequest.type("tweet"); 195 
updateRequest.id("AVyi3OORot7zkId708s8"); 196 updateRequest.doc(jsonBuilder().startObject().field("gender", "male") 197 .endObject()); 198 client.update(updateRequest).get(); 199 System.out.println(updateRequest.version()); 200 client.close(); 201 } 202 203 /** 204 * 更新索引:Update API-prepareUpdate()-doc 205 * 206 * @throws IOException 207 * @throws ExecutionException 208 * @throws InterruptedException 209 */ 210 @Test 211 public void test8() throws IOException, InterruptedException, 212 ExecutionException { 213 client.prepareUpdate("twitter", "tweet", "AVyikSKIot7zkId708s6") 214 .setDoc(jsonBuilder().startObject().field("gender", "female") 215 .endObject()).get(); 216 client.close(); 217 } 218 219 /** 220 * 更新索引:Update API-prepareUpdate()-script 221 * 需要开启:script.engine.groovy.inline.update: on 222 * 223 * @throws IOException 224 * @throws ExecutionException 225 * @throws InterruptedException 226 */ 227 @Test 228 public void test9() throws IOException, InterruptedException, 229 ExecutionException { 230 client.prepareUpdate("twitter", "tweet", "AVyi4oZfot7zkId708s-") 231 .setScript( 232 new Script("ctx._source.gender = \"female\"", 233 ScriptService.ScriptType.INLINE, null, null)) 234 .get(); 235 client.close(); 236 } 237 238 /** 239 * 更新索引:Update API-UpdateRequest-upsert 240 * 241 * @throws IOException 242 * @throws ExecutionException 243 * @throws InterruptedException 244 */ 245 @Test 246 public void test10() throws IOException, InterruptedException, 247 ExecutionException { 248 IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "1") 249 .source(jsonBuilder() 250 .startObject() 251 .field("name", "Joe Smith") 252 .field("gender", "male") 253 .endObject()); 254 UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1") 255 .doc(jsonBuilder() 256 .startObject() 257 .field("gender", "female") 258 .endObject()).upsert(indexRequest); 259 client.update(updateRequest).get(); 260 client.close(); 261 } 262 263 /** 264 * 批量查询索引:Multi Get API 265 * 
266 * @throws IOException 267 * @throws ExecutionException 268 * @throws InterruptedException 269 */ 270 @Test 271 public void test11() throws IOException, InterruptedException, 272 ExecutionException { 273 MultiGetResponse multiGetItemResponses = client.prepareMultiGet() 274 .add("twitter", "tweet", "1") 275 .add("twitter", "tweet", "AVyi4oZfot7zkId708s-", "AVyi3OORot7zkId708s8", "AVyikSKIot7zkId708s6") 276 .add("djt2", "user", "1") 277 .get(); 278 279 for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 280 GetResponse response = itemResponse.getResponse(); 281 if (response.isExists()) { 282 String json = response.getSourceAsString(); 283 System.out.println(json); 284 } 285 } 286 client.close(); 287 } 288 289 /** 290 * 批量操作索引:Bulk API 291 * 292 * @throws IOException 293 * @throws ExecutionException 294 * @throws InterruptedException 295 */ 296 @Test 297 public void test12() throws IOException, InterruptedException, 298 ExecutionException { 299 BulkRequestBuilder bulkRequest = client.prepareBulk(); 300 301 // either use client#prepare, or use Requests# to directly build index/delete requests 302 bulkRequest.add(client.prepareIndex("twitter", "tweet", "3") 303 .setSource(jsonBuilder() 304 .startObject() 305 .field("user", "kimchy") 306 .field("postDate", new Date()) 307 .field("message", "trying out Elasticsearch") 308 .endObject() 309 ) 310 ); 311 312 bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") 313 .setSource(jsonBuilder() 314 .startObject() 315 .field("user", "kimchy") 316 .field("postDate", new Date()) 317 .field("message", "another post") 318 .endObject() 319 ) 320 ); 321 DeleteRequestBuilder prepareDelete = client.prepareDelete("twitter", "tweet", "AVyikSKIot7zkId708s6"); 322 bulkRequest.add(prepareDelete); 323 324 325 BulkResponse bulkResponse = bulkRequest.get(); 326 //批量操作:其中一个操作失败不影响其他操作成功执行 327 if (bulkResponse.hasFailures()) { 328 // process failures by iterating through each bulk response item 329 BulkItemResponse[] items = 
bulkResponse.getItems(); 330 for (BulkItemResponse bulkItemResponse : items) { 331 System.out.println(bulkItemResponse.getFailureMessage()); 332 } 333 }else{ 334 System.out.println("bulk process success!"); 335 } 336 client.close(); 337 } 338 339 /** 340 * 批量操作索引:Using Bulk Processor 341 * 优化:先关闭副本,再添加副本,提升效率 342 * @throws IOException 343 * @throws ExecutionException 344 * @throws InterruptedException 345 */ 346 @Test 347 public void test13() throws IOException, InterruptedException, 348 ExecutionException { 349 350 BulkProcessor bulkProcessor = BulkProcessor.builder( 351 client, 352 new BulkProcessor.Listener() { 353 354 public void beforeBulk(long executionId, BulkRequest request) { 355 // TODO Auto-generated method stub 356 System.out.println(request.numberOfActions()); 357 } 358 359 public void afterBulk(long executionId, BulkRequest request, 360 Throwable failure) { 361 // TODO Auto-generated method stub 362 System.out.println(failure.getMessage()); 363 } 364 365 public void afterBulk(long executionId, BulkRequest request, 366 BulkResponse response) { 367 // TODO Auto-generated method stub 368 System.out.println(response.hasFailures()); 369 } 370 }) 371 .setBulkActions(1000) // 每个批次的最大数量 372 .setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))// 每个批次的最大字节数 373 .setFlushInterval(TimeValue.timeValueSeconds(5))// 每批提交时间间隔 374 .setConcurrentRequests(1) //设置多少个并发处理线程 375 //可以允许用户自定义当一个或者多个bulk请求失败后,该执行如何操作 376 .setBackoffPolicy( 377 BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 378 .build(); 379 String json = "{" + 380 "\"user\":\"kimchy\"," + 381 "\"postDate\":\"2013-01-30\"," + 382 "\"message\":\"trying out Elasticsearch\"" + 383 "}"; 384 385 for (int i = 0; i < 1000; i++) { 386 bulkProcessor.add(new IndexRequest("djt6", "user").source(json)); 387 } 388 //阻塞至所有的请求线程处理完毕后,断开连接资源 389 bulkProcessor.awaitClose(3, TimeUnit.MINUTES); 390 client.close(); 391 } 392 /** 393 * SearchType使用方式 394 * @throws Exception 395 */ 396 @Test 397 public void 
test14() throws Exception { 398 SearchResponse response = client.prepareSearch("djt") 399 .setTypes("user") 400 //.setSearchType(SearchType.DFS_QUERY_THEN_FETCH) 401 .setSearchType(SearchType.QUERY_AND_FETCH) 402 .execute() 403 .actionGet(); 404 SearchHits hits = response.getHits(); 405 System.out.println(hits.getTotalHits()); 406 } 407 }