Preface
Elasticsearch is a distributed search engine open-sourced by Elastic. It speaks JSON and exposes a RESTful API.
With Elasticsearch you can run and combine many kinds of searches (structured data, unstructured data, geo, metrics) however you like, and by scaling out the cluster you can store and search data at the PB level.
Under the hood Elasticsearch is built on Lucene, wrapping it in a convenient high-level API; it manages its own cluster without any external distributed-coordination middleware, which makes it very flexible. Elastic, the company, was founded around this open-source technology, and its product line is called the Elastic Stack. The latest architecture diagram looks like this:
ELK stands for Elasticsearch + Logstash + Kibana, where Kibana is the visualization component and Logstash is the log-collection pipeline. As for Beats, the author has not yet had time to look into it.
Uses
The preface gives a rough idea of what Elasticsearch is. So where can we apply it?
- Log storage, search, and analysis within an enterprise: the classic ELK use case
- Distributed search engine: e.g. product search in e-commerce, or internal site search
- Data analytics: metrics, patterns, trends, and aggregation analysis, plus automated time-series analysis using machine learning
How to Learn
This article walks through the official Elasticsearch Guide as a way to get to know Elasticsearch; official documentation is usually the most complete and authoritative. That said, the Elasticsearch docs read more like an operations manual and do not explain some of the architecture and concepts in detail. Readers can follow the first few steps of the blog series "Elastic:开发者上手指南"; since that blog targets an older version, following along beyond those first steps is not recommended.
As of this writing (2022-04-20) the latest version is 8.1.2. The key entry-level sections are listed below:
- Elasticsearch Guide:
  - What is Elasticsearch? A quick skim is enough.
  - What's new in 8.1. A quick skim is enough.
  - Quick start. A quick skim is enough.
  - Set up Elasticsearch. Newcomers should study this section closely; Elasticsearch has a huge number of settings, though most ship with sensible defaults.
  - Upgrade Elasticsearch. Irrelevant for a fresh install.
  - Index modules. Per-index modules that control every index-level aspect (core).
  - Mapping. How a document and its fields are stored and indexed, roughly analogous to column definitions in a relational database (core).
  - Text analysis. Turning unstructured text into a structured form for search; it applies to text fields, so you can skip it if you have none.
  - Index templates. Templates for index creation, defined before the index itself.
  - Data streams. For logs, events, metrics, and other continuously generated data.
  - EQL. The Event Query Language, for event-based, continuously generated data such as logs, metrics, and traces.
  - Ingest pipelines. Transform incoming documents before they are written to an index.
  - Aliases. The alias system.
  - Search your data
  - Query DSL
  - REST APIs. These three together cover the search logic and APIs (core).
  - Aggregations. Aggregation operations such as average, max, and count, similar to GROUP BY.
  - SQL. SQL query support.
  - Scripting. Similar to Hive's UDFs (user-defined functions).
  - Data management. The author is less familiar with this part.
  - ILM: Manage the index lifecycle. Advanced index management, e.g. rolling over to a new index on a schedule or once an index reaches a certain size.
  - Autoscaling. Configuring cluster autoscaling.
  - Monitor a cluster. Cluster monitoring.
  - Set up a cluster for high availability. How to build an HA cluster.
  - Snapshot and restore. Backup and restore.
  - Secure the Elastic Stack. X-Pack security configuration; required reading if you expose ports.
Clients (see the official clients documentation)
Elasticsearch ships clients for many languages, such as Java, JavaScript, and Go. And because it is JSON over a RESTful HTTP API, you can also talk to it with any plain HTTP client: Postman, curl, or the Kibana Dev Tools console, which even offers syntax completion.
If you are a Java backend developer using Spring Boot, you can pull in the starter:
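Since it is all plain HTTP + JSON, even the JDK's built-in `java.net.http.HttpClient` works. A minimal sketch (the host, index name, and query body here are hypothetical, not from any official example):

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class RawEsRequest {
    // Build a _search request against a hypothetical local cluster;
    // Elasticsearch listens on port 9200 by default.
    public static HttpRequest searchRequest(String index, String queryJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:9200/" + index + "/_search"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(queryJson))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = searchRequest("index001", "{\"query\":{\"match_all\":{}}}");
        System.out.println(req.method() + " " + req.uri());
        // Send it with: HttpClient.newHttpClient()
        //                 .send(req, HttpResponse.BodyHandlers.ofString())
    }
}
```

The same request typed into the Kibana console would simply be `GET index001/_search` with the JSON body.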
```xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    <version>${spring-boot.version}</version>
</dependency>
```
Or use the official client directly:
```xml
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.1.2</version>
</dependency>
```
Because the official client is versioned in lockstep with Elasticsearch itself, while the version Spring Boot manages lags behind, watch out for dependency conflicts when adding the jar. For example, Spring Boot manages jakarta.json-api at 1.1.6 while elasticsearch-java depends on 2.0.1; the parent's version wins and overrides the transitive one, and the class jakarta.json.spi.JsonProvider can no longer be found:
```xml
<dependency>
    <groupId>jakarta.json</groupId>
    <artifactId>jakarta.json-api</artifactId>
    <version>2.0.1</version>
    <scope>compile</scope>
</dependency>
```
The concrete fix looks like this:
```xml
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.1.3</version>
    <exclusions>
        <exclusion>
            <artifactId>jakarta.json-api</artifactId>
            <groupId>jakarta.json</groupId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <artifactId>jakarta.json-api</artifactId>
    <groupId>jakarta.json</groupId>
    <version>2.0.1</version>
</dependency>
```
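If you are unsure which jakarta.json-api version actually ended up on the classpath, a quick runtime probe can tell you (this helper is the author's own suggestion, not part of any library): the 2.x artifact moved the API from the `javax.json` package to `jakarta.json`, so the class below only resolves under 2.x.

```java
public class JakartaJsonCheck {
    // Returns true when jakarta.json-api 2.x (package jakarta.json) is on the
    // classpath; the 1.1.6 artifact still uses the javax.json package, so the
    // lookup fails there with ClassNotFoundException.
    public static boolean hasJakartaJson2() {
        try {
            Class.forName("jakarta.json.spi.JsonProvider");
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("jakarta.json-api 2.x present: " + hasJakartaJson2());
    }
}
```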
The Java client is built around the builder pattern and can be written entirely with lambda expressions. The Java API maps one-to-one onto the REST APIs, but lambdas can be hard to read if you are not used to them; for index creation, say, you can first write out the JSON body and then translate it into code. Here is a sample (including kNN search on a vector index):
```java
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.mapping.Property;
import co.elastic.clients.elasticsearch._types.mapping.TypeMapping;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkResponseItem;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.elasticsearch.core.search.TotalHits;
import co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
import co.elastic.clients.elasticsearch.indices.CreateIndexRequest;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import co.elastic.clients.util.ObjectBuilder;
import com.blueorigin.universe.elasticsearch.model.VectorModel;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.junit.jupiter.api.Test;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

@Slf4j
public class VectorServiceTest {

    ElasticsearchClient esClient;
    String indexName = "index001";
    ObjectMapper objectMapper = new ObjectMapper();
    private String index2 = "index002";

    {
        // Create the low-level client
        RestClient restClient = RestClient.builder(
                new HttpHost("192.168.2.222", 9200),
                new HttpHost("192.168.2.221", 9201)
        ).build();
        // Create the transport with a Jackson mapper
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());
        // And create the API client
        esClient = new ElasticsearchClient(transport);
    }

    @Test
    public void search() throws IOException {
        SearchResponse<VectorModel> response = esClient.search(
                s -> s.index(indexName).query(q -> q.match(t -> t.field("vectorId").query(1000))),
                VectorModel.class);
        TotalHits total = response.hits().total();
        boolean isExactResult = total.relation() == TotalHitsRelation.Eq;
        if (isExactResult) {
            log.info("There are " + total.value() + " results");
        } else {
            log.info("There are more than " + total.value() + " results");
        }
        List<Hit<VectorModel>> hits = response.hits().hits();
        for (Hit<VectorModel> hit : hits) {
            VectorModel product = hit.source();
            log.info(objectMapper.writeValueAsString(product));
        }
    }

    @Test
    public void knnSearch() throws IOException {
        Double[] qv = new Double[]{0.04881, -0.33609, -0.13305, 0.11145, -0.08408, -0.03294, -0.40138, 0.19321, -0.02528, -0.02768, -0.07773, -0.23604, 0.12693, -0.07832, 0.03975, -0.23248, -0.04273, 0.10179, 0.15156, -6.8E-4, 0.07414, 0.02857, 0.19306, -0.06342, -0.15033, 0.0331, 0.01185, 0.08438, -0.04065, -0.10001, 0.09478, 0.07242, 0.07154, -0.11093, -0.0125, -0.10403, -0.07861, -0.16341, -0.17495, 0.10467, -0.08135, -0.18041, 0.23096, 0.16601, 0.00806, 0.06222, -0.11909, -0.0224, -0.00351, 0.00517, -0.10414, 0.07032, 0.17009, 0.13883, -0.12384, -0.13548, -0.01265, -0.01102, -0.02224, -0.10055, 0.11512, 0.04149, -0.0423, -0.01998};
        List<Double> queryVector = Arrays.asList(qv);
        Function<KnnSearchRequest.Builder, ObjectBuilder<KnnSearchRequest>> knnRequest =
                builder -> builder.index(indexName).knn(s -> s.field("vector").queryVector(queryVector).k(10).numCandidates(10L));
        KnnSearchResponse<VectorModel> response = esClient.knnSearch(knnRequest, VectorModel.class);
        TotalHits total = response.hits().total();
        boolean isExactResult = total.relation() == TotalHitsRelation.Eq;
        if (isExactResult) {
            log.info("There are " + total.value() + " results");
        } else {
            log.info("There are more than " + total.value() + " results");
        }
        List<Hit<VectorModel>> hits = response.hits().hits();
        log.info("query = 0.9999994, vector = {}", queryVector);
        for (Hit<VectorModel> hit : hits) {
            VectorModel source = hit.source();
            log.info("score = {}, vector = {}", hit.score(), source.getVector());
        }
    }

    @Test
    public void batchKnnSearch() throws IOException {
        File file = new File("vector.txt");
        FileReader fileReader = new FileReader(file);
        BufferedReader bufferedReader = new BufferedReader(fileReader);
        List<VectorModel> vectorModelList = Lists.newArrayListWithExpectedSize(2000);
        int count = 0;
        int skip = 0;
        while (true) {
            String line = bufferedReader.readLine();
            // Check for end of file before sampling; otherwise EOF would only
            // be noticed on every 100th read
            if (StringUtils.isBlank(line)) {
                log.info("break end");
                break;
            }
            // Sample every 100th line
            skip++;
            if (skip % 100 != 0) {
                continue;
            }
            VectorModel vectorModel = objectMapper.readValue(line, VectorModel.class);
            vectorModelList.add(vectorModel);
            count++;
            if (count > 2000) {
                log.info("break 2000");
                break;
            }
        }
        long start = System.currentTimeMillis();
        for (VectorModel vectorModel : vectorModelList) {
            KnnSearchResponse<VectorModel> response = esClient.knnSearch(builder -> builder.index(indexName)
                    .knn(s -> s.field("vector").queryVector(vectorModel.getVector()).k(10).numCandidates(10L)), VectorModel.class);
        }
        long end = System.currentTimeMillis();
        log.info("1 thread query 2000 cost {} ms", end - start);
    }

    void batchInsert(List<VectorModel> modelList) throws IOException {
        BulkRequest.Builder br = new BulkRequest.Builder();
        for (VectorModel vec : modelList) {
            br.operations(op -> op
                    .index(idx -> idx
                            .index(indexName)
                            .document(vec)
                    )
            );
        }
        BulkResponse result = esClient.bulk(br.build());
        // Log errors, if any
        if (result.errors()) {
            log.error("Bulk had errors");
            for (BulkResponseItem item : result.items()) {
                if (item.error() != null) {
                    log.error(item.error().reason());
                }
            }
        }
    }

    @Test
    public void checkIndexExists() throws IOException {
        BooleanResponse exists = esClient.indices().exists(eb -> eb.index(indexName));
        if (exists.value()) {
            log.info("index {} exists", indexName);
        } else {
            log.info("index {} not exists", indexName);
        }
    }

    @Test
    public void insert100() {
        try {
            insertFromFile(index2, 100);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // @Test
    public void insertFromFile(String index, int num) throws Exception {
        // Do nothing if the index already exists
        if (existsIndex(index)) return;
        // Create the index
        createIndex(index);
        // Prepare the data
        File file = new File("vector.txt");
        FileReader fileReader = new FileReader(file);
        BufferedReader bufferedReader = new BufferedReader(fileReader);
        List<VectorModel> vectorModelList = Lists.newArrayListWithExpectedSize(10000);
        // Insert in batches
        int total = 0;
        int count = 0;
        while (true) {
            String line = bufferedReader.readLine();
            if (StringUtils.isBlank(line)) {
                // End of file: flush whatever is left
                if (!vectorModelList.isEmpty()) {
                    total += vectorModelList.size();
                    batchInsert(vectorModelList);
                }
                break;
            }
            count++;
            VectorModel vectorModel = objectMapper.readValue(line, VectorModel.class);
            vectorModelList.add(vectorModel);
            if (count >= num) {
                // Requested number of documents reached: flush and stop
                total += vectorModelList.size();
                batchInsert(vectorModelList);
                break;
            }
            if (count % 10000 == 0) {
                total += vectorModelList.size();
                batchInsert(vectorModelList);
                vectorModelList = Lists.newArrayListWithExpectedSize(10000);
            }
        }
        log.info("vector total = {}", total);
    }

    private boolean existsIndex(String index) throws IOException {
        BooleanResponse exists = esClient.indices().exists(eb -> eb.index(index));
        if (exists.value()) {
            log.info("index({}) already exists", index);
            return true;
        }
        return false;
    }

    private void createIndex(String index) throws IOException {
        CreateIndexRequest.Builder indexBuilder = new CreateIndexRequest.Builder();
        indexBuilder.index(index);
        TypeMapping.Builder tmBuilder = new TypeMapping.Builder();
        tmBuilder.properties("vec", new Property.Builder().denseVector(builder -> builder.index(true).dims(64).similarity("dot_product")
                .indexOptions(opBuilder -> opBuilder.type("hnsw").m(16).efConstruction(100))).build());
        tmBuilder.properties("id", new Property.Builder().long_(pb -> pb.index(false)).build());
        TypeMapping typeMapping = tmBuilder.build();
        indexBuilder.mappings(typeMapping);
        CreateIndexResponse createIndexResponse = esClient.indices().create(indexBuilder.build());
        String resIndex = createIndexResponse.index();
        Boolean acknowledged = createIndexResponse.acknowledged();
        boolean b = createIndexResponse.shardsAcknowledged();
        log.info("index create response for {}, acknowledged= {}, shardsAcknowledged= {}", resIndex, acknowledged, b);
    }
}
```
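One caveat on the `createIndex` mapping above: with `similarity: "dot_product"`, Elasticsearch requires every indexed vector and every query vector to be unit-length (L2-normalized), otherwise indexing fails; use `cosine` if you cannot normalize up front. The normalization step itself needs nothing beyond the standard library:

```java
public class VectorNorm {
    // L2-normalize a vector: divide each component by the vector's length
    public static double[] normalize(double[] v) {
        double norm = 0;
        for (double x : v) norm += x * x;
        norm = Math.sqrt(norm);
        double[] out = new double[v.length];
        for (int i = 0; i < v.length; i++) out[i] = v[i] / norm;
        return out;
    }

    // Plain dot product; on unit vectors this equals cosine similarity
    public static double dotProduct(double[] a, double[] b) {
        double sum = 0;
        for (int i = 0; i < a.length; i++) sum += a[i] * b[i];
        return sum;
    }

    public static void main(String[] args) {
        double[] a = normalize(new double[]{3, 4});
        // A unit vector's dot product with itself is 1.0
        System.out.println(dotProduct(a, a));
    }
}
```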