-
Notifications
You must be signed in to change notification settings - Fork 169
Adding main segment implementation for API and indexing. #2653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
10fa912
b955792
fefe7c8
49cadd5
d844f27
0b49596
9f49e51
04f3265
a5356f2
bd9d2d1
37b3912
04adc36
a0870a2
88774a9
4201fb0
105b970
ac813ff
eb2a30c
5165dfc
9feb203
7a509a3
044c7d1
03f4e18
d4022d3
23622da
b9eb1b8
9802027
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,9 @@ | |
import lombok.AllArgsConstructor; | ||
import lombok.Getter; | ||
import lombok.extern.log4j.Log4j2; | ||
import org.apache.commons.math3.stat.descriptive.AggregateSummaryStatistics; | ||
import org.apache.commons.math3.stat.descriptive.StatisticalSummaryValues; | ||
import org.apache.commons.math3.stat.descriptive.SummaryStatistics; | ||
import org.apache.lucene.index.FieldInfo; | ||
import org.apache.lucene.index.IndexReader; | ||
import org.apache.lucene.index.LeafReaderContext; | ||
|
@@ -28,6 +31,8 @@ | |
import org.opensearch.knn.index.memory.NativeMemoryEntryContext; | ||
import org.opensearch.knn.index.memory.NativeMemoryLoadStrategy; | ||
import org.opensearch.knn.index.engine.KNNEngine; | ||
import org.opensearch.knn.index.query.SegmentProfilerUtil; | ||
import org.opensearch.knn.profiler.SegmentProfilerState; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
|
@@ -82,6 +87,83 @@ public String getIndexName() { | |
return indexShard.shardId().getIndexName(); | ||
} | ||
|
||
/** | ||
* Profile the vector fields in this shard and return statistical information. | ||
* | ||
* @param fieldName The name of the vector field to profile | ||
* @return List of statistical summaries for each dimension | ||
*/ | ||
// TODO: Write unit tests to ensure that the segment statistic aggregation is correct. | ||
public List<StatisticalSummaryValues> profile(String fieldName) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: final |
||
List<StatisticalSummaryValues> shardVectorProfile = new ArrayList<>(); | ||
|
||
try (Engine.Searcher searcher = indexShard.acquireSearcher("knn-profile")) { | ||
List<SegmentProfilerState> segmentLevelProfilerStates = new ArrayList<>(); | ||
|
||
log.info("[KNN] Beginning profiling for field: {} in shard: {}", fieldName, indexShard.shardId()); | ||
|
||
// For each leaf, collect the profile | ||
searcher.getIndexReader().leaves().forEach(leaf -> { | ||
try { | ||
log.info("[KNN] Processing leaf reader for segment: {}", leaf.reader()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we avoid clutter in the logs by reducing these info lines? This doesn't add any value here when debugging as we already have exception handling |
||
segmentLevelProfilerStates.add(SegmentProfilerUtil.getSegmentProfileState(leaf.reader(), fieldName)); | ||
log.info("[KNN] Successfully obtained segment profile state"); | ||
} catch (Exception e) { | ||
log.error("[KNN] Error profiling segment: {}", e.getMessage(), e); | ||
} | ||
}); | ||
|
||
if (segmentLevelProfilerStates.isEmpty()) { | ||
log.info("[KNN] No segment profiles were collected for field: {} in shard: {}", fieldName, indexShard.shardId()); | ||
return shardVectorProfile; // Return empty list | ||
} | ||
|
||
log.info("[KNN] Collected {} segment profiles", segmentLevelProfilerStates.size()); | ||
|
||
// Get dimension | ||
int dimension = segmentLevelProfilerStates.get(0).getDimension(); | ||
log.info("[KNN] Vector dimension: {}", dimension); | ||
|
||
// Transpose our list to aggregate per dimension | ||
for (int i = 0; i < dimension; i++) { | ||
final int dimensionId = i; | ||
List<SummaryStatistics> transposed = segmentLevelProfilerStates.stream() | ||
.map(state -> state.getStatistics().get(dimensionId)) | ||
.collect(Collectors.toList()); | ||
|
||
shardVectorProfile.add(AggregateSummaryStatistics.aggregate(transposed)); | ||
} | ||
|
||
// Log the results for each dimension | ||
for (int i = 0; i < shardVectorProfile.size(); i++) { | ||
StatisticalSummaryValues stats = shardVectorProfile.get(i); | ||
log.info( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've mentioned this before but would it just be easier to use |
||
"[KNN] Dimension {}: count={}, min={}, max={}, mean={}, sum={}, variance={}, std_deviation={}", | ||
i, | ||
stats.getN(), | ||
stats.getMin(), | ||
stats.getMax(), | ||
stats.getMean(), | ||
stats.getSum(), | ||
stats.getVariance(), | ||
Math.sqrt(stats.getVariance()) | ||
); | ||
} | ||
|
||
log.info("[KNN] Profiling completed for field: {} in shard: {}", fieldName, indexShard.shardId()); | ||
} catch (Exception e) { | ||
log.error( | ||
"[KNN] Critical error during profiling for field: {} in shard: {}: {}", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What does |
||
fieldName, | ||
indexShard.shardId(), | ||
e.getMessage(), | ||
e | ||
); | ||
} | ||
|
||
return shardVectorProfile; | ||
} | ||
|
||
/** | ||
* Load all of the k-NN segments for this shard into the cache. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. NIT: Does this comment need to be changed? |
||
* | ||
|
@@ -235,4 +317,13 @@ static class EngineFileContext { | |
private final VectorDataType vectorDataType; | ||
private final SegmentInfo segmentInfo; | ||
} | ||
|
||
/** | ||
* Profile the vector fields in this shard with default field name. | ||
* | ||
* @return List of statistical summaries for each dimension | ||
*/ | ||
public List<StatisticalSummaryValues> profile() { | ||
return profile("my_vector_field"); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.index.query; | ||
|
||
import lombok.experimental.UtilityClass; | ||
import org.apache.lucene.index.LeafReader; | ||
import org.opensearch.knn.profiler.SegmentProfileKNNCollector; | ||
import org.opensearch.knn.profiler.SegmentProfilerState; | ||
|
||
import java.io.IOException; | ||
import java.util.Locale; | ||
|
||
/** | ||
* Utility class to get segment profiler state for a given field | ||
*/ | ||
@UtilityClass | ||
public class SegmentProfilerUtil { | ||
|
||
/** | ||
* Gets the segment profile state for a given field | ||
* @param leafReader The leaf reader to query | ||
* @param fieldName The field name to profile | ||
* @return The segment profiler state | ||
* @throws IOException If there's an error reading the segment | ||
*/ | ||
public static SegmentProfilerState getSegmentProfileState(final LeafReader leafReader, String fieldName) throws IOException { | ||
final SegmentProfileKNNCollector tempCollector = new SegmentProfileKNNCollector(); | ||
leafReader.searchNearestVectors(fieldName, new float[0], tempCollector, null); | ||
if (tempCollector.getSegmentProfilerState() == null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we want to throw an exception if we're unable to get the SegmentProfilerState? What if the user has the feature disabled? |
||
throw new IllegalStateException(String.format(Locale.ROOT, "No segment state found for field %s", fieldName)); | ||
} | ||
return tempCollector.getSegmentProfilerState(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.knn.plugin.transport; | ||
|
||
import org.opensearch.action.ActionType; | ||
|
||
/** | ||
* Action for profiling KNN vectors in an index | ||
*/ | ||
public class KNNProfileAction extends ActionType<KNNProfileResponse> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know we synced offline on this but is it possible to move the API implementation to another PR? I don't think there's |
||
public static final String NAME = "indices:knn/vector/profile"; | ||
public static final KNNProfileAction INSTANCE = new KNNProfileAction(); | ||
|
||
private KNNProfileAction() { | ||
super(NAME, KNNProfileResponse::new); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should write a integration test as well that covers across multiple segments. Let's leave this towards the end.