Skip to content

Commit 54ee016

Browse files
committed
Add improvements for opensearch engine and updated the docker compose.
Signed-off-by: Navneet Verma <[email protected]>
1 parent ea53db4 commit 54ee016

File tree

7 files changed

+209
-53
lines changed

7 files changed

+209
-53
lines changed

Diff for: engine/clients/opensearch/configure.py

+37-17
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from opensearchpy import NotFoundError, OpenSearch
22

33
from benchmark.dataset import Dataset
4-
from engine.base_client import IncompatibilityError
54
from engine.base_client.configure import BaseConfigurator
65
from engine.base_client.distances import Distance
76
from engine.clients.opensearch.config import (
@@ -10,6 +9,7 @@
109
OPENSEARCH_PORT,
1110
OPENSEARCH_USER,
1211
)
12+
from engine.clients.opensearch.utils import get_index_thread_qty
1313

1414

1515
class OpenSearchConfigurator(BaseConfigurator):
@@ -40,28 +40,37 @@ def __init__(self, host, collection_params: dict, connection_params: dict):
4040
)
4141

4242
def clean(self):
43-
try:
43+
is_index_available = self.client.indices.exists(index=OPENSEARCH_INDEX,
44+
params={
45+
"timeout": 300,
46+
})
47+
if(is_index_available):
48+
print(f"Deleting index: {OPENSEARCH_INDEX}, as it is already present")
4449
self.client.indices.delete(
4550
index=OPENSEARCH_INDEX,
4651
params={
4752
"timeout": 300,
4853
},
4954
)
50-
except NotFoundError:
51-
pass
55+
5256

5357
def recreate(self, dataset: Dataset, collection_params):
54-
if dataset.config.distance == Distance.DOT:
55-
raise IncompatibilityError
56-
if dataset.config.vector_size > 1024:
57-
raise IncompatibilityError
58+
self._update_cluster_settings()
59+
distance = self.DISTANCE_MAPPING[dataset.config.distance]
60+
if dataset.config.distance == Distance.COSINE:
61+
distance = self.DISTANCE_MAPPING[Distance.DOT]
62+
print(f"Using distance type: {distance} as dataset distance is : {dataset.config.distance}")
5863

5964
self.client.indices.create(
6065
index=OPENSEARCH_INDEX,
6166
body={
6267
"settings": {
6368
"index": {
6469
"knn": True,
70+
"refresh_interval": -1,
71+
"number_of_replicas": 0 if collection_params.get("number_of_replicas") == None else collection_params.get("number_of_replicas"),
72+
"number_of_shards": 1 if collection_params.get("number_of_shards") == None else collection_params.get("number_of_shards"),
73+
"knn.advanced.approximate_threshold": "-1"
6574
}
6675
},
6776
"mappings": {
@@ -72,18 +81,13 @@ def recreate(self, dataset: Dataset, collection_params):
7281
"method": {
7382
**{
7483
"name": "hnsw",
75-
"engine": "lucene",
76-
"space_type": self.DISTANCE_MAPPING[
77-
dataset.config.distance
78-
],
79-
"parameters": {
80-
"m": 16,
81-
"ef_construction": 100,
82-
},
84+
"engine": "faiss",
85+
"space_type": distance,
86+
**collection_params.get("method")
8387
},
84-
**collection_params.get("method"),
8588
},
8689
},
90+
# this doesn't work for nmslib; we need to see what to do here — maybe remove them
8791
**self._prepare_fields_config(dataset),
8892
}
8993
},
@@ -94,6 +98,16 @@ def recreate(self, dataset: Dataset, collection_params):
9498
cluster_manager_timeout="5m",
9599
)
96100

101+
def _update_cluster_settings(self):
102+
index_thread_qty = get_index_thread_qty(self.client)
103+
cluster_settings_body = {
104+
"persistent": {
105+
"knn.memory.circuit_breaker.limit": "75%", # putting a higher value to ensure that even with small cluster the latencies for vector search are good
106+
"knn.algo_param.index_thread_qty": index_thread_qty
107+
}
108+
}
109+
self.client.cluster.put_settings(cluster_settings_body)
110+
97111
def _prepare_fields_config(self, dataset: Dataset):
98112
return {
99113
field_name: {
@@ -104,3 +118,9 @@ def _prepare_fields_config(self, dataset: Dataset):
104118
}
105119
for field_name, field_type in dataset.config.schema.items()
106120
}
121+
122+
def execution_params(self, distance, vector_size) -> dict:
123+
# normalize the vectors when cosine similarity is used.
124+
if distance == Distance.COSINE:
125+
return {"normalize": "true"}
126+
return {}

Diff for: engine/clients/opensearch/search.py

+14-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from opensearchpy import OpenSearch
66

77
from dataset_reader.base_reader import Query
8+
from engine.base_client.distances import Distance
89
from engine.base_client.search import BaseSearcher
910
from engine.clients.opensearch.config import (
1011
OPENSEARCH_INDEX,
@@ -13,6 +14,7 @@
1314
OPENSEARCH_USER,
1415
)
1516
from engine.clients.opensearch.parser import OpenSearchConditionParser
17+
import numpy as np
1618

1719

1820
class ClosableOpenSearch(OpenSearch):
@@ -44,6 +46,7 @@ def init_client(cls, host, distance, connection_params: dict, search_params: dic
4446
basic_auth=(OPENSEARCH_USER, OPENSEARCH_PASSWORD),
4547
**init_params,
4648
)
49+
cls.distance = distance
4750
cls.search_params = search_params
4851

4952
@classmethod
@@ -53,6 +56,9 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
5356
"vector": {
5457
"vector": query.vector,
5558
"k": top,
59+
"method_parameters" : {
60+
"ef_search": cls.search_params["config"]["ef_search"] # ef_search parameter is added in the query time
61+
}
5662
}
5763
}
5864
}
@@ -70,15 +76,18 @@ def search_one(cls, query: Query, top: int) -> List[Tuple[int, float]]:
7076
params={
7177
"timeout": 60,
7278
},
79+
_source=False,
80+
docvalue_fields=["_id"],
81+
stored_fields="_none_",
7382
)
83+
7484
return [
75-
(uuid.UUID(hex=hit["_id"]).int, hit["_score"])
85+
(uuid.UUID(hex=hit["fields"]["_id"][0]).int, hit["_score"])
7686
for hit in res["hits"]["hits"]
7787
]
7888

7989
@classmethod
8090
def setup_search(cls):
81-
if cls.search_params:
82-
cls.client.indices.put_settings(
83-
body=cls.search_params["config"], index=OPENSEARCH_INDEX
84-
)
91+
# Load the graphs in memory
92+
warmup_endpoint = f'/_plugins/_knn/warmup/{OPENSEARCH_INDEX}'
93+
cls.client.transport.perform_request('GET', warmup_endpoint)

Diff for: engine/clients/opensearch/upload.py

+37-6
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
import multiprocessing as mp
22
import uuid
3+
import time
34
from typing import List
45

56
from opensearchpy import OpenSearch
67

78
from dataset_reader.base_reader import Record
9+
from engine.base_client.distances import Distance
810
from engine.base_client.upload import BaseUploader
911
from engine.clients.opensearch.config import (
1012
OPENSEARCH_INDEX,
1113
OPENSEARCH_PASSWORD,
1214
OPENSEARCH_PORT,
1315
OPENSEARCH_USER,
1416
)
17+
from engine.clients.opensearch.utils import get_index_thread_qty_for_force_merge, update_force_merge_threads
1518

1619

1720
class ClosableOpenSearch(OpenSearch):
@@ -62,10 +65,38 @@ def upload_batch(cls, batch: List[Record]):
6265

6366
@classmethod
6467
def post_upload(cls, _distance):
65-
cls.client.indices.forcemerge(
66-
index=OPENSEARCH_INDEX,
67-
params={
68-
"timeout": 300,
69-
},
70-
)
68+
# ensuring that index is refreshed before force merge
69+
cls._refresh_index()
70+
cls._update_vector_threshold_setting()
71+
cls._force_merge_index()
72+
# ensuring that only force merged segments are remaining
73+
cls._refresh_index()
7174
return {}
75+
76+
@classmethod
77+
def _refresh_index(cls):
78+
print(f"Refreshing index: {OPENSEARCH_INDEX}")
79+
params={"timeout": 300}
80+
cls.client.indices.refresh(index=OPENSEARCH_INDEX, params=params)
81+
82+
@classmethod
83+
def _update_vector_threshold_setting(cls):
84+
body = {
85+
# ensure that approximate graph creation is enabled
86+
"index.knn.advanced.approximate_threshold": "0"
87+
}
88+
cls.client.indices.put_settings(index=OPENSEARCH_INDEX, body=body)
89+
90+
@classmethod
91+
def _force_merge_index(cls):
92+
index_thread_qty = get_index_thread_qty_for_force_merge(cls.client)
93+
update_force_merge_threads(client = cls.client, index_thread_qty = index_thread_qty)
94+
force_merge_endpoint = f'/{OPENSEARCH_INDEX}/_forcemerge?max_num_segments=1&wait_for_completion=false'
95+
force_merge_task_id = cls.client.transport.perform_request('POST', force_merge_endpoint)['task']
96+
SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC = 30
97+
print(f"Starting force merge on index: {OPENSEARCH_INDEX}, task_id: {force_merge_task_id}")
98+
while True:
99+
time.sleep(SECONDS_WAITING_FOR_FORCE_MERGE_API_CALL_SEC)
100+
task_status = cls.client.tasks.get(task_id=force_merge_task_id)
101+
if task_status['completed']:
102+
break

Diff for: engine/clients/opensearch/utils.py

+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
from opensearchpy import OpenSearch
2+
3+
def get_index_thread_qty_for_force_merge(client: OpenSearch):
4+
processors_per_node = get_cores_for_data_nodes(client=client)
5+
# since during force merge only one shard performs the merge, we can be aggressive in the parallelization factor
6+
index_thread_qty = max(1, processors_per_node // 2)
7+
print(f"Index thread qty for force merge: {index_thread_qty}")
8+
return index_thread_qty
9+
10+
def get_index_thread_qty(client: OpenSearch):
11+
processors_per_node = get_cores_for_data_nodes(client=client)
12+
# since during indexing more than one shard will be indexing, we are conservative in the parallelization factor
13+
index_thread_qty = max(1, processors_per_node // 8)
14+
print(f"Index thread qty for indexing: {index_thread_qty}")
15+
return index_thread_qty
16+
17+
18+
def get_cores_for_data_nodes(client: OpenSearch):
19+
# Sample nodes info response which is getting parsed.
20+
# {
21+
# "nodes": {
22+
# "Or9Nm4UJR3-gcMOGwJhHHQ": {
23+
# "roles": [
24+
# "data",
25+
# "ingest",
26+
# "master",
27+
# "remote_cluster_client"
28+
# ],
29+
# "os": {
30+
# "refresh_interval_in_millis": 1000,
31+
# "available_processors": 8,
32+
# "allocated_processors": 8
33+
# }
34+
# },
35+
# "A-cqbeekROeR3kzKhOXpRw": {
36+
# "roles": [
37+
# "data",
38+
# "ingest",
39+
# "master",
40+
# "remote_cluster_client"
41+
# ],
42+
# "os": {
43+
# "refresh_interval_in_millis": 1000,
44+
# "available_processors": 8,
45+
# "allocated_processors": 8
46+
# }
47+
# },
48+
# "FrDs-vOMQ8yDZ0HEkDwRHA": {
49+
# "roles": [
50+
# "data",
51+
# "ingest",
52+
# "master",
53+
# "remote_cluster_client"
54+
# ],
55+
# "os": {
56+
# "refresh_interval_in_millis": 1000,
57+
# "available_processors": 8,
58+
# "allocated_processors": 8
59+
# }
60+
# }
61+
# }
62+
# }
63+
64+
nodes_stats_res = client.nodes.info(filter_path="nodes.*.roles,nodes.*.os")
65+
nodes_data = nodes_stats_res.get("nodes")
66+
data_node_count = 0
67+
total_processors = 0
68+
for node_id in nodes_data:
69+
node_info = nodes_data.get(node_id)
70+
roles = node_info["roles"]
71+
os_info = node_info["os"]
72+
if 'data' in roles:
73+
data_node_count += 1
74+
total_processors += int(os_info['allocated_processors'])
75+
processors_per_node = total_processors // data_node_count
76+
return processors_per_node
77+
78+
79+
def update_force_merge_threads(client: OpenSearch, index_thread_qty=1):
80+
cluster_settings_body = {
81+
"persistent": {
82+
"knn.algo_param.index_thread_qty": index_thread_qty
83+
}
84+
}
85+
client.cluster.put_settings(cluster_settings_body)

Diff for: engine/servers/opensearch-single-node-ci/docker-compose.yaml

+4-2
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@ version: '3.5'
22

33
services:
44
opensearch:
5-
image: opensearchproject/opensearch:2.10.0
5+
image: opensearchproject/opensearch:2.17.1
66
environment:
77
discovery.type: "single-node"
8-
plugins.security.disabled: true
8+
DISABLE_SECURITY_PLUGIN: true
9+
DISABLE_INSTALL_DEMO_CONFIG: true
10+
bootstrap.memory_lock: true
911
OPENSEARCH_JAVA_OPTS: "-Xms2g -Xmx2g"
1012
ports:
1113
- "9200:9200"

Diff for: engine/servers/opensearch-single-node/docker-compose.yaml

+11-2
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,23 @@ version: '3.5'
22

33
services:
44
opensearch:
5-
image: opensearchproject/opensearch:2.10.0
5+
image: opensearchproject/opensearch:2.18.0
66
environment:
77
discovery.type: "single-node"
8-
plugins.security.disabled: true
8+
DISABLE_SECURITY_PLUGIN: true
9+
DISABLE_INSTALL_DEMO_CONFIG: true
10+
bootstrap.memory_lock: true
911
OPENSEARCH_JAVA_OPTS: "-Xms4g -Xmx4g"
1012
ports:
1113
- "9200:9200"
1214
- "9300:9300"
15+
ulimits:
16+
memlock:
17+
soft: -1 # Set memlock to unlimited (no soft or hard limit)
18+
hard: -1
19+
nofile:
20+
soft: 65536 # Maximum number of open files for the opensearch user - set to at least 65536
21+
hard: 65536
1322
logging:
1423
driver: "json-file"
1524
options:

0 commit comments

Comments
 (0)