Skip to content

Commit 0421cdc

Browse files
authored
Integrates KNN plugin with ConcurrentSearchRequestDecider interface (opensearch-project#2111)
This allows k-NN queries to enable concurrent segment search when index.search.concurrent_segment_search.mode or search.concurrent_segment_search.mode is set to auto. Without this, the default behavior of auto mode is non-concurrent search. Signed-off-by: Tejas Shah <[email protected]>
1 parent ccc59b1 commit 0421cdc

File tree

6 files changed

+291
-3
lines changed

6 files changed

+291
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
77
## [Unreleased 3.0](https://github.com/opensearch-project/k-NN/compare/2.x...HEAD)
88
### Features
99
### Enhancements
10+
* Adds concurrent segment search support for mode auto [#2111](https://github.com/opensearch-project/k-NN/pull/2111)
1011
### Bug Fixes
1112
* Add DocValuesProducers for releasing memory when close index [#1946](https://github.com/opensearch-project/k-NN/pull/1946)
1213
### Infrastructure

src/main/java/org/opensearch/knn/plugin/KNNPlugin.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.opensearch.index.engine.EngineFactory;
1414
import org.opensearch.indices.SystemIndexDescriptor;
1515
import org.opensearch.knn.index.KNNCircuitBreaker;
16+
import org.opensearch.knn.plugin.search.KNNConcurrentSearchRequestDecider;
1617
import org.opensearch.knn.index.util.KNNClusterUtil;
1718
import org.opensearch.knn.index.query.KNNQueryBuilder;
1819
import org.opensearch.knn.index.KNNSettings;
@@ -95,6 +96,7 @@
9596
import org.opensearch.script.ScriptContext;
9697
import org.opensearch.script.ScriptEngine;
9798
import org.opensearch.script.ScriptService;
99+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
98100
import org.opensearch.threadpool.ExecutorBuilder;
99101
import org.opensearch.threadpool.FixedExecutorBuilder;
100102
import org.opensearch.threadpool.ThreadPool;
@@ -349,4 +351,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
349351
public Collection<SystemIndexDescriptor> getSystemIndexDescriptors(Settings settings) {
350352
return ImmutableList.of(new SystemIndexDescriptor(MODEL_INDEX_NAME, "Index for storing models used for k-NN indices"));
351353
}
354+
355+
@Override
356+
public Optional<ConcurrentSearchRequestDecider.Factory> getConcurrentSearchRequestDeciderFactory() {
357+
return Optional.of(new KNNConcurrentSearchRequestDecider.Factory());
358+
}
352359
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.plugin.search;
7+
8+
import lombok.EqualsAndHashCode;
9+
import org.opensearch.index.IndexSettings;
10+
import org.opensearch.index.query.QueryBuilder;
11+
import org.opensearch.knn.index.KNNSettings;
12+
import org.opensearch.knn.index.query.KNNQueryBuilder;
13+
import org.opensearch.search.deciders.ConcurrentSearchDecision;
14+
import org.opensearch.search.deciders.ConcurrentSearchRequestDecider;
15+
16+
import java.util.Optional;
17+
18+
/**
19+
* Decides if the knn query uses concurrent segment search
20+
* As of 2.17, this is only used when
21+
* - "index.search.concurrent_segment_search.mode": "auto" or
22+
* - "search.concurrent_segment_search.mode": "auto"
23+
*
24+
* Note: the class is not thread-safe and a new instance needs to be created for each request
25+
*/
26+
@EqualsAndHashCode(callSuper = true)
27+
public class KNNConcurrentSearchRequestDecider extends ConcurrentSearchRequestDecider {
28+
29+
private static final ConcurrentSearchDecision DEFAULT_KNN_DECISION = new ConcurrentSearchDecision(
30+
ConcurrentSearchDecision.DecisionStatus.NO_OP,
31+
"Default decision"
32+
);
33+
private static final ConcurrentSearchDecision YES = new ConcurrentSearchDecision(
34+
ConcurrentSearchDecision.DecisionStatus.YES,
35+
"Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index"
36+
);
37+
38+
private ConcurrentSearchDecision knnDecision = DEFAULT_KNN_DECISION;
39+
40+
@Override
41+
public void evaluateForQuery(final QueryBuilder queryBuilder, final IndexSettings indexSettings) {
42+
if (queryBuilder instanceof KNNQueryBuilder && indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
43+
knnDecision = YES;
44+
} else {
45+
knnDecision = DEFAULT_KNN_DECISION;
46+
}
47+
}
48+
49+
@Override
50+
public ConcurrentSearchDecision getConcurrentSearchDecision() {
51+
return knnDecision;
52+
}
53+
54+
/**
55+
* Returns {@link KNNConcurrentSearchRequestDecider} when index.knn is true
56+
*/
57+
public static class Factory implements ConcurrentSearchRequestDecider.Factory {
58+
public Optional<ConcurrentSearchRequestDecider> create(final IndexSettings indexSettings) {
59+
if (indexSettings.getValue(KNNSettings.IS_KNN_INDEX_SETTING)) {
60+
return Optional.of(new KNNConcurrentSearchRequestDecider());
61+
}
62+
return Optional.empty();
63+
}
64+
}
65+
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.integ.search;
7+
8+
import com.google.common.primitives.Floats;
9+
import lombok.SneakyThrows;
10+
import org.apache.hc.core5.http.io.entity.EntityUtils;
11+
import org.junit.BeforeClass;
12+
import org.opensearch.client.Response;
13+
import org.opensearch.common.settings.Settings;
14+
import org.opensearch.core.xcontent.XContentBuilder;
15+
import org.opensearch.knn.KNNJsonIndexMappingsBuilder;
16+
import org.opensearch.knn.KNNRestTestCase;
17+
import org.opensearch.knn.KNNResult;
18+
import org.opensearch.knn.TestUtils;
19+
import org.opensearch.knn.index.SpaceType;
20+
import org.opensearch.knn.index.engine.KNNEngine;
21+
import org.opensearch.knn.index.query.KNNQueryBuilder;
22+
import org.opensearch.knn.plugin.script.KNNScoringUtil;
23+
24+
import java.io.IOException;
25+
import java.net.URL;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.TreeMap;
29+
30+
import static org.opensearch.knn.common.KNNConstants.METHOD_HNSW;
31+
32+
/**
33+
* Note that this is simply a sanity test to make sure that concurrent search code path is hit E2E and scores are intact
34+
* There is no latency verification as it can be better encapsulated in nightly runs.
35+
*/
36+
public class ConcurrentSegmentSearchIT extends KNNRestTestCase {
37+
38+
static TestUtils.TestData testData;
39+
40+
@BeforeClass
41+
public static void setUpClass() throws IOException {
42+
if (ConcurrentSegmentSearchIT.class.getClassLoader() == null) {
43+
throw new IllegalStateException("ClassLoader of ConcurrentSegmentSearchIT Class is null");
44+
}
45+
URL testIndexVectors = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_vectors_1000x128.json");
46+
URL testQueries = ConcurrentSegmentSearchIT.class.getClassLoader().getResource("data/test_queries_100x128.csv");
47+
assert testIndexVectors != null;
48+
assert testQueries != null;
49+
testData = new TestUtils.TestData(testIndexVectors.getPath(), testQueries.getPath());
50+
}
51+
52+
@SneakyThrows
53+
public void testConcurrentSegmentSearch_thenSucceed() {
54+
String indexName = "test-concurrent-segment";
55+
String fieldName = "test-field-1";
56+
int dimension = testData.indexData.vectors[0].length;
57+
final XContentBuilder indexBuilder = createFaissHnswIndexMapping(fieldName, dimension);
58+
Map<String, Object> mappingMap = xContentBuilderToMap(indexBuilder);
59+
String mapping = indexBuilder.toString();
60+
createKnnIndex(indexName, mapping);
61+
assertEquals(new TreeMap<>(mappingMap), new TreeMap<>(getIndexMappingAsMap(indexName)));
62+
63+
// Index the test data
64+
for (int i = 0; i < testData.indexData.docs.length; i++) {
65+
addKnnDoc(
66+
indexName,
67+
Integer.toString(testData.indexData.docs[i]),
68+
fieldName,
69+
Floats.asList(testData.indexData.vectors[i]).toArray()
70+
);
71+
}
72+
refreshAllNonSystemIndices();
73+
updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "auto"));
74+
75+
// Test search queries
76+
int k = 10;
77+
verifySearch(indexName, fieldName, k);
78+
79+
updateIndexSettings(indexName, Settings.builder().put("index.search.concurrent_segment_search.mode", "all"));
80+
verifySearch(indexName, fieldName, k);
81+
82+
deleteKNNIndex(indexName);
83+
}
84+
85+
/*
86+
{
87+
"properties": {
88+
"<fieldName>": {
89+
"type": "knn_vector",
90+
"dimension": <dimension>,
91+
"method": {
92+
"name": "hnsw",
93+
"space_type": "l2",
94+
"engine": "faiss",
95+
"parameters": {
96+
"m": 16,
97+
"ef_construction": 128,
98+
"ef_search": 128
99+
}
100+
}
101+
}
102+
}
103+
*/
104+
@SneakyThrows
105+
private XContentBuilder createFaissHnswIndexMapping(String fieldName, int dimension) {
106+
return KNNJsonIndexMappingsBuilder.builder()
107+
.fieldName(fieldName)
108+
.dimension(dimension)
109+
.method(
110+
KNNJsonIndexMappingsBuilder.Method.builder()
111+
.engine(KNNEngine.FAISS.getName())
112+
.methodName(METHOD_HNSW)
113+
.spaceType(SpaceType.L2.getValue())
114+
.parameters(KNNJsonIndexMappingsBuilder.Method.Parameters.builder().efConstruction(128).efSearch(128).m(16).build())
115+
.build()
116+
)
117+
.build()
118+
.getIndexMappingBuilder();
119+
}
120+
121+
@SneakyThrows
122+
private void verifySearch(String indexName, String fieldName, int k) {
123+
for (int i = 0; i < testData.queries.length; i++) {
124+
final KNNQueryBuilder queryBuilder = KNNQueryBuilder.builder().fieldName(fieldName).vector(testData.queries[i]).k(k).build();
125+
Response response = searchKNNIndex(indexName, queryBuilder, k);
126+
String responseBody = EntityUtils.toString(response.getEntity());
127+
List<KNNResult> knnResults = parseSearchResponse(responseBody, fieldName);
128+
assertEquals(k, knnResults.size());
129+
130+
List<Float> actualScores = parseSearchResponseScore(responseBody, fieldName);
131+
for (int j = 0; j < k; j++) {
132+
float[] primitiveArray = knnResults.get(j).getVector();
133+
assertEquals(
134+
KNNEngine.FAISS.score(KNNScoringUtil.l2Squared(testData.queries[i], primitiveArray), SpaceType.L2),
135+
actualScores.get(j),
136+
0.0001
137+
);
138+
}
139+
}
140+
}
141+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.plugin.search;
7+
8+
import org.opensearch.index.IndexSettings;
9+
import org.opensearch.index.query.MatchAllQueryBuilder;
10+
import org.opensearch.knn.KNNTestCase;
11+
import org.opensearch.knn.index.KNNSettings;
12+
import org.opensearch.knn.index.query.KNNQueryBuilder;
13+
import org.opensearch.search.deciders.ConcurrentSearchDecision;
14+
15+
import static org.mockito.Mockito.mock;
16+
import static org.mockito.Mockito.when;
17+
18+
public class KNNConcurrentSearchRequestDeciderTests extends KNNTestCase {
19+
20+
public void testDecider_thenSucceed() {
21+
ConcurrentSearchDecision noop = new ConcurrentSearchDecision(ConcurrentSearchDecision.DecisionStatus.NO_OP, "Default decision");
22+
23+
KNNConcurrentSearchRequestDecider decider = new KNNConcurrentSearchRequestDecider();
24+
assertDecision(noop, decider.getConcurrentSearchDecision());
25+
IndexSettings indexSettingsMock = mock(IndexSettings.class);
26+
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);
27+
28+
// Non KNNQueryBuilder
29+
decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock);
30+
assertDecision(noop, decider.getConcurrentSearchDecision());
31+
decider.evaluateForQuery(
32+
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(),
33+
indexSettingsMock
34+
);
35+
assertDecision(noop, decider.getConcurrentSearchDecision());
36+
37+
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
38+
decider.evaluateForQuery(
39+
KNNQueryBuilder.builder().vector(new float[] { 1f, 2f, 3f, 4f, 5f, 6f }).fieldName("decider").k(10).build(),
40+
indexSettingsMock
41+
);
42+
ConcurrentSearchDecision yes = new ConcurrentSearchDecision(
43+
ConcurrentSearchDecision.DecisionStatus.YES,
44+
"Enable concurrent search for knn as Query has k-NN query in it and index is k-nn index"
45+
);
46+
assertDecision(yes, decider.getConcurrentSearchDecision());
47+
48+
decider.evaluateForQuery(new MatchAllQueryBuilder(), indexSettingsMock);
49+
assertDecision(noop, decider.getConcurrentSearchDecision());
50+
}
51+
52+
public void testDeciderFactory_thenSucceed() {
53+
KNNConcurrentSearchRequestDecider.Factory factory = new KNNConcurrentSearchRequestDecider.Factory();
54+
IndexSettings indexSettingsMock = mock(IndexSettings.class);
55+
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.TRUE);
56+
assertNotSame(factory.create(indexSettingsMock).get(), factory.create(indexSettingsMock).get());
57+
when(indexSettingsMock.getValue(KNNSettings.IS_KNN_INDEX_SETTING)).thenReturn(Boolean.FALSE);
58+
assertTrue(factory.create(indexSettingsMock).isEmpty());
59+
}
60+
61+
private void assertDecision(ConcurrentSearchDecision expected, ConcurrentSearchDecision actual) {
62+
assertEquals(expected.getDecisionReason(), actual.getDecisionReason());
63+
assertEquals(expected.getDecisionStatus(), actual.getDecisionStatus());
64+
}
65+
}

src/testFixtures/java/org/opensearch/knn/KNNJsonIndexMappingsBuilder.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import lombok.NonNull;
1010
import org.opensearch.common.xcontent.XContentFactory;
1111
import org.opensearch.core.xcontent.XContentBuilder;
12+
import org.opensearch.knn.common.KNNConstants;
1213

1314
import java.io.IOException;
1415

@@ -26,7 +27,7 @@ public class KNNJsonIndexMappingsBuilder {
2627
private String vectorDataType;
2728
private Method method;
2829

29-
public String getIndexMapping() throws IOException {
30+
public XContentBuilder getIndexMappingBuilder() throws IOException {
3031
if (nestedFieldName != null) {
3132
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
3233
.startObject()
@@ -40,7 +41,7 @@ public String getIndexMapping() throws IOException {
4041
addVectorDataType(xContentBuilder);
4142
addMethod(xContentBuilder);
4243
xContentBuilder.endObject().endObject().endObject().endObject().endObject();
43-
return xContentBuilder.toString();
44+
return xContentBuilder;
4445
} else {
4546
XContentBuilder xContentBuilder = XContentFactory.jsonBuilder()
4647
.startObject()
@@ -51,10 +52,14 @@ public String getIndexMapping() throws IOException {
5152
addVectorDataType(xContentBuilder);
5253
addMethod(xContentBuilder);
5354
xContentBuilder.endObject().endObject().endObject();
54-
return xContentBuilder.toString();
55+
return xContentBuilder;
5556
}
5657
}
5758

59+
public String getIndexMapping() throws IOException {
60+
return getIndexMappingBuilder().toString();
61+
}
62+
5863
private void addVectorDataType(final XContentBuilder xContentBuilder) throws IOException {
5964
if (vectorDataType == null) {
6065
return;
@@ -104,6 +109,7 @@ public static class Parameters {
104109
private Encoder encoder;
105110
private Integer efConstruction;
106111
private Integer efSearch;
112+
private Integer m;
107113

108114
private void addTo(final XContentBuilder xContentBuilder) throws IOException {
109115
xContentBuilder.startObject("parameters");
@@ -113,6 +119,9 @@ private void addTo(final XContentBuilder xContentBuilder) throws IOException {
113119
if (efSearch != null) {
114120
xContentBuilder.field("ef_search", efSearch);
115121
}
122+
if (m != null) {
123+
xContentBuilder.field(KNNConstants.METHOD_PARAMETER_M, m);
124+
}
116125
addEncoder(xContentBuilder);
117126
xContentBuilder.endObject();
118127
}

0 commit comments

Comments
 (0)