Increase concurrency for TS command #128419

Merged · 3 commits · May 26, 2025

@@ -0,0 +1,104 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.compute.operator.exchange;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.IsBlockedResult;

import java.util.concurrent.atomic.AtomicInteger;

public final class DirectExchange {
    private final ExchangeBuffer buffer;
    private final AtomicInteger pendingSinks = new AtomicInteger(0);
    private final AtomicInteger pendingSources = new AtomicInteger(0);

    public DirectExchange(int bufferSize) {
        this.buffer = new ExchangeBuffer(bufferSize);
    }

    public ExchangeSource exchangeSource() {
        return new DirectExchangeSource();
    }

    final class DirectExchangeSource implements ExchangeSource {
        private boolean finished = false;

        DirectExchangeSource() {
            pendingSources.incrementAndGet();
        }

        @Override
        public Page pollPage() {
            return buffer.pollPage();
        }

        @Override
        public void finish() {
            finished = true;
            if (pendingSources.decrementAndGet() == 0) {
                buffer.finish(true);
            }
        }

        @Override
        public boolean isFinished() {
            return finished || buffer.isFinished();
        }

        @Override
        public int bufferSize() {
            return buffer.size();
        }

        @Override
        public IsBlockedResult waitForReading() {
            return buffer.waitForReading();
        }
    }

    public ExchangeSink exchangeSink() {
        return new DirectExchangeSink();
    }

    final class DirectExchangeSink implements ExchangeSink {
        private boolean finished = false;

        DirectExchangeSink() {
            pendingSinks.incrementAndGet();
        }

        @Override
        public void addPage(Page page) {
            buffer.addPage(page);
        }

        @Override
        public void finish() {
            finished = true;
            if (pendingSinks.decrementAndGet() == 0) {
                buffer.finish(false);
            }
        }

        @Override
        public boolean isFinished() {
            return finished || buffer.isFinished();
        }

        @Override
        public void addCompletionListener(ActionListener<Void> listener) {
            buffer.addCompletionListener(listener);
        }

        @Override
        public IsBlockedResult waitForWriting() {
            return buffer.waitForWriting();
        }
    }
}
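
For orientation, a minimal usage sketch of the new DirectExchange (not part of the diff), using only the methods shown above. In the planner the sink and source sides run in separate drivers and coordinate through waitForWriting()/waitForReading(); this sketch skips that backpressure handling and simply copies a small batch of pages through the shared buffer. The helper name copyThrough and the java.util.function.Consumer parameter are illustrative, not from the PR.

import java.util.function.Consumer;

static void copyThrough(Iterable<Page> input, Consumer<Page> output) {
    DirectExchange exchange = new DirectExchange(10);     // buffer up to 10 pages
    ExchangeSink sink = exchange.exchangeSink();          // registers a pending sink
    ExchangeSource source = exchange.exchangeSource();    // registers a pending source
    for (Page page : input) {
        sink.addPage(page);                               // producer side: push pages into the shared buffer
    }
    sink.finish();                                        // once all sinks have finished, DirectExchangeSink calls buffer.finish(false)
    Page page;
    while ((page = source.pollPage()) != null) {
        output.accept(page);                              // consumer side: drain the buffered pages
    }
    source.finish();                                      // once all sources have finished, DirectExchangeSource calls buffer.finish(true)
}
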
@@ -11,6 +11,8 @@
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.Rounding;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -729,4 +731,29 @@ public void testIndexMode() {
});
assertThat(failure.getMessage(), containsString("Unknown index [hosts-old]"));
}

    public void testProfile() {
        EsqlQueryRequest request = new EsqlQueryRequest();
        request.profile(true);
        request.query("TS hosts | STATS sum(rate(request_count)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster");
        try (var resp = run(request)) {
            EsqlQueryResponse.Profile profile = resp.profile();
            List<DriverProfile> dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList();
            int totalTimeSeries = 0;
            for (DriverProfile p : dataProfiles) {
                if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesSourceOperator.Status)) {
                    totalTimeSeries++;
                    assertThat(p.operators(), hasSize(2));
                    assertThat(p.operators().get(1).operator(), equalTo("ExchangeSinkOperator"));
                } else {
                    assertThat(p.operators(), hasSize(4));
                    assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));
                    assertThat(p.operators().get(1).operator(), containsString("EvalOperator[evaluator=DateTruncDatetimeEvaluator"));
                    assertThat(p.operators().get(2).operator(), containsString("TimeSeriesAggregationOperator"));
                    assertThat(p.operators().get(3).operator(), equalTo("ExchangeSinkOperator"));
                }
            }
            assertThat(totalTimeSeries, equalTo(dataProfiles.size() / 2));
        }
    }
}
@@ -16,6 +16,7 @@
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.ParallelExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
@@ -78,7 +79,7 @@ private static boolean stopPushDownExtract(PhysicalPlan p) {
return p instanceof FilterExec || p instanceof TopNExec || p instanceof LimitExec;
}

private TimeSeriesSourceExec addFieldExtract(
private PhysicalPlan addFieldExtract(
LocalPhysicalOptimizerContext context,
EsQueryExec query,
boolean keepDocAttribute,
@@ -96,7 +97,7 @@ private TimeSeriesSourceExec addFieldExtract(
if (keepDocAttribute == false) {
attrs = attrs.stream().filter(a -> EsQueryExec.isSourceAttribute(a) == false).toList();
}
return new TimeSeriesSourceExec(
var tsSource = new TimeSeriesSourceExec(
query.source(),
attrs,
query.query(),
@@ -107,5 +108,8 @@
attributesToExtract,
query.estimatedRowSize()
);
// Use a separate driver for the time-series source to split the pipeline and increase parallelism,
// since the time-series source must be executed by a single driver at the shard level.
return new ParallelExec(query.source(), tsSource);
}
}
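
For orientation (reconstructed from the rule above and from the planner and test changes later in this diff, so treat the exact shape as indicative rather than definitive): the data-node plan for a TS query now wraps the time-series source in a ParallelExec,

    TimeSeriesAggregateExec (partial)
        ParallelExec
            TimeSeriesSourceExec

and the local planner splits execution at the ParallelExec into two drivers per shard, wired through a DirectExchange: one single-threaded driver running the time-series source plus an exchange sink, and a second driver running an exchange source, the date-trunc eval, the partial time-series aggregation, and the outgoing exchange sink, matching the operator lists asserted in testProfile above.
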
@@ -0,0 +1,44 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the Elastic License
 * 2.0; you may not use this file except in compliance with the Elastic License
 * 2.0.
 */

package org.elasticsearch.xpack.esql.plan.physical;

import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
import org.elasticsearch.xpack.esql.core.tree.Source;

import java.io.IOException;

/**
 * A physical plan node that hints that the plan should be partitioned vertically and executed in parallel.
 */
public final class ParallelExec extends UnaryExec {

    public ParallelExec(Source source, PhysicalPlan child) {
        super(source, child);
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        throw new UnsupportedOperationException("local plan");
    }

    @Override
    public String getWriteableName() {
        throw new UnsupportedOperationException("local plan");
    }

    @Override
    protected NodeInfo<? extends ParallelExec> info() {
        return NodeInfo.create(this, ParallelExec::new, child());
    }

    @Override
    public ParallelExec replaceChild(PhysicalPlan newChild) {
        return new ParallelExec(source(), newChild);
    }
}
@@ -260,8 +260,7 @@ public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, Loca
);
Layout.Builder layout = new Layout.Builder();
layout.append(ts.output());
int instanceCount = Math.max(1, luceneFactory.taskConcurrency());
context.driverParallelism(new DriverParallelism(DriverParallelism.Type.DATA_PARALLELISM, instanceCount));
context.driverParallelism(DriverParallelism.SINGLE);
return PhysicalOperation.fromSource(luceneFactory, layout.build());
}

@@ -46,6 +46,7 @@
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.compute.operator.SourceOperator.SourceOperatorFactory;
import org.elasticsearch.compute.operator.StringExtractOperator;
import org.elasticsearch.compute.operator.exchange.DirectExchange;
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator.ExchangeSinkOperatorFactory;
import org.elasticsearch.compute.operator.exchange.ExchangeSource;
@@ -107,6 +108,7 @@
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
import org.elasticsearch.xpack.esql.plan.physical.ParallelExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec;
@@ -195,6 +197,7 @@ public LocalExecutionPlanner(
*/
public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) {
var context = new LocalExecutionPlannerContext(
description,
new ArrayList<>(),
new Holder<>(DriverParallelism.SINGLE),
configuration.pragmas(),
@@ -272,9 +275,11 @@ else if (node instanceof EsQueryExec esQuery) {
} else if (node instanceof ShowExec show) {
return planShow(show);
} else if (node instanceof ExchangeSourceExec exchangeSource) {
return planExchangeSource(exchangeSource, context);
return planExchangeSource(exchangeSource, exchangeSourceSupplier);
} else if (node instanceof ParallelExec parallelExec) {
return planParallelNode(parallelExec, context);
} else if (node instanceof TimeSeriesSourceExec ts) {
return planTimeSeriesNode(ts, context);
return planTimeSeriesSource(ts, context);
}
// lookups and joins
else if (node instanceof EnrichExec enrich) {
@@ -332,8 +337,8 @@ private PhysicalOperation planEsQueryNode(EsQueryExec esQueryExec, LocalExecutio
return physicalOperationProviders.sourcePhysicalOperation(esQueryExec, context);
}

private PhysicalOperation planTimeSeriesNode(TimeSeriesSourceExec esQueryExec, LocalExecutionPlannerContext context) {
return physicalOperationProviders.timeSeriesSourceOperation(esQueryExec, context);
private PhysicalOperation planTimeSeriesSource(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) {
return physicalOperationProviders.timeSeriesSourceOperation(ts, context);
}

private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutionPlannerContext context) {
Expand Down Expand Up @@ -410,7 +415,7 @@ private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalE
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier), source.layout);
}

private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) {
private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, Supplier<ExchangeSource> exchangeSourceSupplier) {
Objects.requireNonNull(exchangeSourceSupplier, "ExchangeSourceHandler wasn't provided");

var builder = new Layout.Builder();
@@ -422,6 +427,33 @@ private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource,
return PhysicalOperation.fromSource(new ExchangeSourceOperatorFactory(exchangeSourceSupplier), layout);
}

    private PhysicalOperation planParallelNode(ParallelExec parallelExec, LocalExecutionPlannerContext context) {
        var exchange = new DirectExchange(context.queryPragmas.exchangeBufferSize());
        {
            PhysicalOperation source = plan(parallelExec.child(), context);
            var sinkOperator = source.withSink(new ExchangeSinkOperatorFactory(exchange::exchangeSink), source.layout);
            final TimeValue statusInterval = configuration.pragmas().statusInterval();
            context.addDriverFactory(
                new DriverFactory(
                    new DriverSupplier(
                        context.description,
                        ClusterName.CLUSTER_NAME_SETTING.get(settings).value(),
                        Node.NODE_NAME_SETTING.get(settings),
                        context.bigArrays,
                        context.blockFactory,
                        sinkOperator,
                        statusInterval,
                        settings
                    ),
                    DriverParallelism.SINGLE
                )
            );
            context.driverParallelism.set(DriverParallelism.SINGLE);
        }
        var exchangeSource = new ExchangeSourceExec(parallelExec.source(), parallelExec.output(), false);
        return planExchangeSource(exchangeSource, exchange::exchangeSource);
    }

private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerContext context) {
final Integer rowSize = topNExec.estimatedRowSize();
assert rowSize != null && rowSize > 0 : "estimated row size [" + rowSize + "] wasn't set";
@@ -923,6 +955,7 @@ enum Type {
* maintains information how many driver instances should be created for a given driver.
*/
public record LocalExecutionPlannerContext(
String description,
List<DriverFactory> driverFactories,
Holder<DriverParallelism> driverParallelism,
QueryPragmas queryPragmas,
@@ -73,6 +73,7 @@
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec;
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
import org.elasticsearch.xpack.esql.plan.physical.ParallelExec;
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec;
@@ -1864,7 +1865,8 @@ public void testPushDownFieldExtractToTimeSeriesSource() {
var timeSeriesFinalAgg = as(partialAgg.child(), TimeSeriesAggregateExec.class);
var exchange = as(timeSeriesFinalAgg.child(), ExchangeExec.class);
var timeSeriesPartialAgg = as(exchange.child(), TimeSeriesAggregateExec.class);
var timeSeriesSource = as(timeSeriesPartialAgg.child(), TimeSeriesSourceExec.class);
var parallel = as(timeSeriesPartialAgg.child(), ParallelExec.class);
var timeSeriesSource = as(parallel.child(), TimeSeriesSourceExec.class);
assertThat(timeSeriesSource.attributesToExtract(), hasSize(1));
FieldAttribute field = as(timeSeriesSource.attributesToExtract().getFirst(), FieldAttribute.class);
assertThat(field.name(), equalTo("network.total_bytes_in"));
@@ -46,6 +46,8 @@
import org.elasticsearch.xpack.esql.expression.Order;
import org.elasticsearch.xpack.esql.index.EsIndex;
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
import org.elasticsearch.xpack.esql.plan.physical.ParallelExec;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.elasticsearch.xpack.esql.session.Configuration;
@@ -61,6 +63,7 @@
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;

public class LocalExecutionPlannerTests extends MapperServiceTestCase {
@@ -204,6 +207,27 @@ public void testDriverClusterAndNodeName() throws IOException {
assertThat(supplier.nodeName(), equalTo("node-1"));
}

    public void testParallel() throws Exception {
        EsQueryExec queryExec = new EsQueryExec(
            Source.EMPTY,
            index().name(),
            IndexMode.STANDARD,
            index().indexNameWithModes(),
            List.of(),
            null,
            null,
            null,
            between(1, 1000)
        );
        var limitExec = new LimitExec(
            Source.EMPTY,
            new ParallelExec(queryExec.source(), queryExec),
            new Literal(Source.EMPTY, between(1, 100), DataType.INTEGER)
        );
        LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), limitExec);
        assertThat(plan.driverFactories, hasSize(2));
    }

private int randomEstimatedRowSize(boolean huge) {
int hugeBoundary = SourceOperator.MIN_TARGET_PAGE_SIZE * 10;
return huge ? between(hugeBoundary, Integer.MAX_VALUE) : between(1, hugeBoundary);