[RFC] Data streaming support for OSB #789

Open
gkamat opened this issue Mar 17, 2025 · 0 comments
Labels: RFC Request for comment on major changes

gkamat (Collaborator) commented Mar 17, 2025

Motivation

OpenSearch Benchmark (OSB) is the de facto performance benchmarking tool for OpenSearch, widely used by developers, end users, and organizations to measure and track the performance of their deployments, workloads, and use cases. It is packaged with several workloads targeting domains such as log analytics, full-text search, geolocation, and vectors, and users rely on it as a reference benchmark to compare versions, configuration options, cluster sizes, and other variables. Given its ease of use as an OpenSearch client and its full-fledged functionality, users have come to rely on it not just to measure latency for individual queries, but also as a load-testing tool.

However, OSB was not originally designed for such varied uses and suffers from several limitations. Some of these have been addressed recently, such as the ability to scale out to generate high traffic rates. Other features are currently in the works, such as support for gradual ramp-up of workload intensity and red-line performance testing. One area that is still lacking is the capability to ingest large amounts of data. Some stop-gap options exist, such as the expand-data-corpus feature in the http_logs workload, but these have their own limitations.

As cluster sizes grow and users and customers deal with large data corpora, the small workloads packaged with OSB (most are well under 50 GB) are woefully inadequate. New paradigms like Serverless require ingesting multiple terabytes of data at rapid rates. Features like OpenSearch reader-writer separation and the associated shard allocations need to be exercised in specific ways. These scenarios are necessary to verify that systems scale out (and in) promptly in response to changes in customer workloads. Two capabilities are needed to support them:

  • Corpus generators, including synthetic workload generators
  • An ability for OSB to ingest data via streaming rather than from static corpus files

This document proposes a solution for the latter. The first item is addressed in this issue.

Stakeholders

  • OpenSearch users who want to benchmark 10+ TB workloads
  • Managed service providers that would like to benchmark large clusters and deployments
  • Serverless implementations that would like to simulate large workload scenarios to test scale-out of their service
  • Users interested in debugging performance issues that occur only at high scale
  • OpenSearch developers who want to benchmark with larger data corpora than are available with the existing pre-packaged workloads

User Stories

  • As an OpenSearch user, I would like to evaluate the performance of my cluster with a real-life workload.
  • As an OpenSearch developer, I want to test my new feature at scale.
  • As a Managed Service operator, I need to reproduce an issue encountered by a major customer who is running a production workload on a large cluster.
  • As an OpenSearch Serverless operator, I need to ingest several terabytes of data to verify that scale-out of the service occurs promptly as expected.

Methodology

An OSB workload comprises a set of directive files, templates, optional code that defines additional operations, and finally the data corpus files. The latter are compressed files composed of standard OpenSearch JSON documents. In addition, there are associated metadata (offset) files that aid with ingestion.
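To sketch how such offset metadata helps (the exact format of OSB's offset files is an implementation detail; the helper below is purely illustrative), an offset index lets a loader seek directly to a given document in a JSON-lines corpus instead of scanning from the beginning of the file:

```python
import json
import tempfile

def build_offset_index(corpus_path, every=1000):
    """Record the byte offset of every `every`-th document (illustrative format)."""
    offsets = []
    pos = 0
    with open(corpus_path, "rb") as f:
        for lineno, line in enumerate(f):
            if lineno % every == 0:
                offsets.append((lineno, pos))
            pos += len(line)
    return offsets

# Write a tiny corpus: one JSON document per line.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as tmp:
    for i in range(5):
        tmp.write(json.dumps({"doc": i}) + "\n")
    path = tmp.name

index = build_offset_index(path, every=2)

# Seek straight to document 4 using its recorded offset.
with open(path, "rb") as f:
    f.seek(dict(index)[4])
    doc = json.loads(f.readline())
assert doc == {"doc": 4}
```

With multi-terabyte corpora, this kind of index is what allows several load-generation workers to start ingesting from different points in the same file without each reading everything that precedes its slice.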

To run workloads with large amounts of data, correspondingly large data corpora need to be generated, pre-processed, and compressed. These files then need to be published to a repository from which users can download them when they want to initiate a test. This download-and-decompression step is necessarily slow and cumbersome. Furthermore, the corpus needs to be stored on the load-generation instance. If machine images are used for fast turnaround, this becomes an insurmountable burden: to speed up tests, data corpora would ideally be frozen into the machine image, but multi-terabyte corpora render this option impractical.

The solution, then, is for OSB to ingest data via streaming, which would work seamlessly with a just-in-time corpus generator. For many log-analytics scenarios, documents with consistent, monotonically increasing timestamps are essential, as are queries that mesh well with such corpora. Ideally, documents would be stamped with the current time at the moment they are generated. The synthetic workload generator currently being worked on will have such an option.
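A just-in-time generator of this kind can be sketched in a few lines of Python (the function name and document shape are illustrative, not the actual SDG interface). Because each document is stamped when it is produced rather than when a corpus was built, timestamps are naturally monotonically non-decreasing:

```python
import datetime
import itertools
import json
import random

def log_document_stream():
    """Yield synthetic log documents, stamped at the moment of generation."""
    messages = ["login attempt", "login attempt failed", "session closed"]
    while True:
        yield {
            # Wall-clock time at generation, not at corpus-build time.
            "@timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "message": random.choice(messages),
        }

# Take a few documents from the (infinite) stream.
docs = list(itertools.islice(log_document_stream(), 3))
for d in docs:
    print(json.dumps(d))
```

An infinite generator like this never touches disk, so the load-generation instance needs no corpus files at all; OSB would pull documents from it on demand and push them to the cluster.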

Data streaming, in conjunction with various models of workload generation, effective scale-out, and ramp-up support, will address most of the current limitations that hinder Serverless implementations, managed services, large organizations, and users from reproducing real-life issues and bottlenecks.

Data streaming will also enhance OSB to work well with ingestion pathways such as Data Prepper, Kinesis, and Kafka. New workloads can be set up with appropriate document generators, which will then permit multi-terabyte ingestion scenarios.

To add data-stream support, OSB will need to be updated to include support for the data streams feature of OpenSearch, by adding runner functions that exercise the data stream APIs. Some rudimentary support already exists, but it needs to be enhanced and tested. The next step is to have the Thespian clients and worker coordinators orchestrate streaming chunks of documents amongst themselves, while ensuring that time-series documents are ingested in order.
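One detail such runners must handle: data streams are append-only, so documents written to them via the bulk API must use the create action rather than index. A minimal sketch of assembling a bulk body for a data stream (the helper name is illustrative, not an existing OSB function):

```python
import json

def bulk_body_for_data_stream(stream_name, docs):
    """Build an NDJSON _bulk body targeting a data stream.

    Data streams are append-only, so each document must be written
    with the "create" action rather than "index".
    """
    lines = []
    for doc in docs:
        lines.append(json.dumps({"create": {"_index": stream_name}}))
        lines.append(json.dumps(doc))
    # The bulk API requires a trailing newline.
    return "\n".join(lines) + "\n"

body = bulk_body_for_data_stream(
    "logs-redis",
    [{"message": "login attempt", "@timestamp": "2013-03-01T00:00:00"}],
)
print(body)
```

A runner would POST such a body to `_bulk`; the worker coordinators would hand each client a chunk of the document stream, with ordering enforced so that timestamps arrive monotonically.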

Preliminary work on data streaming is currently in progress and more details will be added as the next steps become clearer.

Use Model

OpenSearch has support for data streams, which are targeted towards ingesting continuously generated time-series data such as logs, events, and metrics into OpenSearch. As with any time-series scenario, the number of documents grows continually, without the need to update older documents. The typical workflow to manage time-series data involves multiple steps, such as creating a rollover index alias, defining a write index, and defining common mappings and settings for the backing indexes. Data streams encapsulate and simplify these operations. The feature works best in conjunction with Index State Management, which permits defining policies that handle index rollovers and deletion.

A data stream is internally composed of multiple backing indexes. Search requests are routed to all the backing indexes, while indexing requests are routed to the latest write index. ISM policies are used to automatically handle index rollovers or deletions.
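For instance, an ISM policy that rolls the write index over daily and deletes backing indexes after a week might look like the following (the policy name and thresholds are illustrative):

```
PUT _plugins/_ism/policies/logs-rotation
{
  "policy": {
    "description": "Roll over daily, delete backing indexes after a week",
    "default_state": "hot",
    "states": [
      {
        "name": "hot",
        "actions": [
          { "rollover": { "min_index_age": "1d" } }
        ],
        "transitions": [
          { "state_name": "delete", "conditions": { "min_index_age": "7d" } }
        ]
      },
      {
        "name": "delete",
        "actions": [
          { "delete": {} }
        ]
      }
    ],
    "ism_template": [
      { "index_patterns": ["logs-*"], "priority": 100 }
    ]
  }
}
```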

OpenSearch Benchmark will be enhanced to support the full workflow required to manage data streams, by adding support for the requisite operations, which include the following:

Creating an index template

This configures a set of indexes as a data stream. The data_stream field indicates that the item is a data stream and not a regular index template. The index pattern must match the name of the data stream. Index mappings and other settings can be added here as well.


PUT _index_template/logs-template
{
  "index_patterns": [
    "my-data-stream",
    "logs-*"
  ],
  "data_stream": {},
  "priority": 100
}

PUT _index_template/logs-template-nginx
{
  "index_patterns": "logs-nginx",
  "data_stream": {
    "timestamp_field": {
      "name": "request_time"
    }
  },
  "priority": 200,
  "template": {
    "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 0
    }
  }
}

Creating a data stream

This step also auto-initializes the first backing index. Alternatively, data can be ingested without explicitly creating the data stream, since the stream will be created automatically at the start of ingestion, based on the configuration specified in the template.

PUT _data_stream/logs-redis

POST logs-staging/_doc
{
  "message": "login attempt failed",
  "@timestamp": "2013-03-01T00:00:00"
}

Ingesting data into the data stream

This uses the regular indexing APIs, so no additional support is required here.

POST logs-redis/_doc
{
  "message": "login attempt",
  "@timestamp": "2013-03-01T00:00:00"
}

Searching a data stream

Like ingesting data above, this is similar to searching a regular index or an index alias, and no additional support is required for this capability. The search operation applies to all of the backing indexes (i.e., all data present in the stream).

GET logs-redis/_search
{
  "query": {
    "match": {
      "message": "login"
    }
  }
}

Example response:

{
  "took" : 514,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 0.2876821,
    "hits" : [
      {
        "_index" : ".ds-logs-redis-000001",
        "_type" : "_doc",
        "_id" : "-rhVmXoBL6BAVWH3mMpC",
        "_score" : 0.2876821,
        "_source" : {
          "message" : "login attempt",
          "@timestamp" : "2013-03-01T00:00:00"
        }
      }
    ]
  }
}

Rolling over a data stream

A rollover operation creates a new backing index that becomes the data stream's new write index. ISM policies will carry out this operation automatically as configured, but a manual rollover operation on the data stream can be performed like so:

POST logs-redis/_rollover

Example response:

{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "old_index" : ".ds-logs-redis-000001",
  "new_index" : ".ds-logs-redis-000002",
  "rolled_over" : true,
  "dry_run" : false,
  "conditions" : { }
}

Deleting a data stream

The delete operation first deletes the backing indexes of a data stream and then deletes the data stream itself. To delete a data stream and all of its hidden backing indexes:

DELETE _data_stream/logs-redis

Milestones

Category      Task                              Effort   Completion
Design        POC and HLD for data streaming    1        4/21
Development   Implement API for streaming       2        4/28
              Create basic workload             1
              Integrate with SDG                1
              Update big5 corpus generation     1
Testing       Add unit and integration tests    1
Release       PR and review                     1
              Release                           1

How Can You Help?

  • Any general comments about the overall direction are welcome.
  • Indicating whether the areas scoped out above for data stream support include your typical or desired scenarios and use-cases will be helpful in prioritizing them.
  • Provide early feedback by testing features as they become available.
  • Help out on the implementation! Check out the issues page for work that is ready to be picked up.

Next Steps

We will incorporate feedback and add more details on design, implementation and prototypes as they become available.
