JAVASpringBoot整合并操作ES(ElasticSearch)数据

it2026-01-31  3

工作需要,在更改数据时需要维护es(ElasticSearch简称,以下均称es)数据,记录个人总结。以下只包含java对es数据操作,不包含es基础内容。 一、导入maven坐标

<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> <version>2.0.2.RELEASE</version> </dependency>

二、es连接配置类,@ConfigurationProperties(prefix = “sc.elasticsearch”)见下方配置文件,成员变量名与配置文件一致即可完成值注入

import lombok.Setter; import org.apache.http.HttpHost; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; import org.apache.http.impl.client.BasicCredentialsProvider; import org.apache.http.impl.nio.client.HttpAsyncClientBuilder; import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClientBuilder; import org.elasticsearch.client.RestHighLevelClient; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @ConfigurationProperties(prefix = "sc.elasticsearch") @Setter public class ElasticsearchConfig { //es地址 private String host; //es端口号 private int port; //es用户名 private String username; //es密码 private String password; @Bean(destroyMethod = "close") public RestHighLevelClient client() { //初始化ES操作客户端 final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password)); //es账号密码(默认用户名为elastic) RestHighLevelClient esClient = new RestHighLevelClient( RestClient.builder( new HttpHost(host, port) ).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { @Override public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) { httpClientBuilder.disableAuthCaching(); return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); } }).setMaxRetryTimeoutMillis(2000) ); return esClient; } }

三、以上配置类引入了yml的配置内容,下附yml配置

sc: elasticsearch: host: 127.0.0.1 port: 9200 username: elastic password: ******

四、测试

import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.client.RestHighLevelClient; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.io.IOException; @RunWith(SpringRunner.class) @SpringBootTest public class MyTest { @Autowired private RestHighLevelClient client; @Test public void getLearn() throws IOException { GetRequest getRequest = new GetRequest("fn_rmsv2_supervise_log","_doc","336198460682276870"); GetResponse response = client.get(getRequest); System.out.println(response.getId()); } }

五、根据业务逻辑编写service 1.接口

public interface IElasticsearchService { /** * 新增或修改es数据 * @param indexName * @param type * @param id * @param jsonStr */ void addData(String indexName,String type ,String id,String jsonStr); /** * 根据督导id查询es的_id,新增或修改es数据需要使用 * @param id * @param index * @return */ String getEsId(String id, String index); }

2.实现类

import cn.shencom.server.service.IElasticsearchService; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.client.RestHighLevelClient; import org.elasticsearch.common.Strings; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.BoolQueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.io.IOException; @Service public class ElasticsearchServiceImpl implements IElasticsearchService { @Autowired private RestHighLevelClient client; @Override public void addData(String indexName,String type ,String id,String jsonStr) { try { // 1、创建索引请求 //索引 // mapping type //文档id IndexRequest request = new IndexRequest(indexName, type, id); //文档id // 2、准备文档数据 // 直接给JSON串 request.source(jsonStr, XContentType.JSON); //4、发送请求 IndexResponse indexResponse = null; try { // 同步方式 indexResponse = client.index(request); } catch (ElasticsearchException e) { // 捕获,并处理异常 //判断是否版本冲突、create但文档已存在冲突 if (e.status() == RestStatus.CONFLICT) { System.out.println("冲突了,请在此写冲突处理逻辑!" + e.getDetailedMessage()); } } //5、处理响应 if (indexResponse != null) { String index1 = indexResponse.getIndex(); String type1 = indexResponse.getType(); String id1 = indexResponse.getId(); long version1 = indexResponse.getVersion(); if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) { System.out.println("新增文档成功!" + " index:" + index1 + " type:" + type1 + " id:" + id1 + " version:" + version1); } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) { System.out.println("修改文档成功!"); } // 分片处理信息 ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo(); if (shardInfo.getTotal() != shardInfo.getSuccessful()) { System.out.println("分片处理信息....."); } // 如果有分片副本失败,可以获得失败原因信息 if (shardInfo.getFailed() > 0) { for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) { String reason = failure.reason(); System.out.println("副本失败原因:" + reason); } } } } catch (Exception e) { e.printStackTrace(); } } @Override public String getEsId(String id, String index){ //设置查询条件 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.from(0); sourceBuilder.size(10); sourceBuilder.fetchSource(new String[]{"*"}, Strings.EMPTY_ARRAY); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); //设置督导记录id条件 boolQueryBuilder.filter(QueryBuilders.matchQuery("id", id)); sourceBuilder.query(boolQueryBuilder); SearchRequest searchRequest = new SearchRequest(index); searchRequest.source(sourceBuilder); SearchResponse response; try { response = client.search(searchRequest); //返回_id,没有记录时返回督导记录作为id return response.getHits().getTotalHits() > 0 ? response.getHits().getAt(0).getId() : id; } catch (IOException e) { e.printStackTrace(); } return null; } /** * 批量插入ES * @param indexName 索引 * @param type 类型 * @param idName id名称 * @param list 数据集合 */ public void bulkDate(String indexName,String type ,String idName ,List<Map<String,Object>> list ){ try { if(null == list || list.size()<=0){ return; } if(StringUtils.isBlank(indexName) || StringUtils.isBlank(idName) || StringUtils.isBlank(type)){ return; } BulkRequest request = new BulkRequest(); for(Map<String,Object> map : list){ if(map.get(idName)!=null){ request.add(new IndexRequest(indexName, type, String.valueOf(map.get(idName))) .source(map,XContentType.JSON)); } } // 2、可选的设置 /* request.timeout("2m"); request.setRefreshPolicy("wait_for"); request.waitForActiveShards(2); */ //3、发送请求 // 同步请求 BulkResponse bulkResponse = client.bulk(request); //4、处理响应 if(bulkResponse != null) { for (BulkItemResponse bulkItemResponse : bulkResponse) { DocWriteResponse itemResponse = bulkItemResponse.getResponse(); if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { IndexResponse indexResponse = (IndexResponse) itemResponse; //TODO 新增成功的处理 System.out.println("新增成功,{}"+ indexResponse.toString()); } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { UpdateResponse updateResponse = (UpdateResponse) itemResponse; //TODO 修改成功的处理 System.out.println("修改成功,{}"+ updateResponse.toString()); } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; //TODO 删除成功的处理 System.out.println("删除成功,{}"+ deleteResponse.toString()); } } } } catch (IOException e) { e.printStackTrace(); } } // public static void main(String ags[]){ // Map<String,Object> map1 = new HashMap<String, Object>(); // map1.put("id","2"); // map1.put("user1","bbherbert1"); // map1.put("postDate","2018-08-30"); // map1.put("username","aa"); // map1.put("message","message"); // Map<String,Object> map2 = new HashMap<String, Object>(); // map2.put("id","3"); // map2.put("user2","bbherbert1"); // map2.put("postDate","2018-08-30"); // map2.put("username","aa"); // map2.put("message","message"); // Map<String,Object> map = new HashMap<String, Object>(); // map.put("id","1"); // map.put("user","bbherbert1"); // map.put("postDate","2018-08-30"); // map.put("username","aa"); // map.put("message","message"); // // List<Map<String,Object>> list = new ArrayList<Map<String, Object>>(); // list.add(map); // list.add(map1); // list.add(map2); // ESUtil esUtil= new ESUtil(); // esUtil.bulkDate("book15","boo","id",list); // Map<String,Object> map = new HashMap<String, Object>(); // map.put("user","herbert1"); // map.put("postDate","2018-08-30"); // map.put("username","aa"); // map.put("message","message"); // String jsonString = JSON.toJSONString(map); // esUtil.addData("hh","d","4",jsonString); // esUtil.addData("hh","d","4","{" + // "\"user\":\"kimchy\"," + // "\"postDate\":\"2013-01-30\"," + // "\"username\":\"zhangsan\"," + // "\"message\":\"trying out Elasticsearch\"" + // "}"); // } }

六、使用,entity即需要新增或修改的对象,fn_rmsv2_supervise_log为es索引

String jsonStr = JSON.toJSONString(entity); //获得es的数据_id String _id = iElasticsearchService.getEsId(entity.getId(), "fn_rmsv2_supervise_log" ); //添加或修改数据 iElasticsearchService.addData("fn_rmsv2_supervise_log", "_doc", _id, jsonStr);

七、至此,工作任务就完成了,但是学无止境,故收集了一些es的其他java操作知识点,下附: 1.es API文档地址http://www.elasticsearch.org/guide/en/elasticsearch/client/java-api/current/query-dsl-queries.html

package com.elasticsearch; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.index.query.IndicesQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * elasticsearch以提供了一个完整的Java查询dsl其余查询dsl。 * QueryBuilders工厂构建 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ public class QueryBuildersFactory { /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * match query 单个匹配 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder matchQuery() { return QueryBuilders.matchQuery("name", "张三"); } /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * multimatch query * 创建一个匹配查询的布尔型提供字段名称和文本 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder multiMatchQuery() { //曾用名和现名称为张三的条件 return QueryBuilders.multiMatchQuery( "张三", // Text you are looking for "name", "old_name" // Fields you query on ); } /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * boolean query and 多条件组合查询 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder booleanQuery() { return QueryBuilders .boolQuery() .must(QueryBuilders.termQuery("name", "张三")) .mustNot(QueryBuilders.termQuery("using", false)) .must(QueryBuilders.termQuery("sex", "男")) .should(QueryBuilders.termQuery("age", "20")); } /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * ids query * 构造一个只会匹配的特定数据 id 的查询,类似sql的in关键字 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder idsQuery() { return QueryBuilders.idsQuery().ids("101", "102", "103"); } /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * fuzzy query * 使用模糊查询匹配文档查询 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder fuzzyQuery() { return QueryBuilders.fuzzyQuery("name", "张三"); } /** * TODO NotSolved * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * has child / has parent * 父或者子的文档查询 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder hasChildQuery() { return // Has Child QueryBuilders.hasChildQuery("blog_tag", QueryBuilders.termQuery("tag", "something")); // Has Parent /*return QueryBuilders.hasParentQuery("blog", QueryBuilders.termQuery("tag","something"));*/ } /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * matchall query * 查询匹配所有文件。 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder matchAllQuery() { return QueryBuilders.matchAllQuery(); } /** * TODO NotSolved * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * more like this (field) query (mlt and mlt_field) * 多字段模糊条件查询 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder moreLikeThisQuery() { // mlt Query QueryBuilders.moreLikeThisQuery("name", "old_name") // Fields .likeText("张") //模糊等于的内容 .minTermFreq(1) //最少出现的次数 .maxQueryTerms(12); // 最多匹配项 // in generated queries // mlt_field Query //单字段模糊条件查询 return QueryBuilders.moreLikeThisFieldQuery("name") // Only on single field .likeText("张") .minTermFreq(1) .maxQueryTerms(12); } /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * prefix query * 包含与查询相匹配的文档指定的前缀,匹配name=“张%” * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder prefixQuery() { return QueryBuilders.prefixQuery("name", "张"); } /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * range query * 查询相匹配的文档在一个范围。 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder rangeQuery() { return QueryBuilders .rangeQuery("id") .from("101") .to("199") .includeLower(true) //包括下界 .includeUpper(false); //包括上界 } /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * term query * 一个查询相匹配的文件包含一个术语。。 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder termQuery() { return QueryBuilders.termQuery("name", "张三"); } /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * terms query * 一个查询相匹配的多个value * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder termsQuery() { return QueryBuilders.termsQuery("name", // field "张三", "李四", "王五") // values .minimumMatch(1); // 设置最小数量的匹配提供了条件。默认为1。 } /** * TODO NotSolved * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * top children query * 构建了一种新的评分的子查询,与子类型和运行在子文档查询。这个查询的结果是,那些子父文档文件匹配。 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder topChildrenQuery() { return QueryBuilders.topChildrenQuery( "blog_tag", // field QueryBuilders.termQuery("name", "葫芦3812娃") // Query ) .score("max") // max, sum or avg .factor(5) .incrementalFactor(2); } /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * wildcard query *   实现了通配符搜索查询。支持通配符* < /tt>,<tt> *   匹配任何字符序列(包括空),<tt> ? < /tt>, *   匹配任何单个的字符。注意该查询可以缓慢,因为它 *   许多方面需要遍历。为了防止WildcardQueries极其缓慢。 *   一个通配符词不应该从一个通配符* < /tt>或<tt> *   < /tt> <tt> ?。 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder wildcardQuery() { return QueryBuilders.wildcardQuery("name", "张*2三"); } /** * TODO NotSolved * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * nested query * 嵌套查询 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static QueryBuilder nestedQuery() { return QueryBuilders.nestedQuery("location", // Path QueryBuilders.boolQuery() // Your query .must(QueryBuilders.matchQuery("location.lat", 0.962590433140581)) .must(QueryBuilders.rangeQuery("location.lon").lt(0.00000000000000000003)) ) .scoreMode("total"); // max, total, avg or none } /** * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ * indices query * 索引查询 * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ protected static IndicesQueryBuilder indicesQuery() { // Using another query when no match for the main one QueryBuilders.indicesQuery( QueryBuilders.termQuery("name", "张三"), Es_Utils.INDEX_DEMO_01, "index2" ) //设置查询索引上执行时使用不匹配指数 .noMatchQuery(QueryBuilders.termQuery("age", "18")); // Using all (match all) or none (match no documents) return QueryBuilders.indicesQuery( QueryBuilders.termQuery("name", "张三"), Es_Utils.INDEX_DEMO_01, "index2" ) // 设置不匹配查询,可以是 all 或者 none .noMatchQuery("none"); } public static void main(String[] args) { //设置查询条件 SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.from(0); sourceBuilder.size(10); sourceBuilder.fetchSource(new String[]{"*"}, Strings.EMPTY_ARRAY); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); sourceBuilder.query(fuzzyQuery); SearchRequest searchRequest = new SearchRequest(index); searchRequest.source(sourceBuilder); SearchResponse response; try { response = client.search(searchRequest); System.out.println(response); } catch (IOException e) { e.printStackTrace(); } } }
最新回复(0)