ES6.6.2 使用小结 - 批量操作
1.文档的操作基本分为:查询、删除、更新、插入。
这些操作都可以进行批量进行,包括:批量处理器执行、批量执行、查询时执行删除。
2.假设当前ES存在如下数据(两条)。
3.批量操作源码如下:
package com.bas.demo;
import com.bas.util.ESUtil;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Date;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class BlukDemo {
/**
* 批量查询
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
TransportClient client = ESUtil.getClient();
MultiGetResponse mgResponse = client.prepareMultiGet()
.add("app_account", "blog", "7Z_qrmkBHcVcCKaQfGQ3")
.add("app_account", "blog", "7Z_qrmkBHcVcCKaQfGQ3", "asdasdcVcCKaQfGQ3")
.get();
for (MultiGetItemResponse response : mgResponse) {
GetResponse rp = response.getResponse();
if (rp != null && rp.isExists()) {
System.out.println(rp.getSourceAsString());
}
}
}
/**
* 批量操作
*/
@Test
public void blukOperate() throws IOException {
TransportClient client = ESUtil.getClient();
BulkRequestBuilder bulkRequest = client.prepareBulk();
// 批量插入数据
bulkRequest.add(client.prepareIndex("app_account", "blog", "3")
.setSource(jsonBuilder()
.startObject()
.field("id","3")
.field("title","Java插入数据到ES")
.field("content","abcedfasdasd")
.endObject()
)
);
bulkRequest.add(client.prepareIndex("app_account", "blog", "4")
.setSource(jsonBuilder()
.startObject()
.field("id","4")
.field("title","Java插入数据到ES")
.field("content","abcedfasdasd")
.endObject()
)
);
//批量执行
BulkResponse bulkResponse = bulkRequest.get();
System.out.println(bulkResponse.status());
if (bulkResponse.hasFailures()) {
System.out.println("存在失败操作");
}
}
/**
* 批量处理器
*/
@Test
public void blukProcessor() {
TransportClient client= ESUtil.getClient();
BulkProcessor bulkProcessor = BulkProcessor.builder(
client,
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,BulkRequest request) {
//设置bulk批处理的预备工作
System.out.println("请求数:"+request.numberOfActions());
}
@Override
public void afterBulk(long executionId,BulkRequest request,BulkResponse response) {
//设置bulk批处理的善后工作
if(!response.hasFailures()) {
System.out.println("执行成功!");
}else {
System.out.println("执行失败!");
}
}
@Override
public void afterBulk(long executionId,BulkRequest request,Throwable failure) {
//设置bulk批处理的异常处理工作
System.out.println(failure);
}
})
.setBulkActions(1000)//设置提交批处理操作的请求阀值数
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))//设置提交批处理操作的请求大小阀值
.setFlushInterval(TimeValue.timeValueSeconds(5))//设置刷新索引时间间隔
.setConcurrentRequests(1)//设置并发处理线程个数
//设置回滚策略,等待时间100ms,retry次数为3次
.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
// Add your requests
bulkProcessor.add(new DeleteRequest("app_account", "blog", "3"));
bulkProcessor.add(new DeleteRequest("app_account", "blog", "4"));
// 刷新所有请求
bulkProcessor.flush();
// 关闭bulkProcessor
bulkProcessor.close();
// 刷新索引
client.admin().indices().prepareRefresh().get();
// Now you can start searching!
client.prepareSearch().get();
}
/**
* 条件查询删除
*/
@Test
public void bulkDeleteByQuery() {
TransportClient client= ESUtil.getClient();
BulkByScrollResponse response = DeleteByQueryAction.INSTANCE
.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("title", "Java"))
.source("app_account")//设置索引名称
.get();
//被删除文档数目
long deleted = response.getDeleted();
System.out.println(deleted);
}
}
其中,ES工具类位于:ES6.6.2 使用小结 - ES工具类使用
值得注意的是,方法参数的id为文档数据中的字段 "_id"。
4.首先,进行批量查询,执行BulkDemo.main(),如图:
5.接下来是批量插入文档,执行BlukDemo.blukOperate(),如图:
ES中的数据更新了,如图:
6.然后是使用批量处理器来执行删除上面增加的两个文档,执行BlukDemo.blukProccessor(),如图:
接着ES的数据更新了,如图:
7.最后是查询时删除数据,先批量操作把刚刚两条文档再插进去先,然后再执行BlukDemo.blukDeleteByQuery(),如图:
这时ES又只剩下两条文档了。