Adding serializer and api implementation for segment profiler state #2687
Conversation
Signed-off-by: Arun Ganesh <[email protected]>
Signed-off-by: Arun Ganesh <[email protected]>
CHANGELOG.md
## [Unreleased 2.x](https://github.com/opensearch-project/k-NN/compare/2.19...2.x)
### Features
* [Vector Profiler] Adding basic generic vector profiler implementation and tests. [#2624](https://github.com/opensearch-project/k-NN/pull/2624)
* [Vector Profiler] Updating serializer, compression, and api. [#2687](https://github.com/opensearch-project/k-NN/pull/2687)
Should we be a little more specific on what we're updating?
Also AFAIK we're not updating anything related to how compression is done or any API (but we're introducing one). Should we reword this?
* @param fieldName The name of the vector field to profile
* @return List of statistical summaries for each dimension
*/
// TODO: Write unit tests to ensure that the segment statistic aggregation is correct.
Should we finish this TODO and write unit tests for the aggregation?
* @return List of statistical summaries for each dimension
*/
// TODO: Write unit tests to ensure that the segment statistic aggregation is correct.
public List<StatisticalSummaryValues> profile(String fieldName) {
NIT: final variables whenever possible
// Get dimension and validate all segments have the same dimension
int dimension = segmentLevelProfilerStates.get(0).getDimension();
boolean dimensionsMatch = segmentLevelProfilerStates.stream().allMatch(state -> state.getDimension() == dimension);
Would there ever be a case where the dimensions won't match accordingly here? Do we need to check for this?
Reverting to the previous KNNIndexShard implementation, as the current approach is less robust.
if (!dimensionsMatch) {
log.error("[KNN] Inconsistent dimensions found across segments");
return shardVectorProfile; // Return empty list
Nit: you can return List.of()
for (SegmentProfilerState state : segmentLevelProfilerStates) {
List<SummaryStatistics> stateStats = state.getStatistics();
if (dimensionId < stateStats.size()) {
Again: if the dimension is constant here, do we really need this check? It will always be within bounds.
Understood. Reverting to the previous KNNIndexShard implementation, as the current approach is less robust.
List<SummaryStatistics> stateStats = state.getStatistics();
if (dimensionId < stateStats.size()) {
SummaryStatistics stat = stateStats.get(dimensionId);
if (stat != null) {
When can this be possible?
log.info("[KNN] Profiling completed for field: {} in shard: {}", fieldName, indexShard.shardId());
} catch (Exception e) {
log.error(
"[KNN] Critical error during profiling for field: {} in shard: {}: {}",
Critical sounds a bit too extreme here given we're not in the hot path.
* @return List of statistical summaries for each dimension
*/
public List<StatisticalSummaryValues> profile() {
return profile("target_field");
We shouldn't hard code a field name here. I understand we're doing this from a benchmarking OSB perspective but if we want customers to be able to leverage this feature we need to provide the ability to take in a field/index name as input.
Removed specific field input
int fieldNumber = segmentReadState.fieldInfos.fieldInfo(field).getFieldNumber();
try (IndexInput input = segmentReadState.directory.openInput(quantizationStateFileName, IOContext.READONCE)) {
try (IndexInput input = segmentReadState.directory.openInput(quantizationStateFileName, IOContext.DEFAULT)) {
Could you give some context around this? Why are we changing from READONCE to DEFAULT?
Mostly did it for consistency when accessing quantization state files multiple times.
https://lucene.apache.org/core/9_12_1/core/org/apache/lucene/store/IOContext.html?is-external=true
// should skip graph building only for non quantization use case and if threshold is met
if (quantizationState == null && shouldSkipBuildingVectorDataStructure(totalLiveDocs)) {
log.debug(
log.info(
NIT: use log.debug wherever possible
SegmentProfilerState segmentProfilerState = null;
if (totalLiveDocs > 0) {
// TODO:Refactor to another init
This seems important to the PR. Are we going to act on this TODO?
Refactored the NativeEnginesVectorsWriter class to encompass segmentStateWriter and initSegmentStateWriterIfNecessary().
/**
* A utility function to get {@link SegmentProfilerState} for a given segment and field.
* This needs to be public as we are accessing this on a transport action
* TODO: move this out of this Util class and into another one.
Same here. This seems important to the PR. Should we complete this TODO?
The method has already been refactored into its own class under org.opensearch.knn.index.query. The existing code needs to be removed.
One over-arching comment here is that we should have unit/integration tests for all our code. Make sure to include them in the PR so that we can test our functionality such as making sure the aggregation at the shard level is what we're expecting.
* @return {@link SegmentProfilerState}
* @throws IOException exception during reading the {@link SegmentProfilerState}
*/
public static SegmentProfilerState getSegmentProfileState(final LeafReader leafReader, String fieldName) throws IOException {
We have the util function below that is an exact replica of this method. Do we still need this?
No, the method has already been refactored into its own class under org.opensearch.knn.index.query.
/**
* Get aggregated dimension statistics by index
*/
private Map<String, Map<Integer, Map<String, Object>>> getAggregatedStats() {
I'm assuming you want to get the aggregation at the field level here across all shards.
- Shouldn't this be by field/index instead of just index?
- NIT: getAggregatedStats can be more specific because there are now multiple layers of aggregation
continue;
}
Map<Integer, Map<String, Object>> dimensions = indexDimensions.computeIfAbsent(indexName, k -> new HashMap<>());
So the mapping that you're trying to do is:
Map<Index, Map<Dimension ID, Map<String, Object>>>.
In Lucene, and specifically within k-NN, an index can have multiple fields. Not all fields are of type knn_vector.
Map<Integer, Map<String, Object>> dimensions = indexDimensions.computeIfAbsent(indexName, k -> new HashMap<>());
for (int i = 0; i < stats.size(); i++) {
StatisticalSummaryValues stat = stats.get(i);
Please correct me if I'm wrong, but I think the crux of your situation here, looking at StatisticalSummaryValues, is that the library we're using doesn't provide an aggregation technique for that object.
If that is the case, how about we collect our SegmentProfilerStates and put them in the shard response? That way we can craft both the shard- and field-level aggregations as we see fit, since we have access to those values.
int finalI = i;
Map<String, Object> dimension = dimensions.computeIfAbsent(i, k -> {
Map<String, Object> newDim = new HashMap<>();
newDim.put("dimension", finalI);
NIT: static imports when possible
double min = in.readDouble();
double max = in.readDouble();
double sum = in.readDouble();
this.dimensionStats.add(new StatisticalSummaryValues(mean, variance, n, min, max, sum));
If we're deserializing it here again, would it be worthwhile to just save the deserialization for the profile API layer?
What kind of responses do we need to deserialize here?
);
knnIndexShard.warmup();
knnIndexShard.profile();
Do we need this in WarmupTransportAction?
out.writeVInt(dimension);
out.writeVInt(statistics.size());
for (SummaryStatistics stat : statistics) {
out.writeDouble(stat.getMean());
We shouldn't be serializing/deserializing the values but rather the object itself. SummaryStatistics already implements the Serializable interface, so we should be able to convert it directly to bytes without needing to retrieve its properties.
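If the StreamOutput/StreamInput route stays, one way to follow this suggestion is to serialize the whole object graph to a byte[] and write that array to the stream, reversing the process on read. A minimal stdlib sketch (the helper class name is hypothetical, and any java.io.Serializable stands in for SummaryStatistics here):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Round-trip a Serializable object through a byte[], as one might do
// before handing the bytes to OpenSearch's StreamOutput/StreamInput.
final class SerializableBytes {
    static byte[] toBytes(Serializable obj) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(obj); // serializes the full object graph
        }
        return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    static <T extends Serializable> T fromBytes(byte[] bytes)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        }
    }
}
```

This avoids hand-copying individual fields, at the cost of tying the wire format to Java serialization.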
for (int i = 0; i < statsSize; i++) {
SummaryStatistics stat = new SummaryStatistics();
stat.addValue(input.readDouble());
I think we've previously had a discussion on the deserialization aspect of this - the same issue is present here as well.
Let's make sure to modify this as this would yield incorrect results for our profiler.
Yes, this was just used to help test compression by creating a new instance. Reverting to the pre-existing method for serializing and deserializing.
* Response for KNN profile request
*/
@Log4j2
public class KNNProfileResponse extends BroadcastResponse implements ToXContentObject {
Have not looked too closely into these since I do have some comments on the serialization/deserialization component.
Signed-off-by: Arun Ganesh <[email protected]>
builder.endObject();
}
builder.endObject();
What about at the cluster level, with every segment aggregated together? This output currently does this per shard, right? Can we combine every SummaryStatistics in the cluster together?
Yes, the stats were previously per shard; added cluster-level stats as well.
if (dim < state.getStatistics().size()) {
SummaryStatistics stats = state.getStatistics().get(dim);
totalCount += stats.getN();
Should we leverage an existing library for aggregation?
- This doesn't give a correct aggregation outside of min, max, and sum.
- https://commons.apache.org/proper/commons-math/commons-math-docs/apidocs/src-html/org/apache/commons/math4/legacy/stat/descriptive/AggregateSummaryStatistics.html#line.305 - there's a utility provided to us that aggregates SummaryStatistics objects for us. Can we use that instead of trying to combine it on our own?
Updated to use AggregateSummaryStatistics
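For context on why summing fields naïvely goes wrong: combining per-segment summaries needs the pairwise mean/variance update, which is essentially what the linked AggregateSummaryStatistics utility does internally. A pure-Java sketch of that combination, with hypothetical names (the real code should just call the library):

```java
// Combine two per-segment summaries (count, mean, sample variance, min, max)
// into one, using the pairwise update for mean and variance.
final class SummaryCombiner {
    record Summary(long n, double mean, double variance, double min, double max) {}

    static Summary combine(Summary a, Summary b) {
        long n = a.n() + b.n();
        double delta = b.mean() - a.mean();
        // Weighted mean of the two partitions.
        double mean = a.mean() + delta * b.n() / n;
        // Convert sample variances back to sums of squared deviations (M2),
        // add the cross term, then convert back to a sample variance.
        double m2 = a.variance() * (a.n() - 1)
                + b.variance() * (b.n() - 1)
                + delta * delta * a.n() * b.n() / n;
        double variance = n > 1 ? m2 / (n - 1) : 0.0;
        return new Summary(n, mean, variance,
                Math.min(a.min(), b.min()), Math.max(a.max(), b.max()));
    }
}
```

Min, max, and count compose trivially; the cross term `delta^2 * n1 * n2 / n` is what a naïve sum of variances misses.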
builder.startObject(shardProfileResult.shardId);
// Individual segment statistics
builder.startArray("segments");
Can you provide an example of output in the Javadoc of what this would look like?
Updated javadocs
@Override
public void writeTo(StreamOutput streamOutput) throws IOException {
super.writeTo(streamOutput);
Let's see if we use this override for anything. If we don't, we can throw an exception here to make sure we don't stray off the intended path.
* Constructor for reading from StreamInput
*/
public KNNIndexShardProfileResult(StreamInput streamInput) throws IOException {
this.shardId = streamInput.readString();
Do we ever use this constructor?
No, it was used previously in KNNProfileResponse, but after removing the logic and the above constructor I was able to validate that the output still works.
Signed-off-by: Arun Ganesh <[email protected]>
## [Unreleased 2.x](https://github.com/opensearch-project/k-NN/compare/2.19...2.x)
### Features
* [Vector Profiler] Adding basic generic vector profiler implementation and tests. [#2624](https://github.com/opensearch-project/k-NN/pull/2624)
Why are we adding these changes? Weren't they already merged?
Yes, updated the branch to align with changes already implemented in the feature branch.
*/
@Log4j2
@AllArgsConstructor
public class SegmentProfilerState implements Serializable {
Why is this file being shown as new? Isn't this already in the branch?
Updated the branch to align with changes already implemented in the feature branch.
…erializer Signed-off-by: oaganesh <[email protected]>
Please add unit/integration tests to validate shard aggregation functionality. You'll need to create multiple segments/shards to do this most likely.
Signed-off-by: Arun Ganesh <[email protected]>
Signed-off-by: Arun Ganesh <[email protected]>
A few comments. I think the overall structure is looking good. It'd be good to enhance tests to ensure that we are validating functionality. Seems you are already working on that!
@Getter
public class KNNProfileRequest extends BroadcastRequest<KNNProfileRequest> {
private String index;
Broadcast request should already have the indices: https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/action/support/broadcast/BroadcastRequest.java#L52.
public class KNNProfileRequest extends BroadcastRequest<KNNProfileRequest> {
private String index;
private String field;
nit: lets make these final.
@Override
public ActionRequestValidationException validate() {
return null;
nit: no need to override
* "failures": []
* }
*/
public class KNNProfileResponse extends BroadcastResponse implements ToXContentObject {
nit: BroadcastResponse already implements ToXContentObject - do we need to do it again? https://github.com/opensearch-project/OpenSearch/blob/main/server/src/main/java/org/opensearch/action/support/broadcast/BroadcastResponse.java
*/
public class KNNProfileResponse extends BroadcastResponse implements ToXContentObject {
List<KNNIndexShardProfileResult> shardProfileResults;
nit: private final?
KNNProfileResponse,
KNNIndexShardProfileResult> {
public static Logger logger = LogManager.getLogger(KNNProfileTransportAction.class);
Replace this with the @Log4j2 annotation.
KNNIndexShard knnIndexShard = new KNNIndexShard(
indicesService.indexServiceSafe(shardRouting.shardId().getIndex()).getShard(shardRouting.shardId().id())
);
nit: remove for git history
@Setter
@Getter
public class SegmentProfileKNNCollector implements KnnCollector {
Add comment on top of this class.
return response;
}
private void validateProfileResponse(Response response, int dimension) throws IOException, ParseException {
It'd be good to update this to ensure expected statistics are being computed. For instance, you can generate the vectors from a normalized distribution (i.e. 0-1) and confirm that the mean value is computed as close to 0.5.
+1, agree with this.
Make sure to validate that the aggregation is what we'd expect. Since merge is out of scope for this, it can probably be done by explicitly creating the 2 segments and calling the profile API to make sure the aggregate is what we expect.
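The suggested check could look roughly like the sketch below (class and method names are hypothetical; a real test would index these vectors and assert on the profile API response rather than computing the mean directly):

```java
import java.util.Random;

// Illustrative check: vector components drawn uniformly from [0, 1)
// should profile to a per-dimension mean close to 0.5.
final class UniformMeanCheck {
    static double dimensionMean(int numVectors, int dimension, int dim, long seed) {
        Random random = new Random(seed);
        double sum = 0.0;
        for (int i = 0; i < numVectors; i++) {
            float[] vector = new float[dimension];
            for (int d = 0; d < dimension; d++) {
                vector[d] = random.nextFloat(); // uniform on [0, 1)
            }
            sum += vector[dim]; // track the component under test
        }
        return sum / numVectors;
    }
}
```

With enough vectors the observed mean converges to 0.5, so a test can assert equality within a small tolerance.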
Updated tests
import java.util.List;
@AllArgsConstructor
public class KNNIndexShardProfileResult implements Writeable {
Do we also need deserialization logic with this?
I think it may be useful especially when compressing vectors.
Signed-off-by: Arun Ganesh <[email protected]>
Description
Adds an implementation for profiling and statistical analysis of k-NN vector segments in OpenSearch, with compression, when benchmarking. Provides functionality to collect, process, and store statistical information about vector dimensions across different shards and segments.
Related Issues
Implements #2622
Check List
- Commits are signed per the DCO using --signoff
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following the Developer Certificate of Origin and signing off your commits, please check here.
Example input:
Example output: