Skip to content

Commit 0401483

Browse files
committed
Add index operation listener to update translog source
Adds an index operation listener that updates the source so that it matches the source written to the stored fields. This ensures that, on certain recovery events, duplicate operations do not cause conflicts. Along with this, the integration tests are updated to be more robust. Signed-off-by: John Mazanec <[email protected]>
1 parent 7f1af5a commit 0401483

File tree

14 files changed

+1115
-1618
lines changed

14 files changed

+1115
-1618
lines changed

.github/workflows/backwards_compatibility_tests_workflow.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ jobs:
3535
matrix:
3636
java: [ 21 ]
3737
os: [ubuntu-latest]
38-
bwc_version : [ "2.0.1", "2.1.0", "2.2.1", "2.3.0", "2.4.1", "2.5.0", "2.6.0", "2.7.0", "2.8.0", "2.9.0", "2.10.0", "2.11.0", "2.12.0", "2.13.0", "2.14.0", "2.15.0", "2.16.0", "2.17.0", "2.18.0","2.19.0-SNAPSHOT", "2.20.0-SNAPSHOT"]
38+
bwc_version : [ "2.0.1", "2.1.0", "2.2.1", "2.3.0", "2.4.1", "2.5.0", "2.6.0", "2.7.0", "2.8.0", "2.9.0", "2.10.0", "2.11.0", "2.12.0", "2.13.0", "2.14.0", "2.15.0", "2.16.0", "2.17.0", "2.18.0","2.19.1", "2.20.0-SNAPSHOT"]
3939
opensearch_version : [ "3.0.0-SNAPSHOT" ]
4040
exclude:
4141
- os: windows-latest

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Add filter function to KNNQueryBuilder with unit tests and integration tests [#2
1212
### Enhancements
1313
### Bug Fixes
1414
* Fixing bug to prevent NullPointerException while doing PUT mappings [#2556](https://github.com/opensearch-project/k-NN/issues/2556)
15+
* Add index operation listener to update translog source [#2629](https://github.com/opensearch-project/k-NN/pull/2629)
1516
### Infrastructure
1617
### Documentation
1718
### Maintenance

qa/restart-upgrade/build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,8 @@ testClusters {
146146
knn_bwc_version.startsWith("2.15.") ||
147147
knn_bwc_version.startsWith("2.16.") ||
148148
knn_bwc_version.startsWith("2.17.") ||
149-
knn_bwc_version.startsWith("2.18.")) {
149+
knn_bwc_version.startsWith("2.18.") ||
150+
knn_bwc_version.startsWith("2.19.")) {
150151
filter {
151152
excludeTestsMatching "org.opensearch.knn.bwc.DerivedSourceBWCRestartIT"
152153
}

qa/restart-upgrade/src/test/java/org/opensearch/knn/bwc/DerivedSourceBWCRestartIT.java

+23-371
Large diffs are not rendered by default.

src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010DerivedSourceStoredFieldsWriter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public KNN10010DerivedSourceStoredFieldsWriter(StoredFieldsWriter delegate, List
4848
if (vectorFieldTypes.isEmpty() == false) {
4949
this.vectorMask = XContentMapValues.transform(
5050
vectorFieldTypes.stream().collect(Collectors.toMap(k -> k, k -> (Object o) -> o == null ? o : MASK)),
51-
false
51+
true
5252
);
5353
} else {
5454
this.vectorMask = null;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.index.codec.derivedsource;
7+
8+
import lombok.extern.log4j.Log4j2;
9+
import org.apache.lucene.document.KnnByteVectorField;
10+
import org.apache.lucene.document.KnnFloatVectorField;
11+
import org.apache.lucene.index.IndexableField;
12+
import org.apache.lucene.util.BytesRef;
13+
import org.opensearch.common.collect.Tuple;
14+
import org.opensearch.common.io.stream.BytesStreamOutput;
15+
import org.opensearch.common.xcontent.XContentHelper;
16+
import org.opensearch.common.xcontent.XContentType;
17+
import org.opensearch.common.xcontent.support.XContentMapValues;
18+
import org.opensearch.core.index.shard.ShardId;
19+
import org.opensearch.core.xcontent.MediaType;
20+
import org.opensearch.core.xcontent.MediaTypeRegistry;
21+
import org.opensearch.core.xcontent.XContentBuilder;
22+
import org.opensearch.index.engine.Engine;
23+
import org.opensearch.index.mapper.ParseContext;
24+
import org.opensearch.index.shard.IndexingOperationListener;
25+
import org.opensearch.knn.index.VectorDataType;
26+
import org.opensearch.knn.index.mapper.KNNVectorFieldMapperUtil;
27+
28+
import java.io.IOException;
29+
import java.util.ArrayList;
30+
import java.util.HashMap;
31+
import java.util.Iterator;
32+
import java.util.List;
33+
import java.util.Locale;
34+
import java.util.Map;
35+
import java.util.function.Function;
36+
37+
import static org.opensearch.index.mapper.SourceFieldMapper.RECOVERY_SOURCE_NAME;
38+
39+
/**
40+
* Before applying the indexing operation, we need to ensure that the source that gets added to the translog matches
41+
* exactly what we will reconstruct. To do this, we reconstruct the source from the binary source and then apply the
42+
* transformation on top of it and then set the source back.
43+
*/
44+
@Log4j2
45+
public class DerivedSourceIndexOperationListener implements IndexingOperationListener {
46+
47+
@Override
48+
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
49+
// If recovery source is enabled, we do not need to modify the translog source. The recovery source will be the
50+
// original, user provided source
51+
if (isRecoverySourceEnabled(operation)) {
52+
return operation;
53+
}
54+
Tuple<? extends MediaType, Map<String, Object>> originalSource = XContentHelper.convertToMap(
55+
operation.parsedDoc().source(),
56+
true,
57+
operation.parsedDoc().getMediaType()
58+
);
59+
Map<String, Object> derivedSource = createInjectTransformer(operation).apply(originalSource.v2());
60+
61+
try (BytesStreamOutput bStream = new BytesStreamOutput();) {
62+
XContentBuilder builder = MediaTypeRegistry.contentBuilder(originalSource.v1(), bStream).map(derivedSource);
63+
builder.close();
64+
operation.parsedDoc().setSource(bStream.bytes(), XContentType.valueOf(originalSource.v1().subtype().toUpperCase(Locale.ROOT)));
65+
} catch (IOException e) {
66+
throw new RuntimeException(e);
67+
}
68+
return operation;
69+
}
70+
71+
private Function<Map<String, Object>, Map<String, Object>> createInjectTransformer(Engine.Index operation) {
72+
Map<String, List<Object>> injectedVectors = new HashMap<>();
73+
74+
// For each document, we get the relevant vector fields to compute the injection logic
75+
for (ParseContext.Document document : operation.parsedDoc().docs()) {
76+
for (Iterator<IndexableField> it = document.iterator(); it.hasNext();) {
77+
IndexableField indexableField = it.next();
78+
if (indexableField instanceof KnnFloatVectorField knnVectorFieldType) {
79+
injectedVectors.computeIfAbsent(indexableField.name(), k -> new ArrayList<>())
80+
.add(formatVector(VectorDataType.FLOAT, knnVectorFieldType.vectorValue()));
81+
}
82+
83+
if (indexableField instanceof KnnByteVectorField knnByteVectorField) {
84+
injectedVectors.computeIfAbsent(indexableField.name(), k -> new ArrayList<>())
85+
.add(formatVector(VectorDataType.BYTE, knnByteVectorField.vectorValue()));
86+
}
87+
}
88+
}
89+
Map<String, Function<Object, Object>> injectTransformers = new HashMap<>();
90+
for (Map.Entry<String, List<Object>> entry : injectedVectors.entrySet()) {
91+
Iterator<Object> iterator = entry.getValue().iterator();
92+
injectTransformers.put(entry.getKey(), (Object o) -> o == null ? o : iterator.next());
93+
}
94+
return XContentMapValues.transform(injectTransformers, true);
95+
}
96+
97+
private boolean isRecoverySourceEnabled(Engine.Index operation) {
98+
return operation.parsedDoc().rootDoc().getField(RECOVERY_SOURCE_NAME) != null;
99+
}
100+
101+
protected Object formatVector(VectorDataType vectorDataType, Object vectorValue) {
102+
if (vectorValue instanceof byte[]) {
103+
BytesRef vectorBytesRef = new BytesRef((byte[]) vectorValue);
104+
return KNNVectorFieldMapperUtil.deserializeStoredVector(vectorBytesRef, vectorDataType);
105+
}
106+
return vectorValue;
107+
}
108+
}

src/main/java/org/opensearch/knn/index/codec/derivedsource/DerivedSourceVectorTransformer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public DerivedSourceVectorTransformer(
5959
perFieldDerivedVectorTransformers.put(derivedFieldInfo.name(), perFieldDerivedVectorTransformer);
6060
perFieldDerivedVectorTransformersFunctionValues.put(derivedFieldInfo.name(), perFieldDerivedVectorTransformer);
6161
}
62-
derivedSourceVectorTransformer = XContentMapValues.transform(perFieldDerivedVectorTransformersFunctionValues, false);
62+
derivedSourceVectorTransformer = XContentMapValues.transform(perFieldDerivedVectorTransformersFunctionValues, true);
6363
derivedSourceLuceneHelper = new DerivedSourceLuceneHelper(derivedSourceReaders, segmentReadState);
6464
}
6565

src/main/java/org/opensearch/knn/plugin/KNNPlugin.java

+4
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.opensearch.knn.index.KNNCircuitBreaker;
3636
import org.opensearch.knn.index.KNNSettings;
3737
import org.opensearch.knn.index.codec.KNNCodecService;
38+
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceIndexOperationListener;
3839
import org.opensearch.knn.index.codec.nativeindex.NativeIndexBuildStrategyFactory;
3940
import org.opensearch.knn.index.mapper.KNNVectorFieldMapper;
4041
import org.opensearch.knn.index.memory.NativeMemoryCacheManager;
@@ -304,6 +305,9 @@ public Optional<CodecServiceFactory> getCustomCodecServiceFactory(IndexSettings
304305
@Override
305306
public void onIndexModule(IndexModule indexModule) {
306307
KNNSettings.state().onIndexModule(indexModule);
308+
if (KNNSettings.isKNNDerivedSourceEnabled(indexModule.getSettings())) {
309+
indexModule.addIndexOperationListener(new DerivedSourceIndexOperationListener());
310+
}
307311
}
308312

309313
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.knn.index.codec.derivedsource;
7+
8+
import org.apache.lucene.document.KnnFloatVectorField;
9+
import org.apache.lucene.index.Term;
10+
import org.opensearch.common.collect.Tuple;
11+
import org.opensearch.common.io.stream.BytesStreamOutput;
12+
import org.opensearch.common.xcontent.XContentHelper;
13+
import org.opensearch.common.xcontent.XContentType;
14+
import org.opensearch.core.common.bytes.BytesReference;
15+
import org.opensearch.core.xcontent.MediaType;
16+
import org.opensearch.core.xcontent.MediaTypeRegistry;
17+
import org.opensearch.core.xcontent.XContentBuilder;
18+
import org.opensearch.index.engine.Engine;
19+
import org.opensearch.index.mapper.ParseContext;
20+
import org.opensearch.index.mapper.ParsedDocument;
21+
import org.opensearch.knn.KNNTestCase;
22+
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.Map;
26+
27+
public class DerivedSourceIndexOperationListenerTests extends KNNTestCase {
28+
29+
public void testPreIndex() throws Exception {
30+
String fieldName = "test-vector";
31+
int[] userVector = { 1, 2, 3, 4 };
32+
float[] backendVector = { 1.0f, 2.0f, 3.0f, 4.0f };
33+
List<Double> expectedOutputAsList = new ArrayList<>(List.of(1.0, 2.0, 3.0, 4.0));
34+
35+
Map<String, Object> originalSourceMap = Map.of(fieldName, userVector);
36+
BytesStreamOutput bStream = new BytesStreamOutput();
37+
XContentBuilder builder = MediaTypeRegistry.contentBuilder(XContentType.JSON, bStream).map(originalSourceMap);
38+
builder.close();
39+
BytesReference originalSource = bStream.bytes();
40+
41+
ParseContext.Document document = new ParseContext.Document();
42+
document.add(new KnnFloatVectorField(fieldName, backendVector));
43+
44+
Engine.Index operation = new Engine.Index(
45+
new Term("test-iud"),
46+
1,
47+
new ParsedDocument(null, null, null, null, List.of(document), originalSource, XContentType.JSON, null)
48+
);
49+
50+
DerivedSourceIndexOperationListener derivedSourceIndexOperationListener = new DerivedSourceIndexOperationListener();
51+
operation = derivedSourceIndexOperationListener.preIndex(null, operation);
52+
Tuple<? extends MediaType, Map<String, Object>> modifiedSource = XContentHelper.convertToMap(
53+
operation.parsedDoc().source(),
54+
true,
55+
operation.parsedDoc().getMediaType()
56+
);
57+
58+
assertEquals(expectedOutputAsList, modifiedSource.v2().get(fieldName));
59+
}
60+
}

0 commit comments

Comments
 (0)