Increase concurrency for TS command #128419
Pinging @elastic/es-analytical-engine (Team:Analytics)
Pinging @elastic/es-storage-engine (Team:StorageEngine)
[
{
"operator": "TimeSeriesSourceOperator[shards = [.ds-metrics-hostmetricsreceiver.otel-default-2025.05.08-000001:0], maxPageSize = 2520[maxPageSize=2520, remainingDocs=2147388847]]",
"status":
{
"processed_slices": 1,
"processed_queries":
[
"IndexOrDocValuesQuery(indexQuery=@timestamp:[1746727208001 TO 9223372036854775807], dvQuery=@timestamp:[1746727208001 TO 9223372036854775807])"
],
"processed_shards":
[
".ds-metrics-hostmetricsreceiver.otel-default-2025.05.08-000001:0"
],
"process_nanos": 18334025,
"slice_index": 0,
"total_slices": 1,
"pages_emitted": 38,
"slice_min": 0,
"slice_max": 0,
"current": 0,
"rows_emitted": 94800,
"partitioning_strategies":
{
".ds-metrics-hostmetricsreceiver.otel-default-2025.05.08-000001:0": "SHARD"
},
"tsid_loaded": 7900,
"values_loaded": 284400
}
},
{
"operator": "EvalOperator[evaluator=DateTruncDatetimeEvaluator[fieldVal=Attribute[channel=1], rounding=Rounding[300000 in Z][fixed]]]",
"status":
{
"process_nanos": 310306,
"pages_processed": 38,
"rows_received": 94800,
"rows_emitted": 94800
}
},
{
"operator": "TimeSeriesAggregationOperator[blockHash=TimeSeriesBlockHash{keys=[BytesRefKey[channel=0], LongKey[channel=4]], entries=7900b}, aggregators=[GroupingAggregator[aggregatorFunction=RateDoubleGroupingAggregatorFunction[channels=[2, 1]], mode=INITIAL], GroupingAggregator[aggregatorFunction=ValuesBytesRefGroupingAggregatorFunction[channels=[3]], mode=INITIAL]]]",
"status":
{
"hash_nanos": 2006304,
"aggregation_nanos": 3007861,
"pages_processed": 38,
"rows_received": 94800,
"rows_emitted": 7900,
"emit_nanos": 1242781
}
}
]
With this change, we save about 2ms of hash_nanos and 3ms of aggregation_nanos, because the TimeSeriesAggregationOperator now executes concurrently with the TimeSeriesSourceOperator. We also expect further savings from the TimeSeriesSourceOperator (18ms of process_nanos) once field extraction runs in a separate driver. The total time for this query decreased from 41ms to 36ms.
If I understand correctly, this allows using 2 threads per query, and we may add another one for field extraction? This sounds great, but it may still leave resources unused (e.g. on sockets with dozens of cores), so we may want to investigate partitioning work per tsid later.
Thanks, Kostas! In the long term, we should explore better partitioning strategies, but this approach helps TS in the short term.
Today, with FROM, we can partition a shard into multiple slices, allowing multiple drivers to execute against a single shard. For time-series, specifically rate aggregation, the data needs to arrive in order. Strictly speaking, data for one tsid in each bucket must arrive in order to avoid buffering. There are several options to parallelize the execution:
1. Split the query into multiple time intervals based on the bucket interval, with multiple drivers executing concurrently on different intervals. However, since the data is sorted by TSID and timestamp, this partitioning might not be efficient within each driver.
2. Alternatively, split the current single driver vertically into multiple parts, with one driver for each part.
This PR implements the first step of option 2: the time-series source operator and the time-series aggregation operator are executed in two separate drivers.
The field extractions within TS will be executed in a separate driver in a follow-up PR.
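As a rough illustration of the vertical split (not the actual Elasticsearch driver code), the sketch below runs a "source" stage and an "aggregation" stage on two threads connected by a bounded queue, so grouping can overlap with reading while rows still arrive in order. Everything here is hypothetical: the class and method names, the queue standing in for the exchange between drivers, and the synthetic rows of (tsid, timestamp).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical sketch: two pipeline stages on two threads, linked by a bounded queue. */
class TwoDriverSketch {
    // Sentinel page telling the consumer that the source has finished.
    private static final List<long[]> END = new ArrayList<>();

    /** Runs both stages concurrently and returns the row count per "tsid@bucket" key. */
    static Map<String, Integer> run(int tsids, int rowsPerTsid, int pageSize, long bucketMillis) {
        // Bounded queue: the source blocks when the aggregation stage falls behind.
        BlockingQueue<List<long[]>> exchange = new ArrayBlockingQueue<>(4);

        // Stage 1 (assumed stand-in for the source operator): emits pages of
        // synthetic rows {tsid, timestampMillis}, ordered by tsid then timestamp.
        Thread source = new Thread(() -> {
            try {
                List<long[]> page = new ArrayList<>();
                for (long tsid = 0; tsid < tsids; tsid++) {
                    for (long i = 0; i < rowsPerTsid; i++) {
                        page.add(new long[] { tsid, i * 1000L });
                        if (page.size() == pageSize) {
                            exchange.put(page);
                            page = new ArrayList<>();
                        }
                    }
                }
                if (!page.isEmpty()) {
                    exchange.put(page);
                }
                exchange.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Stage 2 (assumed stand-in for the aggregation operator): groups rows by
        // (tsid, time bucket) while the source keeps producing pages.
        Map<String, Integer> counts = new TreeMap<>();
        Thread aggregation = new Thread(() -> {
            try {
                for (List<long[]> page = exchange.take(); page != END; page = exchange.take()) {
                    for (long[] row : page) {
                        long bucket = row[1] - row[1] % bucketMillis; // truncate to bucket start
                        counts.merge(row[0] + "@" + bucket, 1, Integer::sum);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        source.start();
        aggregation.start();
        try {
            source.join();
            aggregation.join(); // join makes the aggregation thread's writes visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for stages", e);
        }
        return counts;
    }
}
```

Because a single producer feeds a single consumer through a FIFO queue, the consumer sees pages in the order they were emitted, which is the property rate aggregation depends on. The trade-off raised in the discussion still applies: this uses at most one thread per stage, so it does not scale with core count the way partitioning by tsid or time interval could.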