Skip to content

Commit 84ec421

Browse files
committed
Migrate derived source from filter to mask
Migrates derived source functionality from a filter-based approach to a mask-based approach. Moves the old read functionality and related classes to the backwards codecs package (KNN9120...). Removes the old write path, as it is no longer necessary. To support backwards compatibility (bwc), we add custom logic to the stored fields writer's merge so that it falls back to the base, non-optimized merge if it detects older readers in the merge state. This is needed because, for these segments, we need to rebuild the source and then apply the filter to migrate to the new write format. Signed-off-by: John Mazanec <[email protected]>
1 parent 294b4c7 commit 84ec421

File tree

39 files changed

+1002
-129
lines changed

39 files changed

+1002
-129
lines changed

src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010Codec.java

+1-12
Original file line numberDiff line numberDiff line change
@@ -91,18 +91,7 @@ private StoredFieldsFormat getStoredFieldsFormat() {
9191
}
9292
return null;
9393

94-
}, (segmentReadState) -> {
95-
if (segmentReadState.fieldInfos.hasPostings()) {
96-
return postingsFormat().fieldsProducer(segmentReadState);
97-
}
98-
return null;
99-
100-
}, (segmentReadState -> {
101-
if (segmentReadState.fieldInfos.hasNorms()) {
102-
return normsFormat().normsProducer(segmentReadState);
103-
}
104-
return null;
105-
}));
94+
});
10695
return new KNN10010DerivedSourceStoredFieldsFormat(delegate.storedFieldsFormat(), derivedSourceReadersSupplier, mapperService);
10796
}
10897
}

src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010DerivedSourceStoredFieldsFormat.java

+40-18
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import org.apache.lucene.codecs.StoredFieldsFormat;
1010
import org.apache.lucene.codecs.StoredFieldsReader;
1111
import org.apache.lucene.codecs.StoredFieldsWriter;
12-
import org.apache.lucene.index.FieldInfo;
1312
import org.apache.lucene.index.FieldInfos;
1413
import org.apache.lucene.index.SegmentInfo;
1514
import org.apache.lucene.index.SegmentReadState;
@@ -19,14 +18,15 @@
1918
import org.opensearch.index.mapper.MappedFieldType;
2019
import org.opensearch.index.mapper.MapperService;
2120
import org.opensearch.knn.index.KNNSettings;
22-
import org.opensearch.knn.index.codec.KNN9120Codec.KNN9120DerivedSourceStoredFieldsReader;
21+
import org.opensearch.knn.index.codec.derivedsource.DerivedFieldInfo;
2322
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
2423
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceSegmentAttributeParser;
2524
import org.opensearch.knn.index.mapper.KNNVectorFieldType;
2625

2726
import java.io.IOException;
2827
import java.util.ArrayList;
2928
import java.util.List;
29+
import java.util.stream.Stream;
3030

3131
@AllArgsConstructor
3232
public class KNN10010DerivedSourceStoredFieldsFormat extends StoredFieldsFormat {
@@ -40,16 +40,22 @@ public class KNN10010DerivedSourceStoredFieldsFormat extends StoredFieldsFormat
4040
@Override
4141
public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo segmentInfo, FieldInfos fieldInfos, IOContext ioContext)
4242
throws IOException {
43-
List<FieldInfo> derivedVectorFields = DerivedSourceSegmentAttributeParser.parseDerivedVectorFields(segmentInfo)
44-
.stream()
45-
.filter(field -> fieldInfos.fieldInfo(field) != null)
46-
.map(fieldInfos::fieldInfo)
47-
.toList();
43+
List<DerivedFieldInfo> derivedVectorFields = Stream.concat(
44+
DerivedSourceSegmentAttributeParser.parseDerivedVectorFields(segmentInfo, false)
45+
.stream()
46+
.filter(field -> fieldInfos.fieldInfo(field) != null)
47+
.map(field -> new DerivedFieldInfo(fieldInfos.fieldInfo(field), false)),
48+
DerivedSourceSegmentAttributeParser.parseDerivedVectorFields(segmentInfo, true)
49+
.stream()
50+
.filter(field -> fieldInfos.fieldInfo(field) != null)
51+
.map(field -> new DerivedFieldInfo(fieldInfos.fieldInfo(field), true))
52+
).toList();
53+
4854
// If no fields have it enabled, we can just short-circuit and return the delegate's fieldReader
49-
if (derivedVectorFields == null || derivedVectorFields.isEmpty()) {
55+
if (derivedVectorFields.isEmpty()) {
5056
return delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext);
5157
}
52-
return new KNN9120DerivedSourceStoredFieldsReader(
58+
return new KNN10010DerivedSourceStoredFieldsReader(
5359
delegate.fieldsReader(directory, segmentInfo, fieldInfos, ioContext),
5460
derivedVectorFields,
5561
derivedSourceReadersSupplier,
@@ -60,18 +66,34 @@ public StoredFieldsReader fieldsReader(Directory directory, SegmentInfo segmentI
6066
@Override
6167
public StoredFieldsWriter fieldsWriter(Directory directory, SegmentInfo segmentInfo, IOContext ioContext) throws IOException {
6268
StoredFieldsWriter delegateWriter = delegate.fieldsWriter(directory, segmentInfo, ioContext);
63-
if (mapperService != null && KNNSettings.isKNNDerivedSourceEnabled(mapperService.getIndexSettings().getSettings())) {
64-
List<String> vectorFieldTypes = new ArrayList<>();
65-
for (MappedFieldType fieldType : mapperService.fieldTypes()) {
66-
if (fieldType instanceof KNNVectorFieldType) {
69+
if (mapperService == null || KNNSettings.isKNNDerivedSourceEnabled(mapperService.getIndexSettings().getSettings()) == false) {
70+
return delegateWriter;
71+
}
72+
73+
List<String> vectorFieldTypes = new ArrayList<>();
74+
List<String> nestedVectorFieldTypes = new ArrayList<>();
75+
for (MappedFieldType fieldType : mapperService.fieldTypes()) {
76+
if (fieldType instanceof KNNVectorFieldType) {
77+
boolean isNested = mapperService.documentMapper().mappers().getNestedScope(fieldType.name()) != null;
78+
if (isNested) {
79+
nestedVectorFieldTypes.add(fieldType.name());
80+
} else {
6781
vectorFieldTypes.add(fieldType.name());
6882
}
6983
}
70-
if (vectorFieldTypes.isEmpty() == false) {
71-
DerivedSourceSegmentAttributeParser.addDerivedVectorFieldsSegmentInfoAttribute(segmentInfo, vectorFieldTypes);
72-
return new KNN10010DerivedSourceStoredFieldsWriter(delegateWriter, vectorFieldTypes);
73-
}
7484
}
75-
return delegateWriter;
85+
if (vectorFieldTypes.isEmpty() && nestedVectorFieldTypes.isEmpty()) {
86+
return delegateWriter;
87+
}
88+
89+
// Store nested fields separately from non-nested for easy handling on read
90+
if (vectorFieldTypes.isEmpty() == false) {
91+
DerivedSourceSegmentAttributeParser.addDerivedVectorFieldsSegmentInfoAttribute(segmentInfo, vectorFieldTypes, false);
92+
}
93+
if (nestedVectorFieldTypes.isEmpty() == false) {
94+
vectorFieldTypes.addAll(nestedVectorFieldTypes);
95+
DerivedSourceSegmentAttributeParser.addDerivedVectorFieldsSegmentInfoAttribute(segmentInfo, nestedVectorFieldTypes, true);
96+
}
97+
return new KNN10010DerivedSourceStoredFieldsWriter(delegateWriter, vectorFieldTypes);
7698
}
7799
}

src/main/java/org/opensearch/knn/index/codec/KNN9120Codec/KNN9120DerivedSourceStoredFieldsReader.java src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010DerivedSourceStoredFieldsReader.java

+20-20
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,29 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.knn.index.codec.KNN9120Codec;
6+
package org.opensearch.knn.index.codec.KNN10010Codec;
77

88
import org.apache.lucene.codecs.StoredFieldsReader;
9-
import org.apache.lucene.index.FieldInfo;
109
import org.apache.lucene.index.SegmentReadState;
1110
import org.apache.lucene.index.StoredFieldVisitor;
1211
import org.apache.lucene.util.IOUtils;
1312
import org.opensearch.index.fieldvisitor.FieldsVisitor;
13+
import org.opensearch.knn.index.codec.derivedsource.DerivedFieldInfo;
1414
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
1515
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceStoredFieldVisitor;
16-
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceVectorInjector;
16+
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceVectorTransformer;
1717

1818
import java.io.IOException;
1919
import java.util.List;
2020

21-
public class KNN9120DerivedSourceStoredFieldsReader extends StoredFieldsReader {
21+
public class KNN10010DerivedSourceStoredFieldsReader extends StoredFieldsReader {
2222
private final StoredFieldsReader delegate;
23-
private final List<FieldInfo> derivedVectorFields;
23+
private final List<DerivedFieldInfo> derivedVectorFields;
2424
private final DerivedSourceReadersSupplier derivedSourceReadersSupplier;
2525
private final SegmentReadState segmentReadState;
2626
private final boolean shouldInject;
2727

28-
private final DerivedSourceVectorInjector derivedSourceVectorInjector;
28+
private final DerivedSourceVectorTransformer derivedSourceVectorTransformer;
2929

3030
/**
3131
*
@@ -35,18 +35,18 @@ public class KNN9120DerivedSourceStoredFieldsReader extends StoredFieldsReader {
3535
* @param segmentReadState SegmentReadState for the segment
3636
* @throws IOException in case of I/O error
3737
*/
38-
public KNN9120DerivedSourceStoredFieldsReader(
38+
public KNN10010DerivedSourceStoredFieldsReader(
3939
StoredFieldsReader delegate,
40-
List<FieldInfo> derivedVectorFields,
40+
List<DerivedFieldInfo> derivedVectorFields,
4141
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
4242
SegmentReadState segmentReadState
4343
) throws IOException {
4444
this(delegate, derivedVectorFields, derivedSourceReadersSupplier, segmentReadState, true);
4545
}
4646

47-
private KNN9120DerivedSourceStoredFieldsReader(
47+
private KNN10010DerivedSourceStoredFieldsReader(
4848
StoredFieldsReader delegate,
49-
List<FieldInfo> derivedVectorFields,
49+
List<DerivedFieldInfo> derivedVectorFields,
5050
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
5151
SegmentReadState segmentReadState,
5252
boolean shouldInject
@@ -56,25 +56,25 @@ private KNN9120DerivedSourceStoredFieldsReader(
5656
this.derivedSourceReadersSupplier = derivedSourceReadersSupplier;
5757
this.segmentReadState = segmentReadState;
5858
this.shouldInject = shouldInject;
59-
this.derivedSourceVectorInjector = createDerivedSourceVectorInjector();
59+
this.derivedSourceVectorTransformer = createDerivedSourceVectorTransformer();
6060
}
6161

62-
private DerivedSourceVectorInjector createDerivedSourceVectorInjector() throws IOException {
63-
return new DerivedSourceVectorInjector(derivedSourceReadersSupplier, segmentReadState, derivedVectorFields);
62+
private DerivedSourceVectorTransformer createDerivedSourceVectorTransformer() throws IOException {
63+
return new DerivedSourceVectorTransformer(derivedSourceReadersSupplier, segmentReadState, derivedVectorFields);
6464
}
6565

6666
@Override
6767
public void document(int docId, StoredFieldVisitor storedFieldVisitor) throws IOException {
6868
// If the visitor has explicitly indicated it does not need the fields, we should not inject them
6969
boolean isVisitorNeedFields = true;
7070
if (storedFieldVisitor instanceof FieldsVisitor) {
71-
isVisitorNeedFields = derivedSourceVectorInjector.shouldInject(
71+
isVisitorNeedFields = derivedSourceVectorTransformer.shouldInject(
7272
((FieldsVisitor) storedFieldVisitor).includes(),
7373
((FieldsVisitor) storedFieldVisitor).excludes()
7474
);
7575
}
7676
if (shouldInject && isVisitorNeedFields) {
77-
delegate.document(docId, new DerivedSourceStoredFieldVisitor(storedFieldVisitor, docId, derivedSourceVectorInjector));
77+
delegate.document(docId, new DerivedSourceStoredFieldVisitor(storedFieldVisitor, docId, derivedSourceVectorTransformer));
7878
return;
7979
}
8080
delegate.document(docId, storedFieldVisitor);
@@ -83,7 +83,7 @@ public void document(int docId, StoredFieldVisitor storedFieldVisitor) throws IO
8383
@Override
8484
public StoredFieldsReader clone() {
8585
try {
86-
return new KNN9120DerivedSourceStoredFieldsReader(
86+
return new KNN10010DerivedSourceStoredFieldsReader(
8787
delegate.clone(),
8888
derivedVectorFields,
8989
derivedSourceReadersSupplier,
@@ -102,7 +102,7 @@ public void checkIntegrity() throws IOException {
102102

103103
@Override
104104
public void close() throws IOException {
105-
IOUtils.close(delegate, derivedSourceVectorInjector);
105+
IOUtils.close(delegate, derivedSourceVectorTransformer);
106106
}
107107

108108
/**
@@ -114,7 +114,7 @@ public void close() throws IOException {
114114
*/
115115
private StoredFieldsReader cloneForMerge() {
116116
try {
117-
return new KNN9120DerivedSourceStoredFieldsReader(
117+
return new KNN10010DerivedSourceStoredFieldsReader(
118118
delegate.getMergeInstance(),
119119
derivedVectorFields,
120120
derivedSourceReadersSupplier,
@@ -134,8 +134,8 @@ private StoredFieldsReader cloneForMerge() {
134134
* @return wrapped stored fields reader
135135
*/
136136
public static StoredFieldsReader wrapForMerge(StoredFieldsReader storedFieldsReader) {
137-
if (storedFieldsReader instanceof KNN9120DerivedSourceStoredFieldsReader) {
138-
return ((KNN9120DerivedSourceStoredFieldsReader) storedFieldsReader).cloneForMerge();
137+
if (storedFieldsReader instanceof KNN10010DerivedSourceStoredFieldsReader) {
138+
return ((KNN10010DerivedSourceStoredFieldsReader) storedFieldsReader).cloneForMerge();
139139
}
140140
return storedFieldsReader;
141141
}

src/main/java/org/opensearch/knn/index/codec/KNN10010Codec/KNN10010DerivedSourceStoredFieldsWriter.java

+29-10
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55

66
package org.opensearch.knn.index.codec.KNN10010Codec;
77

8-
import lombok.RequiredArgsConstructor;
98
import org.apache.lucene.codecs.StoredFieldsWriter;
109
import org.apache.lucene.index.FieldInfo;
1110
import org.apache.lucene.index.MergeState;
@@ -14,25 +13,38 @@
1413
import org.opensearch.common.collect.Tuple;
1514
import org.opensearch.common.io.stream.BytesStreamOutput;
1615
import org.opensearch.common.xcontent.XContentHelper;
17-
import org.opensearch.common.xcontent.support.XContentMapValues;
1816
import org.opensearch.core.common.bytes.BytesReference;
1917
import org.opensearch.core.xcontent.MediaType;
2018
import org.opensearch.core.xcontent.MediaTypeRegistry;
2119
import org.opensearch.core.xcontent.XContentBuilder;
2220
import org.opensearch.index.mapper.SourceFieldMapper;
23-
import org.opensearch.knn.index.codec.KNN9120Codec.KNN9120DerivedSourceStoredFieldsReader;
21+
import org.opensearch.knn.index.codec.backward_codecs.KNN9120Codec.KNN9120DerivedSourceStoredFieldsReader;
22+
import org.opensearch.knn.index.codec.derivedsource.DerivedMapHelper;
2423

2524
import java.io.IOException;
2625
import java.nio.ByteBuffer;
2726
import java.util.List;
2827
import java.util.Map;
2928
import java.util.Objects;
29+
import java.util.function.Function;
30+
import java.util.stream.Collectors;
3031

31-
@RequiredArgsConstructor
3232
public class KNN10010DerivedSourceStoredFieldsWriter extends StoredFieldsWriter {
3333

3434
private final StoredFieldsWriter delegate;
35-
private final List<String> vectorFieldTypes;
35+
private final Function<Map<String, Object>, Map<String, Object>> vectorMask;
36+
37+
public KNN10010DerivedSourceStoredFieldsWriter(StoredFieldsWriter delegate, List<String> vectorFieldTypesArg) {
38+
this.delegate = delegate;
39+
List<String> vectorFieldTypes = vectorFieldTypesArg.stream().map(String::toLowerCase).toList();
40+
if (vectorFieldTypes.isEmpty() == false) {
41+
this.vectorMask = DerivedMapHelper.transform(
42+
vectorFieldTypes.stream().collect(Collectors.toMap(k -> k, k -> (Object o) -> o == null ? o : "MASK"))
43+
);
44+
} else {
45+
this.vectorMask = null;
46+
}
47+
}
3648

3749
@Override
3850
public void startDocument() throws IOException {
@@ -66,26 +78,33 @@ public void writeField(FieldInfo info, DataInput value, int length) throws IOExc
6678

6779
@Override
6880
public int merge(MergeState mergeState) throws IOException {
69-
// We have to wrap these here to avoid storing the vectors during merge
81+
// In case of backwards compatibility, with old segments, we need to perform a non-optimal merge. Basically, it
82+
// will repopulate each source and then inject the vector and then remove it. This allows us to migrate
83+
// segments from filter approach to mask approach. Return here so the optimized merge below does not run as well.
84+
if (KNN9120DerivedSourceStoredFieldsReader.doesMergeContainLegacySegments(mergeState)) {
85+
return super.merge(mergeState);
86+
}
87+
88+
// We wrap the segments to avoid injecting back vectors and then removing. If this is not done, then we will
89+
// inject and then just write to disk potentially.
7090
for (int i = 0; i < mergeState.storedFieldsReaders.length; i++) {
71-
mergeState.storedFieldsReaders[i] = KNN9120DerivedSourceStoredFieldsReader.wrapForMerge(mergeState.storedFieldsReaders[i]);
91+
mergeState.storedFieldsReaders[i] = KNN10010DerivedSourceStoredFieldsReader.wrapForMerge(mergeState.storedFieldsReaders[i]);
7292
}
7393
return delegate.merge(mergeState);
7494
}
7595

7696
@Override
7797
public void writeField(FieldInfo fieldInfo, BytesRef bytesRef) throws IOException {
7898
// Parse out the vectors from the source
79-
if (Objects.equals(fieldInfo.name, SourceFieldMapper.NAME) && !vectorFieldTypes.isEmpty()) {
99+
if (Objects.equals(fieldInfo.name, SourceFieldMapper.NAME) && vectorMask != null) {
80100
// Reference:
81101
// https://github.com/opensearch-project/OpenSearch/blob/2.18.0/server/src/main/java/org/opensearch/index/mapper/SourceFieldMapper.java#L322
82102
Tuple<? extends MediaType, Map<String, Object>> mapTuple = XContentHelper.convertToMap(
83103
BytesReference.fromByteBuffer(ByteBuffer.wrap(bytesRef.bytes, bytesRef.offset, bytesRef.length)),
84104
true,
85105
MediaTypeRegistry.JSON
86106
);
87-
Map<String, Object> filteredSource = XContentMapValues.filter(null, vectorFieldTypes.toArray(new String[0]))
88-
.apply(mapTuple.v2());
107+
Map<String, Object> filteredSource = vectorMask.apply(mapTuple.v2());
89108
BytesStreamOutput bStream = new BytesStreamOutput();
90109
MediaType actualContentType = mapTuple.v1();
91110
XContentBuilder builder = MediaTypeRegistry.contentBuilder(actualContentType, bStream).map(filteredSource);

src/main/java/org/opensearch/knn/index/codec/derivedsource/AbstractPerFieldDerivedVectorInjector.java src/main/java/org/opensearch/knn/index/codec/backward_codecs/KNN9120Codec/AbstractPerFieldDerivedVectorInjector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.knn.index.codec.derivedsource;
6+
package org.opensearch.knn.index.codec.backward_codecs.KNN9120Codec;
77

88
import lombok.extern.log4j.Log4j2;
99
import org.apache.lucene.index.FieldInfo;

src/main/java/org/opensearch/knn/index/codec/derivedsource/DerivedSourceVectorInjector.java src/main/java/org/opensearch/knn/index/codec/backward_codecs/KNN9120Codec/DerivedSourceVectorInjector.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
package org.opensearch.knn.index.codec.derivedsource;
6+
package org.opensearch.knn.index.codec.backward_codecs.KNN9120Codec;
77

88
import lombok.extern.log4j.Log4j2;
99
import org.apache.lucene.index.FieldInfo;
@@ -35,7 +35,7 @@
3535
@Log4j2
3636
public class DerivedSourceVectorInjector implements Closeable {
3737

38-
private final DerivedSourceReaders derivedSourceReaders;
38+
private final KNN9120DerivedSourceReaders derivedSourceReaders;
3939
private final List<PerFieldDerivedVectorInjector> perFieldDerivedVectorInjectors;
4040
private final Set<String> fieldNames;
4141

@@ -47,7 +47,7 @@ public class DerivedSourceVectorInjector implements Closeable {
4747
* @param fieldsToInjectVector List of fields to inject vectors into
4848
*/
4949
public DerivedSourceVectorInjector(
50-
DerivedSourceReadersSupplier derivedSourceReadersSupplier,
50+
KNN9120DerivedSourceReadersSupplier derivedSourceReadersSupplier,
5151
SegmentReadState segmentReadState,
5252
List<FieldInfo> fieldsToInjectVector
5353
) throws IOException {

src/main/java/org/opensearch/knn/index/codec/backward_codecs/KNN9120Codec/KNN9120Codec.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
import org.opensearch.knn.index.codec.KNN80Codec.KNN80CompoundFormat;
1818
import org.opensearch.knn.index.codec.KNN80Codec.KNN80DocValuesFormat;
1919
import org.opensearch.knn.index.codec.KNN9120Codec.KNN9120PerFieldKnnVectorsFormat;
20-
import org.opensearch.knn.index.codec.derivedsource.DerivedSourceReadersSupplier;
2120

2221
import java.util.Optional;
2322

@@ -76,7 +75,7 @@ public StoredFieldsFormat storedFieldsFormat() {
7675
}
7776

7877
private StoredFieldsFormat getStoredFieldsFormat() {
79-
DerivedSourceReadersSupplier derivedSourceReadersSupplier = new DerivedSourceReadersSupplier((segmentReadState) -> {
78+
KNN9120DerivedSourceReadersSupplier derivedSourceReadersSupplier = new KNN9120DerivedSourceReadersSupplier((segmentReadState) -> {
8079
if (segmentReadState.fieldInfos.hasVectorValues()) {
8180
return knnVectorsFormat().fieldsReader(segmentReadState);
8281
}

0 commit comments

Comments
 (0)