[Feature Request] OpenSearchArrowClient (OSAC) to support consuming Arrow Flight Streams #17654

Open
rishabhmaurya opened this issue Mar 21, 2025 · 2 comments
Labels: enhancement, Search:Query Capabilities, untriaged

Comments

@rishabhmaurya
Contributor

Is your feature request related to a problem? Please describe

Arrow Flight was introduced as a new transport, together with streaming support in Arrow format, in #16679. Building on that, I would like to propose OpenSearchArrowClient (OSAC), an enhancement to the OpenSearch Java Client that adds Arrow Flight support. It would deliver search and aggregation results as Arrow streams, reuse the existing SearchRequest APIs, and handle authentication transparently.

Describe the solution you'd like

Details on the Arrow Flight Java client: https://arrow.apache.org/docs/java/flight.html#

Motivation

Enable Arrow stream consumption for OpenSearch data.
Maintain consistency with existing request-building APIs.
Simplify auth for Flight interactions.

Long term, we would like to migrate the consumption of all OpenSearch APIs that produce columnar data to OSAC.
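To make the "simplify auth" point concrete, here is a minimal sketch of the kind of step OSAC could hide from callers. It assumes the client turns the username and password into an HTTP-style Basic credential; this issue does not specify the auth scheme, so that is an assumption, and `BasicAuthHeader` is a hypothetical helper rather than an existing OpenSearch or Arrow class.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

/** Hypothetical helper; not part of any existing OpenSearch or Arrow API. */
final class BasicAuthHeader {

    /** Builds the value of an HTTP-style "Authorization: Basic ..." header. */
    static String basicAuthValue(String username, String password) {
        String credentials = username + ":" + password;
        String encoded = Base64.getEncoder()
            .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Same placeholder credentials as the sample usage in this issue.
        System.out.println(basicAuthValue("admin", "password"));
    }
}
```

With OSAC, the caller would only pass the credentials to the constructor, and a step like this would happen inside the client for both the REST and the Flight connection.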

Image
OSAC SeqDiag.txt

Sample usage:

Search

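import java.nio.charset.StandardCharsets;

import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.sort.SortOrder;

// OpenSearchArrowClient is the class proposed in this issue; it does not exist yet.
OpenSearchArrowClient client = new OpenSearchArrowClient(
    "localhost", 9200, "localhost", 50051, "admin", "password", true
);

SearchRequest request = new SearchRequest("logs-*")
    .source(SearchSourceBuilder.searchSource()
        .query(QueryBuilders.matchQuery("message", "error"))
        .sort("timestamp", SortOrder.DESC)
        .size(100));

try (FlightStream stream = client.searchAsStream(request)) {
    // Each call to next() loads one record batch into the stream's root.
    while (stream.next()) {
        VectorSchemaRoot root = stream.getRoot();
        VarCharVector idVector = (VarCharVector) root.getVector("_id");
        Float4Vector scoreVector = (Float4Vector) root.getVector("_score");
        VarCharVector sourceVector = (VarCharVector) root.getVector("_source");

        // Process batch; VarCharVector.get(i) returns raw UTF-8 bytes
        for (int i = 0; i < root.getRowCount(); i++) {
            String id = new String(idVector.get(i), StandardCharsets.UTF_8);
            float score = scoreVector.get(i);
            String source = new String(sourceVector.get(i), StandardCharsets.UTF_8);
            System.out.printf("ID: %s, Score: %.1f, Source: %s%n", id, score, source);
        }
    }
}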
client.shutdown();

| _id | _score | _source |
| --- | --- | --- |
| log_2025-03-21_001 | 2.3 | {"timestamp": "2025-03-21T10:00:00", "message": "disk error"} |
| log_2025-03-21_002 | 1.9 | {"timestamp": "2025-03-21T09:55:12", "message": "network error"} |
| log_2025-03-20_003 | 1.7 | {"timestamp": "2025-03-20T23:45:00", "message": "auth error"} |

Aggregation

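import java.nio.charset.StandardCharsets;

import org.apache.arrow.flight.FlightStream;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.search.aggregations.AggregationBuilders;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;

// OpenSearchArrowClient is the class proposed in this issue; it does not exist yet.
OpenSearchArrowClient client = new OpenSearchArrowClient(
    "localhost", 9200, "localhost", 50051, false
);

TermsAggregationBuilder byRegion = AggregationBuilders.terms("by_region")
    .field("region").size(5)
    .subAggregation(AggregationBuilders.terms("by_category").field("category").size(5));
SearchRequest request = new SearchRequest("sales")
    .source(SearchSourceBuilder.searchSource().aggregation(byRegion));

try (FlightStream stream = client.searchAsStream(request)) {
    // Each call to next() loads one record batch into the stream's root.
    while (stream.next()) {
        VectorSchemaRoot root = stream.getRoot();
        VarCharVector regionVector = (VarCharVector) root.getVector("region");
        VarCharVector categoryVector = (VarCharVector) root.getVector("category");
        BigIntVector countVector = (BigIntVector) root.getVector("doc_count");

        // Process batch; VarCharVector.get(i) returns raw UTF-8 bytes
        for (int i = 0; i < root.getRowCount(); i++) {
            String region = new String(regionVector.get(i), StandardCharsets.UTF_8);
            String category = new String(categoryVector.get(i), StandardCharsets.UTF_8);
            long count = countVector.get(i);
            System.out.printf("Region: %s, Category: %s, Count: %d%n", region, category, count);
        }
    }
}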
client.shutdown();

| region | category | doc_count |
| --- | --- | --- |
| North | Electronics | 120 |
| North | Clothing | 85 |
| South | Electronics | 95 |
| South | Furniture | 60 |
| West | Books | 75 |
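The rows above have one entry per (region, category) pair, which suggests the nested terms aggregation is flattened into leaf-bucket rows. How the server would actually do this is not specified in this issue; the sketch below only illustrates the idea with plain Java collections. `FlattenBuckets` and its `flatten` method are hypothetical names, and the input map stands in for the nested bucket tree.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration; the real server-side flattening is not specified here. */
final class FlattenBuckets {

    /**
     * Turns nested buckets (region -> category -> doc_count) into flat rows
     * of [region, category, doc_count], one row per leaf bucket.
     */
    static List<Object[]> flatten(Map<String, Map<String, Long>> byRegion) {
        List<Object[]> rows = new ArrayList<>();
        for (Map.Entry<String, Map<String, Long>> region : byRegion.entrySet()) {
            for (Map.Entry<String, Long> category : region.getValue().entrySet()) {
                rows.add(new Object[] {region.getKey(), category.getKey(), category.getValue()});
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        // A small subset of the sample data from this issue.
        Map<String, Map<String, Long>> byRegion = new LinkedHashMap<>();
        byRegion.computeIfAbsent("North", k -> new LinkedHashMap<>()).put("Electronics", 120L);
        byRegion.computeIfAbsent("North", k -> new LinkedHashMap<>()).put("Clothing", 85L);
        byRegion.computeIfAbsent("South", k -> new LinkedHashMap<>()).put("Electronics", 95L);

        for (Object[] row : flatten(byRegion)) {
            System.out.printf("Region: %s, Category: %s, Count: %d%n", row[0], row[1], row[2]);
        }
    }
}
```

Each flat row maps directly onto one position in the `region`, `category`, and `doc_count` vectors, which is why the parent key repeats for every child bucket.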

Note:

  1. Nested hits and aggregations can be represented in Arrow format, but the examples here do not cover them yet. I will add such examples as the work progresses.
  2. Search hits and aggregation responses cannot be combined, because the columnar format has a rigid schema.
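As a rough illustration of note 2, the sketch below models a batch whose column set is fixed up front and which rejects rows that do not match it. It does not use Arrow; `FixedSchemaBatch` is a hypothetical stand-in that only mimics the "one schema per stream" constraint, with column names taken from the two samples above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a fixed-schema batch; not an Arrow or OpenSearch class. */
final class FixedSchemaBatch {
    private final Set<String> columns;
    private final List<Map<String, Object>> rows = new ArrayList<>();

    FixedSchemaBatch(List<String> columns) {
        this.columns = Set.copyOf(columns);
    }

    /** Appends a row only if it has exactly the declared columns. */
    void append(Map<String, Object> row) {
        if (!row.keySet().equals(columns)) {
            throw new IllegalArgumentException(
                "Row columns " + row.keySet() + " do not match schema " + columns);
        }
        rows.add(new LinkedHashMap<>(row));
    }

    int rowCount() {
        return rows.size();
    }

    public static void main(String[] args) {
        FixedSchemaBatch hits = new FixedSchemaBatch(List.of("_id", "_score", "_source"));
        hits.append(Map.of("_id", "log_2025-03-21_001", "_score", 2.3f, "_source", "{}"));

        try {
            // An aggregation row has different columns, so it cannot share the batch.
            hits.append(Map.of("region", "North", "category", "Electronics", "doc_count", 120L));
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Under this model, hits and aggregations would each need their own stream with their own schema, which matches the note above.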

Next Steps

Related component

Search:Query Capabilities

Describe alternatives you've considered

No response

Additional context

No response

@msfroh
Collaborator

msfroh commented Mar 21, 2025

Nice!

Do you think we should move this to the opensearch-java repo?

@rishabhmaurya
Contributor Author

@msfroh we can, I added it here for better visibility :)
