Skip to content

Commit 7493a93

Browse files
committed
Migrate derived source from filter to mask
Migrates derived source functionality from a filter-based approach to a mask-based approach. Moves the old read functionality and related classes to the backwards codecs package (KNN9120...). Removes the old write path, as it is no longer necessary. In order to support backwards compatibility (bwc), we add custom functionality to the stored fields writer's merge logic so that it falls back to the base, non-optimized merge if it detects older readers in the merge state. This is needed because, for these segments, we need to rebuild the source and then apply the filter to migrate to the new write format. Signed-off-by: John Mazanec <[email protected]>
1 parent 294b4c7 commit 7493a93

File tree

41 files changed

+1031
-134
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+1031
-134
lines changed

qa/restart-upgrade/src/test/java/org/opensearch/knn/bwc/DerivedSourceBWCRestartIT.java

+4
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ private void testIndexAndForceMergeOnOld_injectOnNew(List<IndexConfigContext> in
5252

5353
// Delete
5454
testDelete(indexConfigContexts);
55+
assertDocsMatch(indexConfigContexts);
5556
} else {
57+
assertDocsMatch(indexConfigContexts);
5658
// Search
5759
testSearch(indexConfigContexts);
5860

@@ -81,7 +83,9 @@ private void testIndexOnOld_forceMergeAndInjectOnNew(List<IndexConfigContext> in
8183
if (isRunningAgainstOldCluster()) {
8284
prepareOriginalIndices(indexConfigContexts);
8385
} else {
86+
assertDocsMatch(indexConfigContexts);
8487
testMerging(indexConfigContexts);
88+
assertDocsMatch(indexConfigContexts);
8589
// Update. Skipping update tests for nested docs for now. Will add in the future.
8690
if (indexConfigContexts.get(0).isNested() == false) {
8791
testUpdate(indexConfigContexts);

src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010Codec.java

+1-12
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,7 @@ private StoredFieldsFormat getStoredFieldsFormat() {
9191
}
9292
return null;
9393

94-
}, (segmentReadState) -> {
95-
if (segmentReadState.fieldInfos.hasPostings()) {
96-
return postingsFormat().fieldsProducer(segmentReadState);
97-
}
98-
return null;
99-
100-
}, (segmentReadState -> {
101-
if (segmentReadState.fieldInfos.hasNorms()) {
102-
return normsFormat().normsProducer(segmentReadState);
103-
}
104-
return null;
105-
}));
94+
});
10695
return new KNN10010DerivedSourceStoredFieldsFormat(delegate.storedFieldsFormat(), derivedSourceReadersSupplier, mapperService);
10796
}
10897
}

src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010DerivedSourceStoredFieldsFormat.java

+40-18
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.apache.lucene.codecs.StoredFieldsFormat;
1010
import org.apache.lucene.codecs.StoredFieldsReader;
1111
import org.apache.lucene.codecs.StoredFieldsWriter;
12-
import org.apache.lucene.index.FieldInfo;
1312
import org.apache.lucene.index.FieldInfos;
1413
import org.apache.lucene.index.SegmentInfo;
1514
import org.apache.lucene.index.SegmentReadState;
@@ -19,14 +18,15 @@
1918
import org.opensearch.index.mapper.MappedFieldType;
2019
import org.opensearch.index.mapper.MapperService;
2120
import org.opensearch.knn.index.KNNSettings;
22-
import org.opensearch.knn.index.codec.KNN9120Codec.KNN9120DerivedSourceStoredFieldsReader;
21+
import org.opensearch.knn.index.codec.derivedsource.DerivedFieldInfo;
2322
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
2423
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceSegmentAttributeParser;
2524
import org.opensearch.knn.index.mapper.KNNVectorFieldType;
2625

2726
import java.io.IOException;
2827
import java.util.ArrayList;
2928
import java.util.List;
29+
import java.util.stream.Stream;
3030

3131
@AllArgsConstructor
3232
public class KNN10010DerivedSourceStoredFieldsFormat extends StoredFieldsFormat {
@@ -40,16 +40,22 @@ public class KNN10010DerivedSourceStoredFieldsFormat extends StoredFieldsFormat
4040
@Override
4141
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos, IOContext ioContext)
4242
throws IOException {
43-
List<FieldInfo> derivedVectorFields = DerivedSourceSegmentAttributeParser.parseDerivedVectorFields(segmentInfo)
44-
.stream()
45-
.filter(field -> fieldInfos.fieldInfo(field) != null)
46-
.map(fieldInfos::fieldInfo)
47-
.toList();
43+
List<DerivedFieldInfo> derivedVectorFields = Stream.concat(
44+
DerivedSourceSegmentAttributeParser.parseDerivedVectorFields(segmentInfo, false)
45+
.stream()
46+
.filter(field -> fieldInfos.fieldInfo(field) != null)
47+
.map(field -> new DerivedFieldInfo(fieldInfos.fieldInfo(field), false)),
48+
DerivedSourceSegmentAttributeParser.parseDerivedVectorFields(segmentInfo, true)
49+
.stream()
50+
.filter(field -> fieldInfos.fieldInfo(field) != null)
51+
.map(field -> new DerivedFieldInfo(fieldInfos.fieldInfo(field), true))
52+
).toList();
53+
4854
// If no fields have it enabled, we can just short-circuit and return the delegate's fieldReader
49-
if (derivedVectorFields == null || derivedVectorFields.isEmpty()) {
55+
if (derivedVectorFields.isEmpty()) {
5056
return delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext);
5157
}
52-
return new KNN9120DerivedSourceStoredFieldsReader(
58+
return new KNN10010DerivedSourceStoredFieldsReader(
5359
delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext),
5460
derivedVectorFields,
5561
derivedSourceReadersSupplier,
@@ -60,18 +66,34 @@ public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo segmentI
6066
@Override
6167
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo segmentInfo, IOContext ioContext) throws IOException {
6268
StoredFieldsWriter delegateWriter = delegate.fieldsWriter(directory, segmentInfo, ioContext);
63-
if (mapperService != null && KNNSettings.isKNNDerivedSourceEnabled(mapperService.getIndexSettings().getSettings())) {
64-
List<String> vectorFieldTypes = new ArrayList<>();
65-
for (MappedFieldType fieldType : mapperService.fieldTypes()) {
66-
if (fieldType instanceof KNNVectorFieldType) {
69+
if (mapperService == null || KNNSettings.isKNNDerivedSourceEnabled(mapperService.getIndexSettings().getSettings()) == false) {
70+
return delegateWriter;
71+
}
72+
73+
List<String> vectorFieldTypes = new ArrayList<>();
74+
List<String> nestedVectorFieldTypes = new ArrayList<>();
75+
for (MappedFieldType fieldType : mapperService.fieldTypes()) {
76+
if (fieldType instanceof KNNVectorFieldType) {
77+
boolean isNested = mapperService.documentMapper().mappers().getNestedScope(fieldType.name()) != null;
78+
if (isNested) {
79+
nestedVectorFieldTypes.add(fieldType.name());
80+
} else {
6781
vectorFieldTypes.add(fieldType.name());
6882
}
6983
}
70-
if (vectorFieldTypes.isEmpty() == false) {
71-
DerivedSourceSegmentAttributeParser.addDerivedVectorFieldsSegmentInfoAttribute(segmentInfo, vectorFieldTypes);
72-
return new KNN10010DerivedSourceStoredFieldsWriter(delegateWriter, vectorFieldTypes);
73-
}
7484
}
75-
return delegateWriter;
85+
if (vectorFieldTypes.isEmpty() && nestedVectorFieldTypes.isEmpty()) {
86+
return delegateWriter;
87+
}
88+
89+
// Store nested fields separately from non-nested for easy handling on read
90+
if (vectorFieldTypes.isEmpty() == false) {
91+
DerivedSourceSegmentAttributeParser.addDerivedVectorFieldsSegmentInfoAttribute(segmentInfo, vectorFieldTypes, false);
92+
}
93+
if (nestedVectorFieldTypes.isEmpty() == false) {
94+
vectorFieldTypes.addAll(nestedVectorFieldTypes);
95+
DerivedSourceSegmentAttributeParser.addDerivedVectorFieldsSegmentInfoAttribute(segmentInfo, nestedVectorFieldTypes, true);
96+
}
97+
return new KNN10010DerivedSourceStoredFieldsWriter(delegateWriter, vectorFieldTypes);
7698
}
7799
}

src/main/java/org/opensearch/knn/index/codec/KNN9120Codec/KNN9120DerivedSourceStoredFieldsReader.java src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010DerivedSourceStoredFieldsReader.java

+20-20
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,29 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.knn.index.codec.KNN9120Codec;
6+
package org.opensearch.knn.index.codec.KNN10010Codec;
77

88
import org.apache.lucene.codecs.StoredFieldsReader;
9-
import org.apache.lucene.index.FieldInfo;
109
import org.apache.lucene.index.SegmentReadState;
1110
import org.apache.lucene.index.StoredFieldVisitor;
1211
import org.apache.lucene.util.IOUtils;
1312
import org.opensearch.index.fieldvisitor.FieldsVisitor;
13+
import org.opensearch.knn.index.codec.derivedsource.DerivedFieldInfo;
1414
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
1515
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceStoredFieldVisitor;
16-
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceVectorInjector;
16+
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceVectorTransformer;
1717

1818
import java.io.IOException;
1919
import java.util.List;
2020

21-
public class KNN9120DerivedSourceStoredFieldsReader extends StoredFieldsReader {
21+
public class KNN10010DerivedSourceStoredFieldsReader extends StoredFieldsReader {
2222
private final StoredFieldsReader delegate;
23-
private final List<FieldInfo> derivedVectorFields;
23+
private final List<DerivedFieldInfo> derivedVectorFields;
2424
private final DerivedSourceReadersSupplier derivedSourceReadersSupplier;
2525
private final SegmentReadState segmentReadState;
2626
private final boolean shouldInject;
2727

28-
private final DerivedSourceVectorInjector derivedSourceVectorInjector;
28+
private final DerivedSourceVectorTransformer derivedSourceVectorTransformer;
2929

3030
/**
3131
*
@@ -35,18 +35,18 @@ public class KNN9120DerivedSourceStoredFieldsReader extends StoredFieldsReader {
3535
* @param segmentReadState SegmentReadState for the segment
3636
* @throws IOException in case of I/O error
3737
*/
38-
public KNN9120DerivedSourceStoredFieldsReader(
38+
public KNN10010DerivedSourceStoredFieldsReader(
3939
StoredFieldsReader delegate,
40-
List<FieldInfo> derivedVectorFields,
40+
List<DerivedFieldInfo> derivedVectorFields,
4141
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
4242
SegmentReadState segmentReadState
4343
) throws IOException {
4444
this(delegate, derivedVectorFields, derivedSourceReadersSupplier, segmentReadState, true);
4545
}
4646

47-
private KNN9120DerivedSourceStoredFieldsReader(
47+
private KNN10010DerivedSourceStoredFieldsReader(
4848
StoredFieldsReader delegate,
49-
List<FieldInfo> derivedVectorFields,
49+
List<DerivedFieldInfo> derivedVectorFields,
5050
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
5151
SegmentReadState segmentReadState,
5252
boolean shouldInject
@@ -56,25 +56,25 @@ private KNN9120DerivedSourceStoredFieldsReader(
5656
this.derivedSourceReadersSupplier = derivedSourceReadersSupplier;
5757
this.segmentReadState = segmentReadState;
5858
this.shouldInject = shouldInject;
59-
this.derivedSourceVectorInjector = createDerivedSourceVectorInjector();
59+
this.derivedSourceVectorTransformer = createDerivedSourceVectorTransformer();
6060
}
6161

62-
private DerivedSourceVectorInjector createDerivedSourceVectorInjector() throws IOException {
63-
return new DerivedSourceVectorInjector(derivedSourceReadersSupplier, segmentReadState, derivedVectorFields);
62+
private DerivedSourceVectorTransformer createDerivedSourceVectorTransformer() throws IOException {
63+
return new DerivedSourceVectorTransformer(derivedSourceReadersSupplier, segmentReadState, derivedVectorFields);
6464
}
6565

6666
@Override
6767
public void document(int docId, StoredFieldVisitor storedFieldVisitor) throws IOException {
6868
// If the visitor has explicitly indicated it does not need the fields, we should not inject them
6969
boolean isVisitorNeedFields = true;
7070
if (storedFieldVisitor instanceof FieldsVisitor) {
71-
isVisitorNeedFields = derivedSourceVectorInjector.shouldInject(
71+
isVisitorNeedFields = derivedSourceVectorTransformer.shouldInject(
7272
((FieldsVisitor) storedFieldVisitor).includes(),
7373
((FieldsVisitor) storedFieldVisitor).excludes()
7474
);
7575
}
7676
if (shouldInject && isVisitorNeedFields) {
77-
delegate.document(docId, new DerivedSourceStoredFieldVisitor(storedFieldVisitor, docId, derivedSourceVectorInjector));
77+
delegate.document(docId, new DerivedSourceStoredFieldVisitor(storedFieldVisitor, docId, derivedSourceVectorTransformer));
7878
return;
7979
}
8080
delegate.document(docId, storedFieldVisitor);
@@ -83,7 +83,7 @@ public void document(int docId, StoredFieldVisitor storedFieldVisitor) throws IO
8383
@Override
8484
public StoredFieldsReader clone() {
8585
try {
86-
return new KNN9120DerivedSourceStoredFieldsReader(
86+
return new KNN10010DerivedSourceStoredFieldsReader(
8787
delegate.clone(),
8888
derivedVectorFields,
8989
derivedSourceReadersSupplier,
@@ -102,7 +102,7 @@ public void checkIntegrity() throws IOException {
102102

103103
@Override
104104
public void close() throws IOException {
105-
IOUtils.close(delegate, derivedSourceVectorInjector);
105+
IOUtils.close(delegate, derivedSourceVectorTransformer);
106106
}
107107

108108
/**
@@ -114,7 +114,7 @@ public void close() throws IOException {
114114
*/
115115
private StoredFieldsReader cloneForMerge() {
116116
try {
117-
return new KNN9120DerivedSourceStoredFieldsReader(
117+
return new KNN10010DerivedSourceStoredFieldsReader(
118118
delegate.getMergeInstance(),
119119
derivedVectorFields,
120120
derivedSourceReadersSupplier,
@@ -134,8 +134,8 @@ private StoredFieldsReader cloneForMerge() {
134134
* @return wrapped stored fields reader
135135
*/
136136
public static StoredFieldsReader wrapForMerge(StoredFieldsReader storedFieldsReader) {
137-
if (storedFieldsReader instanceof KNN9120DerivedSourceStoredFieldsReader) {
138-
return ((KNN9120DerivedSourceStoredFieldsReader) storedFieldsReader).cloneForMerge();
137+
if (storedFieldsReader instanceof KNN10010DerivedSourceStoredFieldsReader) {
138+
return ((KNN10010DerivedSourceStoredFieldsReader) storedFieldsReader).cloneForMerge();
139139
}
140140
return storedFieldsReader;
141141
}

src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010DerivedSourceStoredFieldsWriter.java

+37-10
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
package org.opensearch.knn.index.codec.KNN10010Codec;
77

8-
import lombok.RequiredArgsConstructor;
98
import org.apache.lucene.codecs.StoredFieldsWriter;
109
import org.apache.lucene.index.FieldInfo;
1110
import org.apache.lucene.index.MergeState;
@@ -14,25 +13,46 @@
1413
import org.opensearch.common.collect.Tuple;
1514
import org.opensearch.common.io.stream.BytesStreamOutput;
1615
import org.opensearch.common.xcontent.XContentHelper;
17-
import org.opensearch.common.xcontent.support.XContentMapValues;
1816
import org.opensearch.core.common.bytes.BytesReference;
1917
import org.opensearch.core.xcontent.MediaType;
2018
import org.opensearch.core.xcontent.MediaTypeRegistry;
2119
import org.opensearch.core.xcontent.XContentBuilder;
2220
import org.opensearch.index.mapper.SourceFieldMapper;
23-
import org.opensearch.knn.index.codec.KNN9120Codec.KNN9120DerivedSourceStoredFieldsReader;
21+
import org.opensearch.knn.index.codec.backward_codecs.KNN9120Codec.KNN9120DerivedSourceStoredFieldsReader;
22+
import org.opensearch.knn.index.codec.derivedsource.DerivedMapHelper;
2423

2524
import java.io.IOException;
2625
import java.nio.ByteBuffer;
2726
import java.util.List;
2827
import java.util.Map;
2928
import java.util.Objects;
29+
import java.util.function.Function;
30+
import java.util.stream.Collectors;
3031

31-
@RequiredArgsConstructor
3232
public class KNN10010DerivedSourceStoredFieldsWriter extends StoredFieldsWriter {
3333

3434
private final StoredFieldsWriter delegate;
35-
private final List<String> vectorFieldTypes;
35+
private final Function<Map<String, Object>, Map<String, Object>> vectorMask;
36+
37+
// Keeping the mask as small as possible.
38+
private final static char MASK = 'z';
39+
40+
/**
41+
*
42+
* @param delegate StoredFieldsWriter to wrap
43+
* @param vectorFieldTypesArg List of vector field types to mask. If empty, no masking will be done
44+
*/
45+
public KNN10010DerivedSourceStoredFieldsWriter(StoredFieldsWriter delegate, List<String> vectorFieldTypesArg) {
46+
this.delegate = delegate;
47+
List<String> vectorFieldTypes = vectorFieldTypesArg.stream().map(String::toLowerCase).toList();
48+
if (vectorFieldTypes.isEmpty() == false) {
49+
this.vectorMask = DerivedMapHelper.transform(
50+
vectorFieldTypes.stream().collect(Collectors.toMap(k -> k, k -> (Object o) -> o == null ? o : MASK))
51+
);
52+
} else {
53+
this.vectorMask = null;
54+
}
55+
}
3656

3757
@Override
3858
public void startDocument() throws IOException {
@@ -66,26 +86,33 @@ public void writeField(FieldInfo info, DataInput value, int length) throws IOExc
6686

6787
@Override
6888
public int merge(MergeState mergeState) throws IOException {
69-
// We have to wrap these here to avoid storing the vectors during merge
89+
// In case of backwards compatibility, with old segments, we need to perform a non-optimal merge. Basically, it
90+
// will repopulate each source and then inject the vector and then remove it. This allows us to migrate
91+
// segments from filter approach to mask approach
92+
if (KNN9120DerivedSourceStoredFieldsReader.doesMergeContainLegacySegments(mergeState)) {
93+
super.merge(mergeState);
94+
}
95+
96+
// We wrap the segments to avoid injecting back vectors and then removing. If this is not done, then we will
97+
// inject and then just write to disk potentially.
7098
for (int i = 0; i < mergeState.storedFieldsReaders.length; i++) {
71-
mergeState.storedFieldsReaders[i] = KNN9120DerivedSourceStoredFieldsReader.wrapForMerge(mergeState.storedFieldsReaders[i]);
99+
mergeState.storedFieldsReaders[i] = KNN10010DerivedSourceStoredFieldsReader.wrapForMerge(mergeState.storedFieldsReaders[i]);
72100
}
73101
return delegate.merge(mergeState);
74102
}
75103

76104
@Override
77105
public void writeField(FieldInfo fieldInfo, BytesRef bytesRef) throws IOException {
78106
// Parse out the vectors from the source
79-
if (Objects.equals(fieldInfo.name, SourceFieldMapper.NAME) && !vectorFieldTypes.isEmpty()) {
107+
if (Objects.equals(fieldInfo.name, SourceFieldMapper.NAME) && vectorMask != null) {
80108
// Reference:
81109
// https://github.com/opensearch-project/OpenSearch/blob/2.18.0/server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java#L322
82110
Tuple<? extends MediaType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(
83111
BytesReference.fromByteBuffer(ByteBuffer.wrap(bytesRef.bytes, bytesRef.offset, bytesRef.length)),
84112
true,
85113
MediaTypeRegistry.JSON
86114
);
87-
Map<String, Object> filteredSource = XContentMapValues.filter(null, vectorFieldTypes.toArray(new String[0]))
88-
.apply(mapTuple.v2());
115+
Map<String, Object> filteredSource = vectorMask.apply(mapTuple.v2());
89116
BytesStreamOutput bStream = new BytesStreamOutput();
90117
MediaType actualContentType = mapTuple.v1();
91118
XContentBuilder builder = MediaTypeRegistry.contentBuilder(actualContentType, bStream).map(filteredSource);

src/main/java/org/opensearch/knn/index/codec/derivedsource/AbstractPerFieldDerivedVectorInjector.java src/main/java/org/opensearch/knn/index/codec/backward_codecs/KNN9120Codec/AbstractPerFieldDerivedVectorInjector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.knn.index.codec.derivedsource;
6+
package org.opensearch.knn.index.codec.backward_codecs.KNN9120Codec;
77

88
import lombok.extern.log4j.Log4j2;
99
import org.apache.lucene.index.FieldInfo;

0 commit comments

Comments
 (0)