diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/DirectExchange.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/DirectExchange.java new file mode 100644 index 0000000000000..4909e17aabcc2 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/DirectExchange.java @@ -0,0 +1,104 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.operator.exchange; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.IsBlockedResult; + +import java.util.concurrent.atomic.AtomicInteger; + +public final class DirectExchange { + private final ExchangeBuffer buffer; + private final AtomicInteger pendingSinks = new AtomicInteger(0); + private final AtomicInteger pendingSources = new AtomicInteger(0); + + public DirectExchange(int bufferSize) { + this.buffer = new ExchangeBuffer(bufferSize); + } + + public ExchangeSource exchangeSource() { + return new DirectExchangeSource(); + } + + final class DirectExchangeSource implements ExchangeSource { + private boolean finished = false; + + DirectExchangeSource() { + pendingSources.incrementAndGet(); + } + + @Override + public Page pollPage() { + return buffer.pollPage(); + } + + @Override + public void finish() { + finished = true; + if (pendingSources.decrementAndGet() == 0) { + buffer.finish(true); + } + } + + @Override + public boolean isFinished() { + return finished || buffer.isFinished(); + } + + @Override + public int bufferSize() { + return buffer.size(); + } + + @Override + public IsBlockedResult waitForReading() { + return buffer.waitForReading(); + } + } + + public ExchangeSink exchangeSink() { + return new DirectExchangeSink(); + } + + final class DirectExchangeSink implements ExchangeSink { + private boolean finished = false; + + DirectExchangeSink() { + pendingSinks.incrementAndGet(); + } + + @Override + public void addPage(Page page) { + buffer.addPage(page); + } + + @Override + public void finish() { + finished = true; + if (pendingSinks.decrementAndGet() == 0) { + buffer.finish(false); + } + } + + @Override + public boolean isFinished() { + return finished || buffer.isFinished(); + } + + @Override + public void addCompletionListener(ActionListener listener) { + buffer.addCompletionListener(listener); + } + + @Override + public IsBlockedResult waitForWriting() { + return buffer.waitForWriting(); + } + } +} diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java index 2b326a030bc14..765ba0f2c14c7 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java @@ -11,6 +11,8 @@ import org.elasticsearch.common.Randomness; import org.elasticsearch.common.Rounding; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator; +import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.core.TimeValue; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.core.type.DataType; @@ -729,4 +731,29 @@ public void testIndexMode() { }); assertThat(failure.getMessage(), containsString("Unknown index [hosts-old]")); } + + public void testProfile() { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.profile(true); + request.query("TS hosts | STATS sum(rate(request_count)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster"); + try (var resp = run(request)) { + EsqlQueryResponse.Profile profile = resp.profile(); + List dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList(); + int totalTimeSeries = 0; + for (DriverProfile p : dataProfiles) { + if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesSourceOperator.Status)) { + totalTimeSeries++; + assertThat(p.operators(), hasSize(2)); + assertThat(p.operators().get(1).operator(), equalTo("ExchangeSinkOperator")); + } else { + assertThat(p.operators(), hasSize(4)); + assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator")); + assertThat(p.operators().get(1).operator(), containsString("EvalOperator[evaluator=DateTruncDatetimeEvaluator")); + assertThat(p.operators().get(2).operator(), containsString("TimeSeriesAggregationOperator")); + assertThat(p.operators().get(3).operator(), equalTo("ExchangeSinkOperator")); + } + } + assertThat(totalTimeSeries, equalTo(dataProfiles.size() / 2)); + } + } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java index ced194dc6a1b0..f7fc10bbf6fae 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java @@ -16,6 +16,7 @@ import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec; import org.elasticsearch.xpack.esql.plan.physical.FilterExec; import org.elasticsearch.xpack.esql.plan.physical.LimitExec; +import org.elasticsearch.xpack.esql.plan.physical.ParallelExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec; import org.elasticsearch.xpack.esql.plan.physical.TopNExec; @@ -78,7 +79,7 @@ private static boolean stopPushDownExtract(PhysicalPlan p) { return p instanceof FilterExec || p instanceof TopNExec || p instanceof LimitExec; } - private TimeSeriesSourceExec addFieldExtract( + private PhysicalPlan addFieldExtract( LocalPhysicalOptimizerContext context, EsQueryExec query, boolean keepDocAttribute, @@ -96,7 +97,7 @@ private TimeSeriesSourceExec addFieldExtract( if (keepDocAttribute == false) { attrs = attrs.stream().filter(a -> EsQueryExec.isSourceAttribute(a) == false).toList(); } - return new TimeSeriesSourceExec( + var tsSource = new TimeSeriesSourceExec( query.source(), attrs, query.query(), @@ -107,5 +108,8 @@ private TimeSeriesSourceExec addFieldExtract( attributesToExtract, query.estimatedRowSize() ); + // Use a separate driver for the time-series source to split the pipeline to increase parallelism, + // since the time-series source must be executed with a single driver at the shard level. + return new ParallelExec(query.source(), tsSource); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ParallelExec.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ParallelExec.java new file mode 100644 index 0000000000000..875bc825271fd --- /dev/null +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/physical/ParallelExec.java @@ -0,0 +1,44 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.esql.plan.physical; + +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.xpack.esql.core.tree.NodeInfo; +import org.elasticsearch.xpack.esql.core.tree.Source; + +import java.io.IOException; + +/** + * A physical plan node that hints the plan should be partitioned vertically and executed in parallel. + */ +public final class ParallelExec extends UnaryExec { + + public ParallelExec(Source source, PhysicalPlan child) { + super(source, child); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + throw new UnsupportedOperationException("local plan"); + } + + @Override + public String getWriteableName() { + throw new UnsupportedOperationException("local plan"); + } + + @Override + protected NodeInfo info() { + return NodeInfo.create(this, ParallelExec::new, child()); + } + + @Override + public ParallelExec replaceChild(PhysicalPlan newChild) { + return new ParallelExec(source(), newChild); + } +} diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java index 638d1197006fd..ac91890bff914 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java @@ -260,8 +260,7 @@ public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, Loca ); Layout.Builder layout = new Layout.Builder(); layout.append(ts.output()); - int instanceCount = Math.max(1, luceneFactory.taskConcurrency()); - context.driverParallelism(new DriverParallelism(DriverParallelism.Type.DATA_PARALLELISM, instanceCount)); + context.driverParallelism(DriverParallelism.SINGLE); return PhysicalOperation.fromSource(luceneFactory, layout.build()); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index c89327e6a6aa4..49252799c321b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -46,6 +46,7 @@ import org.elasticsearch.compute.operator.SourceOperator; import org.elasticsearch.compute.operator.SourceOperator.SourceOperatorFactory; import org.elasticsearch.compute.operator.StringExtractOperator; +import org.elasticsearch.compute.operator.exchange.DirectExchange; import org.elasticsearch.compute.operator.exchange.ExchangeSink; import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator.ExchangeSinkOperatorFactory; import org.elasticsearch.compute.operator.exchange.ExchangeSource; @@ -107,6 +108,7 @@ import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec; import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec; import org.elasticsearch.xpack.esql.plan.physical.OutputExec; +import org.elasticsearch.xpack.esql.plan.physical.ParallelExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec; @@ -195,6 +197,7 @@ public LocalExecutionPlanner( */ public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) { var context = new LocalExecutionPlannerContext( + description, new ArrayList<>(), new Holder<>(DriverParallelism.SINGLE), configuration.pragmas(), @@ -272,9 +275,11 @@ else if (node instanceof EsQueryExec esQuery) { } else if (node instanceof ShowExec show) { return planShow(show); } else if (node instanceof ExchangeSourceExec exchangeSource) { - return planExchangeSource(exchangeSource, context); + return planExchangeSource(exchangeSource, exchangeSourceSupplier); + } else if (node instanceof ParallelExec parallelExec) { + return planParallelNode(parallelExec, context); } else if (node instanceof TimeSeriesSourceExec ts) { - return planTimeSeriesNode(ts, context); + return planTimeSeriesSource(ts, context); } // lookups and joins else if (node instanceof EnrichExec enrich) { @@ -332,8 +337,8 @@ private PhysicalOperation planEsQueryNode(EsQueryExec esQueryExec, LocalExecutio return physicalOperationProviders.sourcePhysicalOperation(esQueryExec, context); } - private PhysicalOperation planTimeSeriesNode(TimeSeriesSourceExec esQueryExec, LocalExecutionPlannerContext context) { - return physicalOperationProviders.timeSeriesSourceOperation(esQueryExec, context); + private PhysicalOperation planTimeSeriesSource(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) { + return physicalOperationProviders.timeSeriesSourceOperation(ts, context); } private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutionPlannerContext context) { @@ -410,7 +415,7 @@ private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalE return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier), source.layout); } - private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) { + private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, Supplier exchangeSourceSupplier) { Objects.requireNonNull(exchangeSourceSupplier, "ExchangeSourceHandler wasn't provided"); var builder = new Layout.Builder(); @@ -422,6 +427,33 @@ private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, return PhysicalOperation.fromSource(new ExchangeSourceOperatorFactory(exchangeSourceSupplier), layout); } + private PhysicalOperation planParallelNode(ParallelExec parallelExec, LocalExecutionPlannerContext context) { + var exchange = new DirectExchange(context.queryPragmas.exchangeBufferSize()); + { + PhysicalOperation source = plan(parallelExec.child(), context); + var sinkOperator = source.withSink(new ExchangeSinkOperatorFactory(exchange::exchangeSink), source.layout); + final TimeValue statusInterval = configuration.pragmas().statusInterval(); + context.addDriverFactory( + new DriverFactory( + new DriverSupplier( + context.description, + ClusterName.CLUSTER_NAME_SETTING.get(settings).value(), + Node.NODE_NAME_SETTING.get(settings), + context.bigArrays, + context.blockFactory, + sinkOperator, + statusInterval, + settings + ), + DriverParallelism.SINGLE + ) + ); + context.driverParallelism.set(DriverParallelism.SINGLE); + } + var exchangeSource = new ExchangeSourceExec(parallelExec.source(), parallelExec.output(), false); + return planExchangeSource(exchangeSource, exchange::exchangeSource); + } + private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerContext context) { final Integer rowSize = topNExec.estimatedRowSize(); assert rowSize != null && rowSize > 0 : "estimated row size [" + rowSize + "] wasn't set"; @@ -923,6 +955,7 @@ enum Type { * maintains information how many driver instances should be created for a given driver. */ public record LocalExecutionPlannerContext( + String description, List driverFactories, Holder driverParallelism, QueryPragmas queryPragmas, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java index 0c2972d6d4c3e..1ec35671f962b 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/LocalPhysicalPlanOptimizerTests.java @@ -73,6 +73,7 @@ import org.elasticsearch.xpack.esql.plan.physical.LimitExec; import org.elasticsearch.xpack.esql.plan.physical.LocalSourceExec; import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec; +import org.elasticsearch.xpack.esql.plan.physical.ParallelExec; import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan; import org.elasticsearch.xpack.esql.plan.physical.ProjectExec; import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesAggregateExec; @@ -1864,7 +1865,8 @@ public void testPushDownFieldExtractToTimeSeriesSource() { var timeSeriesFinalAgg = as(partialAgg.child(), TimeSeriesAggregateExec.class); var exchange = as(timeSeriesFinalAgg.child(), ExchangeExec.class); var timeSeriesPartialAgg = as(exchange.child(), TimeSeriesAggregateExec.class); - var timeSeriesSource = as(timeSeriesPartialAgg.child(), TimeSeriesSourceExec.class); + var parallel = as(timeSeriesPartialAgg.child(), ParallelExec.class); + var timeSeriesSource = as(parallel.child(), TimeSeriesSourceExec.class); assertThat(timeSeriesSource.attributesToExtract(), hasSize(1)); FieldAttribute field = as(timeSeriesSource.attributesToExtract().getFirst(), FieldAttribute.class); assertThat(field.name(), equalTo("network.total_bytes_in")); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index d556ee68d4033..db1f18a0e6cd5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -46,6 +46,8 @@ import org.elasticsearch.xpack.esql.expression.Order; import org.elasticsearch.xpack.esql.index.EsIndex; import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec; +import org.elasticsearch.xpack.esql.plan.physical.LimitExec; +import org.elasticsearch.xpack.esql.plan.physical.ParallelExec; import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import org.elasticsearch.xpack.esql.session.Configuration; @@ -61,6 +63,7 @@ import java.util.Map; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.lessThanOrEqualTo; public class LocalExecutionPlannerTests extends MapperServiceTestCase { @@ -204,6 +207,27 @@ public void testDriverClusterAndNodeName() throws IOException { assertThat(supplier.nodeName(), equalTo("node-1")); } + public void testParallel() throws Exception { + EsQueryExec queryExec = new EsQueryExec( + Source.EMPTY, + index().name(), + IndexMode.STANDARD, + index().indexNameWithModes(), + List.of(), + null, + null, + null, + between(1, 1000) + ); + var limitExec = new LimitExec( + Source.EMPTY, + new ParallelExec(queryExec.source(), queryExec), + new Literal(Source.EMPTY, between(1, 100), DataType.INTEGER) + ); + LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), limitExec); + assertThat(plan.driverFactories, hasSize(2)); + } + private int randomEstimatedRowSize(boolean huge) { int hugeBoundary = SourceOperator.MIN_TARGET_PAGE_SIZE * 10; return huge ? between(hugeBoundary, Integer.MAX_VALUE) : between(1, hugeBoundary);