Skip to content

Commit e09fd48

Browse files
committed
Increase concurrency for TS command
1 parent 80a41a7 commit e09fd48

File tree

7 files changed

+245
-9
lines changed

7 files changed

+245
-9
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.compute.operator.exchange;
9+
10+
import org.elasticsearch.action.ActionListener;
11+
import org.elasticsearch.compute.data.Page;
12+
import org.elasticsearch.compute.operator.IsBlockedResult;
13+
14+
import java.util.concurrent.atomic.AtomicInteger;
15+
16+
public final class DirectExchange {
17+
private final ExchangeBuffer buffer;
18+
private final AtomicInteger pendingSinks = new AtomicInteger(0);
19+
private final AtomicInteger pendingSources = new AtomicInteger(0);
20+
21+
public DirectExchange(int bufferSize) {
22+
this.buffer = new ExchangeBuffer(bufferSize);
23+
}
24+
25+
public ExchangeSource exchangeSource() {
26+
return new DirectExchangeSource();
27+
}
28+
29+
final class DirectExchangeSource implements ExchangeSource {
30+
private boolean finished = false;
31+
32+
DirectExchangeSource() {
33+
pendingSources.incrementAndGet();
34+
}
35+
36+
@Override
37+
public Page pollPage() {
38+
return buffer.pollPage();
39+
}
40+
41+
@Override
42+
public void finish() {
43+
finished = true;
44+
if (pendingSources.decrementAndGet() == 0) {
45+
buffer.finish(true);
46+
}
47+
}
48+
49+
@Override
50+
public boolean isFinished() {
51+
return finished || buffer.isFinished();
52+
}
53+
54+
@Override
55+
public int bufferSize() {
56+
return buffer.size();
57+
}
58+
59+
@Override
60+
public IsBlockedResult waitForReading() {
61+
return buffer.waitForReading();
62+
}
63+
}
64+
65+
public ExchangeSink exchangeSink() {
66+
return new DirectExchangeSink();
67+
}
68+
69+
final class DirectExchangeSink implements ExchangeSink {
70+
private boolean finished = false;
71+
72+
DirectExchangeSink() {
73+
pendingSinks.incrementAndGet();
74+
}
75+
76+
@Override
77+
public void addPage(Page page) {
78+
buffer.addPage(page);
79+
}
80+
81+
@Override
82+
public void finish() {
83+
finished = true;
84+
if (pendingSinks.decrementAndGet() == 0) {
85+
buffer.finish(false);
86+
}
87+
}
88+
89+
@Override
90+
public boolean isFinished() {
91+
return finished || buffer.isFinished();
92+
}
93+
94+
@Override
95+
public void addCompletionListener(ActionListener<Void> listener) {
96+
buffer.addCompletionListener(listener);
97+
}
98+
99+
@Override
100+
public IsBlockedResult waitForWriting() {
101+
return buffer.waitForWriting();
102+
}
103+
}
104+
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.elasticsearch.common.Randomness;
1212
import org.elasticsearch.common.Rounding;
1313
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;
15+
import org.elasticsearch.compute.operator.DriverProfile;
1416
import org.elasticsearch.core.TimeValue;
1517
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1618
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -729,4 +731,29 @@ public void testIndexMode() {
729731
});
730732
assertThat(failure.getMessage(), containsString("Unknown index [hosts-old]"));
731733
}
734+
735+
public void testProfile() {
736+
EsqlQueryRequest request = new EsqlQueryRequest();
737+
request.profile(true);
738+
request.query("TS hosts | STATS sum(rate(request_count)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster");
739+
try (var resp = run(request)) {
740+
EsqlQueryResponse.Profile profile = resp.profile();
741+
List<DriverProfile> dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList();
742+
int totalTimeSeries = 0;
743+
for (DriverProfile p : dataProfiles) {
744+
if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesSourceOperator.Status)) {
745+
totalTimeSeries++;
746+
assertThat(p.operators(), hasSize(2));
747+
assertThat(p.operators().get(1).operator(), equalTo("ExchangeSinkOperator"));
748+
} else {
749+
assertThat(p.operators(), hasSize(4));
750+
assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));
751+
assertThat(p.operators().get(1).operator(), containsString("EvalOperator[evaluator=DateTruncDatetimeEvaluator"));
752+
assertThat(p.operators().get(2).operator(), containsString("TimeSeriesAggregationOperator"));
753+
assertThat(p.operators().get(3).operator(), equalTo("ExchangeSinkOperator"));
754+
}
755+
}
756+
assertThat(totalTimeSeries, equalTo(dataProfiles.size() / 2));
757+
}
758+
}
732759
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/optimizer/rules/physical/local/PushDownFieldExtractionToTimeSeriesSource.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import org.elasticsearch.xpack.esql.plan.physical.FieldExtractExec;
1717
import org.elasticsearch.xpack.esql.plan.physical.FilterExec;
1818
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
19+
import org.elasticsearch.xpack.esql.plan.physical.ParallelExec;
1920
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
2021
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
2122
import org.elasticsearch.xpack.esql.plan.physical.TopNExec;
@@ -78,7 +79,7 @@ private static boolean stopPushDownExtract(PhysicalPlan p) {
7879
return p instanceof FilterExec || p instanceof TopNExec || p instanceof LimitExec;
7980
}
8081

81-
private TimeSeriesSourceExec addFieldExtract(
82+
private PhysicalPlan addFieldExtract(
8283
LocalPhysicalOptimizerContext context,
8384
EsQueryExec query,
8485
boolean keepDocAttribute,
@@ -96,7 +97,7 @@ private TimeSeriesSourceExec addFieldExtract(
9697
if (keepDocAttribute == false) {
9798
attrs = attrs.stream().filter(a -> EsQueryExec.isSourceAttribute(a) == false).toList();
9899
}
99-
return new TimeSeriesSourceExec(
100+
var tsSource = new TimeSeriesSourceExec(
100101
query.source(),
101102
attrs,
102103
query.query(),
@@ -107,5 +108,8 @@ private TimeSeriesSourceExec addFieldExtract(
107108
attributesToExtract,
108109
query.estimatedRowSize()
109110
);
111+
// Run the time-series source in its own driver. The source must be executed by a single driver at the shard level,
112+
// so splitting the pipeline here lets the rest of the plan run with more parallelism.
113+
return new ParallelExec(query.source(), tsSource);
110114
}
111115
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.plan.physical;
9+
10+
import org.elasticsearch.common.io.stream.StreamOutput;
11+
import org.elasticsearch.xpack.esql.core.tree.NodeInfo;
12+
import org.elasticsearch.xpack.esql.core.tree.Source;
13+
14+
import java.io.IOException;
15+
16+
/**
17+
* A physical plan node that hints the plan should be partitioned vertically and executed in parallel.
18+
*/
19+
public final class ParallelExec extends UnaryExec {
20+
21+
public ParallelExec(Source source, PhysicalPlan child) {
22+
super(source, child);
23+
}
24+
25+
@Override
26+
public void writeTo(StreamOutput out) throws IOException {
27+
throw new UnsupportedOperationException("local plan");
28+
}
29+
30+
@Override
31+
public String getWriteableName() {
32+
throw new UnsupportedOperationException("local plan");
33+
}
34+
35+
@Override
36+
protected NodeInfo<? extends ParallelExec> info() {
37+
return NodeInfo.create(this, ParallelExec::new, child());
38+
}
39+
40+
@Override
41+
public ParallelExec replaceChild(PhysicalPlan newChild) {
42+
return new ParallelExec(source(), newChild);
43+
}
44+
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/EsPhysicalOperationProviders.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,7 @@ public PhysicalOperation timeSeriesSourceOperation(TimeSeriesSourceExec ts, Loca
260260
);
261261
Layout.Builder layout = new Layout.Builder();
262262
layout.append(ts.output());
263-
int instanceCount = Math.max(1, luceneFactory.taskConcurrency());
264-
context.driverParallelism(new DriverParallelism(DriverParallelism.Type.DATA_PARALLELISM, instanceCount));
263+
context.driverParallelism(DriverParallelism.SINGLE);
265264
return PhysicalOperation.fromSource(luceneFactory, layout.build());
266265
}
267266

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.compute.operator.SourceOperator;
4747
import org.elasticsearch.compute.operator.SourceOperator.SourceOperatorFactory;
4848
import org.elasticsearch.compute.operator.StringExtractOperator;
49+
import org.elasticsearch.compute.operator.exchange.DirectExchange;
4950
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
5051
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator.ExchangeSinkOperatorFactory;
5152
import org.elasticsearch.compute.operator.exchange.ExchangeSource;
@@ -107,6 +108,7 @@
107108
import org.elasticsearch.xpack.esql.plan.physical.LookupJoinExec;
108109
import org.elasticsearch.xpack.esql.plan.physical.MvExpandExec;
109110
import org.elasticsearch.xpack.esql.plan.physical.OutputExec;
111+
import org.elasticsearch.xpack.esql.plan.physical.ParallelExec;
110112
import org.elasticsearch.xpack.esql.plan.physical.PhysicalPlan;
111113
import org.elasticsearch.xpack.esql.plan.physical.ProjectExec;
112114
import org.elasticsearch.xpack.esql.plan.physical.RrfScoreEvalExec;
@@ -195,6 +197,7 @@ public LocalExecutionPlanner(
195197
*/
196198
public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) {
197199
var context = new LocalExecutionPlannerContext(
200+
description,
198201
new ArrayList<>(),
199202
new Holder<>(DriverParallelism.SINGLE),
200203
configuration.pragmas(),
@@ -272,9 +275,11 @@ else if (node instanceof EsQueryExec esQuery) {
272275
} else if (node instanceof ShowExec show) {
273276
return planShow(show);
274277
} else if (node instanceof ExchangeSourceExec exchangeSource) {
275-
return planExchangeSource(exchangeSource, context);
278+
return planExchangeSource(exchangeSource, exchangeSourceSupplier);
279+
} else if (node instanceof ParallelExec parallelExec) {
280+
return planParallelNode(parallelExec, context);
276281
} else if (node instanceof TimeSeriesSourceExec ts) {
277-
return planTimeSeriesNode(ts, context);
282+
return planTimeSeriesSource(ts, context);
278283
}
279284
// lookups and joins
280285
else if (node instanceof EnrichExec enrich) {
@@ -332,8 +337,8 @@ private PhysicalOperation planEsQueryNode(EsQueryExec esQueryExec, LocalExecutio
332337
return physicalOperationProviders.sourcePhysicalOperation(esQueryExec, context);
333338
}
334339

335-
private PhysicalOperation planTimeSeriesNode(TimeSeriesSourceExec esQueryExec, LocalExecutionPlannerContext context) {
336-
return physicalOperationProviders.timeSeriesSourceOperation(esQueryExec, context);
340+
private PhysicalOperation planTimeSeriesSource(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) {
341+
return physicalOperationProviders.timeSeriesSourceOperation(ts, context);
337342
}
338343

339344
private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutionPlannerContext context) {
@@ -410,7 +415,7 @@ private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalE
410415
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier), source.layout);
411416
}
412417

413-
private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) {
418+
private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, Supplier<ExchangeSource> exchangeSourceSupplier) {
414419
Objects.requireNonNull(exchangeSourceSupplier, "ExchangeSourceHandler wasn't provided");
415420

416421
var builder = new Layout.Builder();
@@ -422,6 +427,33 @@ private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource,
422427
return PhysicalOperation.fromSource(new ExchangeSourceOperatorFactory(exchangeSourceSupplier), layout);
423428
}
424429

430+
private PhysicalOperation planParallelNode(ParallelExec parallelExec, LocalExecutionPlannerContext context) {
431+
var exchange = new DirectExchange(context.queryPragmas.exchangeBufferSize());
432+
{
433+
PhysicalOperation source = plan(parallelExec.child(), context);
434+
var sinkOperator = source.withSink(new ExchangeSinkOperatorFactory(exchange::exchangeSink), source.layout);
435+
final TimeValue statusInterval = configuration.pragmas().statusInterval();
436+
context.addDriverFactory(
437+
new DriverFactory(
438+
new DriverSupplier(
439+
context.description,
440+
ClusterName.CLUSTER_NAME_SETTING.get(settings).value(),
441+
Node.NODE_NAME_SETTING.get(settings),
442+
context.bigArrays,
443+
context.blockFactory,
444+
sinkOperator,
445+
statusInterval,
446+
settings
447+
),
448+
DriverParallelism.SINGLE
449+
)
450+
);
451+
context.driverParallelism.set(DriverParallelism.SINGLE);
452+
}
453+
var exchangeSource = new ExchangeSourceExec(parallelExec.source(), parallelExec.output(), false);
454+
return planExchangeSource(exchangeSource, exchange::exchangeSource);
455+
}
456+
425457
private PhysicalOperation planTopN(TopNExec topNExec, LocalExecutionPlannerContext context) {
426458
final Integer rowSize = topNExec.estimatedRowSize();
427459
assert rowSize != null && rowSize > 0 : "estimated row size [" + rowSize + "] wasn't set";
@@ -923,6 +955,7 @@ enum Type {
923955
* maintains information how many driver instances should be created for a given driver.
924956
*/
925957
public record LocalExecutionPlannerContext(
958+
String description,
926959
List<DriverFactory> driverFactories,
927960
Holder<DriverParallelism> driverParallelism,
928961
QueryPragmas queryPragmas,

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030
import org.elasticsearch.core.Releasables;
3131
import org.elasticsearch.index.IndexMode;
3232
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
33+
import org.elasticsearch.index.mapper.MappedFieldType;
3334
import org.elasticsearch.index.mapper.MapperServiceTestCase;
35+
import org.elasticsearch.index.query.MatchAllQueryBuilder;
3436
import org.elasticsearch.node.Node;
3537
import org.elasticsearch.plugins.ExtensiblePlugin;
3638
import org.elasticsearch.plugins.Plugin;
@@ -39,13 +41,16 @@
3941
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
4042
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
4143
import org.elasticsearch.xpack.esql.core.expression.Literal;
44+
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
4245
import org.elasticsearch.xpack.esql.core.tree.Source;
4346
import org.elasticsearch.xpack.esql.core.type.DataType;
4447
import org.elasticsearch.xpack.esql.core.type.EsField;
4548
import org.elasticsearch.xpack.esql.core.util.StringUtils;
4649
import org.elasticsearch.xpack.esql.expression.Order;
4750
import org.elasticsearch.xpack.esql.index.EsIndex;
4851
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
52+
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
53+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
4954
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
5055
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
5156
import org.elasticsearch.xpack.esql.session.Configuration;
@@ -59,8 +64,10 @@
5964
import java.util.Collections;
6065
import java.util.List;
6166
import java.util.Map;
67+
import java.util.Set;
6268

6369
import static org.hamcrest.Matchers.equalTo;
70+
import static org.hamcrest.Matchers.hasSize;
6471
import static org.hamcrest.Matchers.lessThanOrEqualTo;
6572

6673
public class LocalExecutionPlannerTests extends MapperServiceTestCase {
@@ -204,6 +211,24 @@ public void testDriverClusterAndNodeName() throws IOException {
204211
assertThat(supplier.nodeName(), equalTo("node-1"));
205212
}
206213

214+
public void testTimeSeriesSource() throws Exception {
215+
MetadataAttribute tsid = new MetadataAttribute(Source.EMPTY, "_tsid", DataType.KEYWORD, false);
216+
var timeSeriesSource = new TimeSeriesSourceExec(
217+
Source.EMPTY,
218+
List.of(tsid),
219+
new MatchAllQueryBuilder(),
220+
new Literal(Source.EMPTY, between(1, 100), DataType.INTEGER),
221+
MappedFieldType.FieldExtractPreference.NONE,
222+
Set.of(),
223+
Set.of(),
224+
List.of(),
225+
randomEstimatedRowSize(estimatedRowSizeIsHuge)
226+
);
227+
var limitExec = new LimitExec(Source.EMPTY, timeSeriesSource, new Literal(Source.EMPTY, between(1, 100), DataType.INTEGER));
228+
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), limitExec);
229+
assertThat(plan.driverFactories, hasSize(2));
230+
}
231+
207232
private int randomEstimatedRowSize(boolean huge) {
208233
int hugeBoundary = SourceOperator.MIN_TARGET_PAGE_SIZE * 10;
209234
return huge ? between(hugeBoundary, Integer.MAX_VALUE) : between(1, hugeBoundary);

0 commit comments

Comments
 (0)