Skip to content

Commit 7f1af5a

Browse files
authored
Migrate derived source from filter to mask (opensearch-project#2612)
Migrates derived source functionality from a filter-based to a mask-based approach. Moves the old read functionality and related classes to the backwards codecs (KNN9120...). Removes the old write path, as it is no longer necessary. To support backwards compatibility (bwc), we add custom logic to the stored fields writer's merge so that it falls back to the base, non-optimized merge if it detects older readers in the merge state. This is needed because, for these segments, we need to rebuild the source and then apply the filter in order to migrate to the new write format. Signed-off-by: John Mazanec <[email protected]>
1 parent 294b4c7 commit 7f1af5a

File tree

42 files changed

+901
-142
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+901
-142
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Add filter function to KNNQueryBuilder with unit tests and integration tests [#2
1717
### Maintenance
1818
### Refactoring
1919
* Switch derived source from field attributes to segment attribute [#2606](https://github.com/opensearch-project/k-NN/pull/2606)
20+
* Migrate derived source from filter to mask [#2612](https://github.com/opensearch-project/k-NN/pull/2612)
2021

2122
## [Unreleased 2.x](https://github.com/opensearch-project/k-NN/compare/2.19...2.x)
2223
### Features

build.gradle

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ buildscript {
1515
ext {
1616
// build.version_qualifier parameter applies to knn plugin artifacts only. OpenSearch version must be set
1717
// explicitly as 'opensearch.version' property, for instance opensearch.version=2.0.0-rc1-SNAPSHOT
18-
opensearch_version = System.getProperty("opensearch.version", "3.0.0-alpha1-SNAPSHOT")
19-
version_qualifier = System.getProperty("build.version_qualifier", "alpha1")
18+
opensearch_version = System.getProperty("opensearch.version", "3.0.0-beta1-SNAPSHOT")
19+
version_qualifier = System.getProperty("build.version_qualifier", "beta1")
2020
opensearch_group = "org.opensearch"
2121
isSnapshot = "true" == System.getProperty("build.snapshot", "true")
2222
avx2_enabled = System.getProperty("avx2.enabled", "true")

qa/restart-upgrade/src/test/java/org/opensearch/knn/bwc/DerivedSourceBWCRestartIT.java

+4
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ private void testIndexAndForceMergeOnOld_injectOnNew(List<IndexConfigContext> in
5252

5353
// Delete
5454
testDelete(indexConfigContexts);
55+
assertDocsMatch(indexConfigContexts);
5556
} else {
57+
assertDocsMatch(indexConfigContexts);
5658
// Search
5759
testSearch(indexConfigContexts);
5860

@@ -81,7 +83,9 @@ private void testIndexOnOld_forceMergeAndInjectOnNew(List<IndexConfigContext> in
8183
if (isRunningAgainstOldCluster()) {
8284
prepareOriginalIndices(indexConfigContexts);
8385
} else {
86+
assertDocsMatch(indexConfigContexts);
8487
testMerging(indexConfigContexts);
88+
assertDocsMatch(indexConfigContexts);
8589
// Update. Skipping update tests for nested docs for now. Will add in the future.
8690
if (indexConfigContexts.get(0).isNested() == false) {
8791
testUpdate(indexConfigContexts);

src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010Codec.java

+1-12
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,7 @@ private StoredFieldsFormat getStoredFieldsFormat() {
9191
}
9292
return null;
9393

94-
}, (segmentReadState) -> {
95-
if (segmentReadState.fieldInfos.hasPostings()) {
96-
return postingsFormat().fieldsProducer(segmentReadState);
97-
}
98-
return null;
99-
100-
}, (segmentReadState -> {
101-
if (segmentReadState.fieldInfos.hasNorms()) {
102-
return normsFormat().normsProducer(segmentReadState);
103-
}
104-
return null;
105-
}));
94+
});
10695
return new KNN10010DerivedSourceStoredFieldsFormat(delegate.storedFieldsFormat(), derivedSourceReadersSupplier, mapperService);
10796
}
10897
}

src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010DerivedSourceStoredFieldsFormat.java

+40-18
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.apache.lucene.codecs.StoredFieldsFormat;
1010
import org.apache.lucene.codecs.StoredFieldsReader;
1111
import org.apache.lucene.codecs.StoredFieldsWriter;
12-
import org.apache.lucene.index.FieldInfo;
1312
import org.apache.lucene.index.FieldInfos;
1413
import org.apache.lucene.index.SegmentInfo;
1514
import org.apache.lucene.index.SegmentReadState;
@@ -19,14 +18,15 @@
1918
import org.opensearch.index.mapper.MappedFieldType;
2019
import org.opensearch.index.mapper.MapperService;
2120
import org.opensearch.knn.index.KNNSettings;
22-
import org.opensearch.knn.index.codec.KNN9120Codec.KNN9120DerivedSourceStoredFieldsReader;
21+
import org.opensearch.knn.index.codec.derivedsource.DerivedFieldInfo;
2322
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
2423
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceSegmentAttributeParser;
2524
import org.opensearch.knn.index.mapper.KNNVectorFieldType;
2625

2726
import java.io.IOException;
2827
import java.util.ArrayList;
2928
import java.util.List;
29+
import java.util.stream.Stream;
3030

3131
@AllArgsConstructor
3232
public class KNN10010DerivedSourceStoredFieldsFormat extends StoredFieldsFormat {
@@ -40,16 +40,22 @@ public class KNN10010DerivedSourceStoredFieldsFormat extends StoredFieldsFormat
4040
@Override
4141
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos, IOContext ioContext)
4242
throws IOException {
43-
List<FieldInfo> derivedVectorFields = DerivedSourceSegmentAttributeParser.parseDerivedVectorFields(segmentInfo)
44-
.stream()
45-
.filter(field -> fieldInfos.fieldInfo(field) != null)
46-
.map(fieldInfos::fieldInfo)
47-
.toList();
43+
List<DerivedFieldInfo> derivedVectorFields = Stream.concat(
44+
DerivedSourceSegmentAttributeParser.parseDerivedVectorFields(segmentInfo, false)
45+
.stream()
46+
.filter(field -> fieldInfos.fieldInfo(field) != null)
47+
.map(field -> new DerivedFieldInfo(fieldInfos.fieldInfo(field), false)),
48+
DerivedSourceSegmentAttributeParser.parseDerivedVectorFields(segmentInfo, true)
49+
.stream()
50+
.filter(field -> fieldInfos.fieldInfo(field) != null)
51+
.map(field -> new DerivedFieldInfo(fieldInfos.fieldInfo(field), true))
52+
).toList();
53+
4854
// If no fields have it enabled, we can just short-circuit and return the delegate's fieldReader
49-
if (derivedVectorFields == null || derivedVectorFields.isEmpty()) {
55+
if (derivedVectorFields.isEmpty()) {
5056
return delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext);
5157
}
52-
return new KNN9120DerivedSourceStoredFieldsReader(
58+
return new KNN10010DerivedSourceStoredFieldsReader(
5359
delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext),
5460
derivedVectorFields,
5561
derivedSourceReadersSupplier,
@@ -60,18 +66,34 @@ public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo segmentI
6066
@Override
6167
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo segmentInfo, IOContext ioContext) throws IOException {
6268
StoredFieldsWriter delegateWriter = delegate.fieldsWriter(directory, segmentInfo, ioContext);
63-
if (mapperService != null && KNNSettings.isKNNDerivedSourceEnabled(mapperService.getIndexSettings().getSettings())) {
64-
List<String> vectorFieldTypes = new ArrayList<>();
65-
for (MappedFieldType fieldType : mapperService.fieldTypes()) {
66-
if (fieldType instanceof KNNVectorFieldType) {
69+
if (mapperService == null || KNNSettings.isKNNDerivedSourceEnabled(mapperService.getIndexSettings().getSettings()) == false) {
70+
return delegateWriter;
71+
}
72+
73+
List<String> vectorFieldTypes = new ArrayList<>();
74+
List<String> nestedVectorFieldTypes = new ArrayList<>();
75+
for (MappedFieldType fieldType : mapperService.fieldTypes()) {
76+
if (fieldType instanceof KNNVectorFieldType) {
77+
boolean isNested = mapperService.documentMapper().mappers().getNestedScope(fieldType.name()) != null;
78+
if (isNested) {
79+
nestedVectorFieldTypes.add(fieldType.name());
80+
} else {
6781
vectorFieldTypes.add(fieldType.name());
6882
}
6983
}
70-
if (vectorFieldTypes.isEmpty() == false) {
71-
DerivedSourceSegmentAttributeParser.addDerivedVectorFieldsSegmentInfoAttribute(segmentInfo, vectorFieldTypes);
72-
return new KNN10010DerivedSourceStoredFieldsWriter(delegateWriter, vectorFieldTypes);
73-
}
7484
}
75-
return delegateWriter;
85+
if (vectorFieldTypes.isEmpty() && nestedVectorFieldTypes.isEmpty()) {
86+
return delegateWriter;
87+
}
88+
89+
// Store nested fields separately from non-nested for easy handling on read
90+
if (vectorFieldTypes.isEmpty() == false) {
91+
DerivedSourceSegmentAttributeParser.addDerivedVectorFieldsSegmentInfoAttribute(segmentInfo, vectorFieldTypes, false);
92+
}
93+
if (nestedVectorFieldTypes.isEmpty() == false) {
94+
vectorFieldTypes.addAll(nestedVectorFieldTypes);
95+
DerivedSourceSegmentAttributeParser.addDerivedVectorFieldsSegmentInfoAttribute(segmentInfo, nestedVectorFieldTypes, true);
96+
}
97+
return new KNN10010DerivedSourceStoredFieldsWriter(delegateWriter, vectorFieldTypes);
7698
}
7799
}

src/main/java/org/opensearch/knn/index/codec/KNN9120Codec/KNN9120DerivedSourceStoredFieldsReader.java src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010DerivedSourceStoredFieldsReader.java

+30-27
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,29 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.knn.index.codec.KNN9120Codec;
6+
package org.opensearch.knn.index.codec.KNN10010Codec;
77

88
import org.apache.lucene.codecs.StoredFieldsReader;
9-
import org.apache.lucene.index.FieldInfo;
109
import org.apache.lucene.index.SegmentReadState;
1110
import org.apache.lucene.index.StoredFieldVisitor;
1211
import org.apache.lucene.util.IOUtils;
1312
import org.opensearch.index.fieldvisitor.FieldsVisitor;
13+
import org.opensearch.knn.index.codec.derivedsource.DerivedFieldInfo;
1414
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
1515
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceStoredFieldVisitor;
16-
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceVectorInjector;
16+
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceVectorTransformer;
1717

1818
import java.io.IOException;
1919
import java.util.List;
2020

21-
public class KNN9120DerivedSourceStoredFieldsReader extends StoredFieldsReader {
21+
public class KNN10010DerivedSourceStoredFieldsReader extends StoredFieldsReader {
2222
private final StoredFieldsReader delegate;
23-
private final List<FieldInfo> derivedVectorFields;
23+
private final List<DerivedFieldInfo> derivedVectorFields;
2424
private final DerivedSourceReadersSupplier derivedSourceReadersSupplier;
2525
private final SegmentReadState segmentReadState;
2626
private final boolean shouldInject;
2727

28-
private final DerivedSourceVectorInjector derivedSourceVectorInjector;
28+
private final DerivedSourceVectorTransformer derivedSourceVectorTransformer;
2929

3030
/**
3131
*
@@ -35,18 +35,18 @@ public class KNN9120DerivedSourceStoredFieldsReader extends StoredFieldsReader {
3535
* @param segmentReadState SegmentReadState for the segment
3636
* @throws IOException in case of I/O error
3737
*/
38-
public KNN9120DerivedSourceStoredFieldsReader(
38+
public KNN10010DerivedSourceStoredFieldsReader(
3939
StoredFieldsReader delegate,
40-
List<FieldInfo> derivedVectorFields,
40+
List<DerivedFieldInfo> derivedVectorFields,
4141
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
4242
SegmentReadState segmentReadState
4343
) throws IOException {
4444
this(delegate, derivedVectorFields, derivedSourceReadersSupplier, segmentReadState, true);
4545
}
4646

47-
private KNN9120DerivedSourceStoredFieldsReader(
47+
private KNN10010DerivedSourceStoredFieldsReader(
4848
StoredFieldsReader delegate,
49-
List<FieldInfo> derivedVectorFields,
49+
List<DerivedFieldInfo> derivedVectorFields,
5050
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
5151
SegmentReadState segmentReadState,
5252
boolean shouldInject
@@ -56,34 +56,37 @@ private KNN9120DerivedSourceStoredFieldsReader(
5656
this.derivedSourceReadersSupplier = derivedSourceReadersSupplier;
5757
this.segmentReadState = segmentReadState;
5858
this.shouldInject = shouldInject;
59-
this.derivedSourceVectorInjector = createDerivedSourceVectorInjector();
59+
this.derivedSourceVectorTransformer = createDerivedSourceVectorTransformer();
6060
}
6161

62-
private DerivedSourceVectorInjector createDerivedSourceVectorInjector() throws IOException {
63-
return new DerivedSourceVectorInjector(derivedSourceReadersSupplier, segmentReadState, derivedVectorFields);
62+
private DerivedSourceVectorTransformer createDerivedSourceVectorTransformer() throws IOException {
63+
return new DerivedSourceVectorTransformer(derivedSourceReadersSupplier, segmentReadState, derivedVectorFields);
6464
}
6565

6666
@Override
6767
public void document(int docId, StoredFieldVisitor storedFieldVisitor) throws IOException {
6868
// If the visitor has explicitly indicated it does not need the fields, we should not inject them
69-
boolean isVisitorNeedFields = true;
70-
if (storedFieldVisitor instanceof FieldsVisitor) {
71-
isVisitorNeedFields = derivedSourceVectorInjector.shouldInject(
72-
((FieldsVisitor) storedFieldVisitor).includes(),
73-
((FieldsVisitor) storedFieldVisitor).excludes()
74-
);
75-
}
76-
if (shouldInject && isVisitorNeedFields) {
77-
delegate.document(docId, new DerivedSourceStoredFieldVisitor(storedFieldVisitor, docId, derivedSourceVectorInjector));
69+
if (shouldInject && doesVisitorNeedVectors(storedFieldVisitor)) {
70+
delegate.document(docId, new DerivedSourceStoredFieldVisitor(storedFieldVisitor, docId, derivedSourceVectorTransformer));
7871
return;
7972
}
8073
delegate.document(docId, storedFieldVisitor);
8174
}
8275

76+
private boolean doesVisitorNeedVectors(StoredFieldVisitor delegate) {
77+
if (delegate instanceof FieldsVisitor) {
78+
return derivedSourceVectorTransformer.shouldInject(
79+
((FieldsVisitor) delegate).includes(),
80+
((FieldsVisitor) delegate).excludes()
81+
);
82+
}
83+
return true;
84+
}
85+
8386
@Override
8487
public StoredFieldsReader clone() {
8588
try {
86-
return new KNN9120DerivedSourceStoredFieldsReader(
89+
return new KNN10010DerivedSourceStoredFieldsReader(
8790
delegate.clone(),
8891
derivedVectorFields,
8992
derivedSourceReadersSupplier,
@@ -102,7 +105,7 @@ public void checkIntegrity() throws IOException {
102105

103106
@Override
104107
public void close() throws IOException {
105-
IOUtils.close(delegate, derivedSourceVectorInjector);
108+
IOUtils.close(delegate, derivedSourceVectorTransformer);
106109
}
107110

108111
/**
@@ -114,7 +117,7 @@ public void close() throws IOException {
114117
*/
115118
private StoredFieldsReader cloneForMerge() {
116119
try {
117-
return new KNN9120DerivedSourceStoredFieldsReader(
120+
return new KNN10010DerivedSourceStoredFieldsReader(
118121
delegate.getMergeInstance(),
119122
derivedVectorFields,
120123
derivedSourceReadersSupplier,
@@ -134,8 +137,8 @@ private StoredFieldsReader cloneForMerge() {
134137
* @return wrapped stored fields reader
135138
*/
136139
public static StoredFieldsReader wrapForMerge(StoredFieldsReader storedFieldsReader) {
137-
if (storedFieldsReader instanceof KNN9120DerivedSourceStoredFieldsReader) {
138-
return ((KNN9120DerivedSourceStoredFieldsReader) storedFieldsReader).cloneForMerge();
140+
if (storedFieldsReader instanceof KNN10010DerivedSourceStoredFieldsReader) {
141+
return ((KNN10010DerivedSourceStoredFieldsReader) storedFieldsReader).cloneForMerge();
139142
}
140143
return storedFieldsReader;
141144
}

src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010DerivedSourceStoredFieldsWriter.java

+37-9
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
package org.opensearch.knn.index.codec.KNN10010Codec;
77

8-
import lombok.RequiredArgsConstructor;
98
import org.apache.lucene.codecs.StoredFieldsWriter;
109
import org.apache.lucene.index.FieldInfo;
1110
import org.apache.lucene.index.MergeState;
@@ -20,19 +19,41 @@
2019
import org.opensearch.core.xcontent.MediaTypeRegistry;
2120
import org.opensearch.core.xcontent.XContentBuilder;
2221
import org.opensearch.index.mapper.SourceFieldMapper;
23-
import org.opensearch.knn.index.codec.KNN9120Codec.KNN9120DerivedSourceStoredFieldsReader;
22+
import org.opensearch.knn.index.codec.backward_codecs.KNN9120Codec.KNN9120DerivedSourceStoredFieldsReader;
2423

2524
import java.io.IOException;
2625
import java.nio.ByteBuffer;
2726
import java.util.List;
2827
import java.util.Map;
2928
import java.util.Objects;
29+
import java.util.function.Function;
30+
import java.util.stream.Collectors;
3031

31-
@RequiredArgsConstructor
3232
public class KNN10010DerivedSourceStoredFieldsWriter extends StoredFieldsWriter {
3333

3434
private final StoredFieldsWriter delegate;
35-
private final List<String> vectorFieldTypes;
35+
private final Function<Map<String, Object>, Map<String, Object>> vectorMask;
36+
37+
// Keeping the mask as small as possible.
38+
private final static Byte MASK = 0x1;
39+
40+
/**
41+
*
42+
* @param delegate StoredFieldsWriter to wrap
43+
* @param vectorFieldTypesArg List of vector field types to mask. If empty, no masking will be done
44+
*/
45+
public KNN10010DerivedSourceStoredFieldsWriter(StoredFieldsWriter delegate, List<String> vectorFieldTypesArg) {
46+
this.delegate = delegate;
47+
List<String> vectorFieldTypes = vectorFieldTypesArg.stream().map(String::toLowerCase).toList();
48+
if (vectorFieldTypes.isEmpty() == false) {
49+
this.vectorMask = XContentMapValues.transform(
50+
vectorFieldTypes.stream().collect(Collectors.toMap(k -> k, k -> (Object o) -> o == null ? o : MASK)),
51+
false
52+
);
53+
} else {
54+
this.vectorMask = null;
55+
}
56+
}
3657

3758
@Override
3859
public void startDocument() throws IOException {
@@ -66,26 +87,33 @@ public void writeField(FieldInfo info, DataInput value, int length) throws IOExc
6687

6788
@Override
6889
public int merge(MergeState mergeState) throws IOException {
69-
// We have to wrap these here to avoid storing the vectors during merge
90+
// For backwards compatibility, when the merge contains old (legacy) segments we need to perform a non-optimized
91+
// merge. It repopulates each source, injects the vector, and then removes it again. This allows us to migrate
92+
// segments from the filter approach to the mask approach.
93+
if (KNN9120DerivedSourceStoredFieldsReader.doesMergeContainLegacySegments(mergeState)) {
94+
return super.merge(mergeState);
95+
}
96+
97+
// We wrap the segment readers to avoid injecting vectors back in only to remove them again. If this is not done,
98+
// the vectors would be injected and then potentially just written to disk.
7099
for (int i = 0; i < mergeState.storedFieldsReaders.length; i++) {
71-
mergeState.storedFieldsReaders[i] = KNN9120DerivedSourceStoredFieldsReader.wrapForMerge(mergeState.storedFieldsReaders[i]);
100+
mergeState.storedFieldsReaders[i] = KNN10010DerivedSourceStoredFieldsReader.wrapForMerge(mergeState.storedFieldsReaders[i]);
72101
}
73102
return delegate.merge(mergeState);
74103
}
75104

76105
@Override
77106
public void writeField(FieldInfo fieldInfo, BytesRef bytesRef) throws IOException {
78107
// Parse out the vectors from the source
79-
if (Objects.equals(fieldInfo.name, SourceFieldMapper.NAME) && !vectorFieldTypes.isEmpty()) {
108+
if (vectorMask != null && Objects.equals(fieldInfo.name, SourceFieldMapper.NAME)) {
80109
// Reference:
81110
// https://github.com/opensearch-project/OpenSearch/blob/2.18.0/server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java#L322
82111
Tuple<? extends MediaType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(
83112
BytesReference.fromByteBuffer(ByteBuffer.wrap(bytesRef.bytes, bytesRef.offset, bytesRef.length)),
84113
true,
85114
MediaTypeRegistry.JSON
86115
);
87-
Map<String, Object> filteredSource = XContentMapValues.filter(null, vectorFieldTypes.toArray(new String[0]))
88-
.apply(mapTuple.v2());
116+
Map<String, Object> filteredSource = vectorMask.apply(mapTuple.v2());
89117
BytesStreamOutput bStream = new BytesStreamOutput();
90118
MediaType actualContentType = mapTuple.v1();
91119
XContentBuilder builder = MediaTypeRegistry.contentBuilder(actualContentType, bStream).map(filteredSource);

src/main/java/org/opensearch/knn/index/codec/derivedsource/AbstractPerFieldDerivedVectorInjector.java src/main/java/org/opensearch/knn/index/codec/backward_codecs/KNN9120Codec/AbstractPerFieldDerivedVectorInjector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.knn.index.codec.derivedsource;
6+
package org.opensearch.knn.index.codec.backward_codecs.KNN9120Codec;
77

88
import lombok.extern.log4j.Log4j2;
99
import org.apache.lucene.index.FieldInfo;

0 commit comments

Comments
 (0)