目录
1、基于java实现员工信息的增删改查
2、基于java对员工信息进行复杂的搜索操作
3、基于java对员工信息进行聚合分析
1、基于java实现员工信息的增删改查
员工信息:姓名、年龄、职位、国家、入职日期、薪水
(1)maven依赖
<!-- Elasticsearch core library; version must match the target cluster (5.2.1 here). -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>5.2.1</version>
</dependency>
<!-- TransportClient implementation (connects over the 9300 transport port). -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>5.2.1</version>
</dependency>
<!-- Log4j 2 is required at runtime by the transport client for logging. -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.7</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.7</version>
</dependency>
log4j2.properties(注意:Log4j 2 默认读取的配置文件名是 log4j2.properties,不是 log4j.properties)
# Minimal Log4j 2 properties config: one console appender, root logger at INFO.
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout
rootLogger.level = info
rootLogger.appenderRef.console.ref = console
(2)构建client
// Option 1: explicit settings — cluster.name must match the cluster's
// configured name, otherwise the client refuses the connection.
Settings settings = Settings.builder()
.put("cluster.name", "myClusterName").build();
TransportClient client = new PreBuiltTransportClient(settings);
// Option 2: default settings plus explicit node addresses. 9300 is the
// transport port, not the 9200 REST port.
// NOTE(review): the two `client` declarations are alternatives shown side by
// side — only one of them can appear in a real compilation unit.
TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));
// Release the client's sockets and threads when the application shuts down.
client.close();
(3)创建document
// Index (create) a document with id "1" under index/type; jsonBuilder()
// constructs the JSON source field by field.
IndexResponse response = client.prepareIndex("index", "type", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
.get();
(4)查询document
// Fetch a single document by index/type/id.
GetResponse response = client.prepareGet("index", "type", "1").get();
(5)修改document
// Partial update: the given fields are merged into the existing document;
// fields not mentioned here are left untouched.
client.prepareUpdate("index", "type", "1")
.setDoc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.get();
(6)删除document
// Delete the document with id "1" from index/type.
DeleteResponse response = client.prepareDelete("index", "type", "1").get();
2、基于java对员工信息进行复杂的搜索操作
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
/**
 * Demo app: index sample employee documents into company/employee and run
 * a filtered, paginated full-text search against them.
 */
public class EmploySearchApp {

    @SuppressWarnings({"unchecked","resource"})
    public static void main(String[] args) throws Exception {
        Settings clientSettings = Settings.builder()
                .put("cluster.name","elasticsearch")
                .build();
        TransportClient client = new PreBuiltTransportClient(clientSettings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"),9300));
        // Load the sample data once, then leave this commented out.
        //prepareData(client);
        // Run the search demo.
        executeSearch(client);
        client.close();
    }

    /**
     * Search demo: full-text match "technique" in position, post-filter on
     * age between 30 and 40, and return only the first page (one hit).
     */
    private static void executeSearch(TransportClient client) {
        SearchResponse response = client.prepareSearch("company")
                .setTypes("employee")
                .setQuery(QueryBuilders.matchQuery("position", "technique"))
                .setPostFilter(QueryBuilders.rangeQuery("age").from(30).to(40))
                .setFrom(0).setSize(1)
                .get();
        // Print the raw JSON source of each returned hit.
        for (SearchHit hit : response.getHits().getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * Index five sample employee documents into company/employee.
     */
    private static void prepareData(TransportClient client) throws Exception {
        indexEmployee(client, "1", "jack", 27, "technique software", "china", "2017-01-01", 10000);
        indexEmployee(client, "2", "marry", 35, "technique manager", "china", "2017-01-01", 12000);
        indexEmployee(client, "3", "tom", 32, "senior technique software", "china", "2016-01-01", 11000);
        indexEmployee(client, "4", "jen", 25, "junior finance", "usa", "2016-01-01", 7000);
        indexEmployee(client, "5", "mike", 37, "finance manager", "usa", "2015-01-01", 15000);
    }

    /** Index a single employee document with the given id and field values. */
    private static void indexEmployee(TransportClient client, String id, String name, int age,
                                      String position, String country, String joinDate, int salary) throws Exception {
        client.prepareIndex("company","employee",id)
                .setSource(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("name",name)
                        .field("age",age)
                        .field("position",position)
                        .field("country",country)
                        .field("join_date",joinDate)
                        .field("salary",salary)
                        .endObject())
                .get();
    }
}
//----------------------------对应的es语句
GET company/employee/_search
{
"query":{
"bool": {
"must": [{
"match": {
"position": "technique"
}
}],
"filter": {
"range": {
"age": {
"gte": 30,
"lte": 40
}
}
}
}
},
"from":0,
"size":1
}
为什么一边运行创建document,一边搜索什么都没搜索到?
近实时!!默认是1秒以后,写入es的数据,才能被搜索到。很明显刚才,写入数据不到一秒,就进行搜索,什么都搜不到
3、基于java对员工信息进行聚合分析
//--------------------------聚合之前先将正排索引打开
DELETE company
PUT company
{
"mappings": {
"employee": {
"properties": {
"age": {
"type": "long"
},
"country": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
},
"fielddata": true
},
"join_date": {
"type": "date"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"position": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"salary": {
"type": "long"
}
}
}
}
}
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.net.InetAddress;
import java.util.Iterator;
import java.util.Map;
/**
* <p>Description:员工聚合分析
*/
/**
 * <p>Description: employee aggregation analysis.
 *
 * Aggregation pipeline:
 * (1) group employees by country;
 * (2) within each country, group by join date in yearly buckets;
 * (3) compute the average salary of each (country, year) bucket.
 */
public class EmployeeAggrApp {

    public static void main(String[] args) throws Exception {
        Settings settings = Settings.builder()
                .put("cluster.name","elasticsearch")
                .build();
        // try-with-resources guarantees the client is closed even when the
        // search or result handling throws (the original leaked it on error).
        try (TransportClient client = new PreBuiltTransportClient(settings)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("localhost"),9300))) {
            SearchResponse searchResponse = client.prepareSearch("company")
                    .addAggregation(AggregationBuilders.terms("group_by_country").field("country")
                            .subAggregation(AggregationBuilders
                                    .dateHistogram("group_by_join_date")
                                    .field("join_date")
                                    .dateHistogramInterval(DateHistogramInterval.YEAR)
                                    .subAggregation(AggregationBuilders.avg("avg_salary").field("salary"))))
                    .execute().actionGet();
            Map<String, Aggregation> aggMap = searchResponse.getAggregations().getAsMap();
            StringTerms groupByCountry = (StringTerms) aggMap.get("group_by_country");
            // Walk the nested buckets: country -> join-date year -> avg salary.
            for (Terms.Bucket countryBucket : groupByCountry.getBuckets()) {
                System.out.println(countryBucket.getKey()+":"+countryBucket.getDocCount());
                Histogram groupByJoinDate = (Histogram) countryBucket.getAggregations().asMap().get("group_by_join_date");
                for (Histogram.Bucket joinDateBucket : groupByJoinDate.getBuckets()) {
                    System.out.println(joinDateBucket.getKey()+":"+joinDateBucket.getDocCount());
                    // Cast to the Avg interface rather than the concrete
                    // InternalAvg implementation class.
                    Avg avgSalary = (Avg) joinDateBucket.getAggregations().asMap().get("avg_salary");
                    System.out.println(avgSalary.getValue());
                }
            }
        }
    }
}