Skip to content

Commit 7f08143

Browse files
committed
Increase parallelism for TS command
1 parent 80a41a7 commit 7f08143

File tree

4 files changed

+163
-7
lines changed

4 files changed

+163
-7
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/exchange/ExchangeBuffer.java

Lines changed: 71 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717
import java.util.concurrent.ConcurrentLinkedQueue;
1818
import java.util.concurrent.atomic.AtomicInteger;
1919

20-
final class ExchangeBuffer {
20+
public final class ExchangeBuffer {
2121

2222
private final Queue<Page> queue = new ConcurrentLinkedQueue<>();
2323
// Uses a separate counter for the size so it can be updated with CAS; also, ConcurrentLinkedQueue#size is not a constant-time operation.
@@ -34,7 +34,7 @@ final class ExchangeBuffer {
3434

3535
private volatile boolean noMoreInputs = false;
3636

37-
ExchangeBuffer(int maxSize) {
37+
public ExchangeBuffer(int maxSize) {
3838
if (maxSize < 1) {
3939
throw new IllegalArgumentException("max_buffer_size must be at least one; got=" + maxSize);
4040
}
@@ -161,4 +161,73 @@ int size() {
161161
void addCompletionListener(ActionListener<Void> listener) {
162162
completionFuture.addListener(listener);
163163
}
164+
165+
/**
166+
* Creates a new {@link ExchangeSource} associated with this buffer.
167+
*/
168+
public ExchangeSource newExchangeSource() {
169+
final ExchangeBuffer buffer = this;
170+
return new ExchangeSource() {
171+
@Override
172+
public Page pollPage() {
173+
return buffer.pollPage();
174+
}
175+
176+
@Override
177+
public void finish() {
178+
buffer.finish(true);
179+
}
180+
181+
@Override
182+
public boolean isFinished() {
183+
return buffer.isFinished();
184+
}
185+
186+
@Override
187+
public int bufferSize() {
188+
return buffer.size();
189+
}
190+
191+
@Override
192+
public IsBlockedResult waitForReading() {
193+
return buffer.waitForReading();
194+
}
195+
};
196+
}
197+
198+
/**
199+
* Creates a new {@link ExchangeSink} associated with this buffer.
200+
*/
201+
public ExchangeSink newExchangeSink() {
202+
final ExchangeBuffer buffer = this;
203+
return new ExchangeSink() {
204+
private boolean finished = false;
205+
206+
@Override
207+
public void addPage(Page page) {
208+
buffer.addPage(page);
209+
}
210+
211+
@Override
212+
public void finish() {
213+
finished = true;
214+
buffer.finish(false);
215+
}
216+
217+
@Override
218+
public boolean isFinished() {
219+
return finished; // no need to wait for the buffer
220+
}
221+
222+
@Override
223+
public void addCompletionListener(ActionListener<Void> listener) {
224+
buffer.addCompletionListener(listener);
225+
}
226+
227+
@Override
228+
public IsBlockedResult waitForWriting() {
229+
return buffer.waitForWriting();
230+
}
231+
};
232+
}
164233
}

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java

Lines changed: 27 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,8 @@
1111
import org.elasticsearch.common.Randomness;
1212
import org.elasticsearch.common.Rounding;
1313
import org.elasticsearch.common.settings.Settings;
14+
import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;
15+
import org.elasticsearch.compute.operator.DriverProfile;
1416
import org.elasticsearch.core.TimeValue;
1517
import org.elasticsearch.xpack.esql.EsqlTestUtils;
1618
import org.elasticsearch.xpack.esql.core.type.DataType;
@@ -729,4 +731,29 @@ public void testIndexMode() {
729731
});
730732
assertThat(failure.getMessage(), containsString("Unknown index [hosts-old]"));
731733
}
734+
735+
public void testProfile() {
736+
EsqlQueryRequest request = new EsqlQueryRequest();
737+
request.profile(true);
738+
request.query("TS hosts | STATS sum(rate(request_count)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster");
739+
try (var resp = run(request)) {
740+
EsqlQueryResponse.Profile profile = resp.profile();
741+
List<DriverProfile> dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList();
742+
int totalTimeSeries = 0;
743+
for (DriverProfile p : dataProfiles) {
744+
if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesSourceOperator.Status)) {
745+
totalTimeSeries++;
746+
assertThat(p.operators(), hasSize(2));
747+
assertThat(p.operators().get(1).operator(), equalTo("ExchangeSinkOperator"));
748+
} else {
749+
assertThat(p.operators(), hasSize(4));
750+
assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));
751+
assertThat(p.operators().get(1).operator(), containsString("EvalOperator[evaluator=DateTruncDatetimeEvaluator"));
752+
assertThat(p.operators().get(2).operator(), containsString("TimeSeriesAggregationOperator"));
753+
assertThat(p.operators().get(3).operator(), equalTo("ExchangeSinkOperator"));
754+
}
755+
}
756+
assertThat(totalTimeSeries, equalTo(dataProfiles.size() / 2));
757+
}
758+
}
732759
}

x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java

Lines changed: 40 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -46,6 +46,7 @@
4646
import org.elasticsearch.compute.operator.SourceOperator;
4747
import org.elasticsearch.compute.operator.SourceOperator.SourceOperatorFactory;
4848
import org.elasticsearch.compute.operator.StringExtractOperator;
49+
import org.elasticsearch.compute.operator.exchange.ExchangeBuffer;
4950
import org.elasticsearch.compute.operator.exchange.ExchangeSink;
5051
import org.elasticsearch.compute.operator.exchange.ExchangeSinkOperator.ExchangeSinkOperatorFactory;
5152
import org.elasticsearch.compute.operator.exchange.ExchangeSource;
@@ -195,6 +196,7 @@ public LocalExecutionPlanner(
195196
*/
196197
public LocalExecutionPlan plan(String description, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) {
197198
var context = new LocalExecutionPlannerContext(
199+
description,
198200
new ArrayList<>(),
199201
new Holder<>(DriverParallelism.SINGLE),
200202
configuration.pragmas(),
@@ -272,9 +274,9 @@ else if (node instanceof EsQueryExec esQuery) {
272274
} else if (node instanceof ShowExec show) {
273275
return planShow(show);
274276
} else if (node instanceof ExchangeSourceExec exchangeSource) {
275-
return planExchangeSource(exchangeSource, context);
277+
return planExchangeSource(exchangeSource, exchangeSourceSupplier, context);
276278
} else if (node instanceof TimeSeriesSourceExec ts) {
277-
return planTimeSeriesNode(ts, context);
279+
return planTimeSeriesSource(ts, context);
278280
}
279281
// lookups and joins
280282
else if (node instanceof EnrichExec enrich) {
@@ -332,8 +334,36 @@ private PhysicalOperation planEsQueryNode(EsQueryExec esQueryExec, LocalExecutio
332334
return physicalOperationProviders.sourcePhysicalOperation(esQueryExec, context);
333335
}
334336

335-
private PhysicalOperation planTimeSeriesNode(TimeSeriesSourceExec esQueryExec, LocalExecutionPlannerContext context) {
336-
return physicalOperationProviders.timeSeriesSourceOperation(esQueryExec, context);
337+
private PhysicalOperation planTimeSeriesSource(TimeSeriesSourceExec ts, LocalExecutionPlannerContext context) {
338+
var exchangeBuffer = new ExchangeBuffer(configuration.pragmas().exchangeBufferSize());
339+
// Use a separate driver for the time-series source to split the pipeline to increase parallelism,
340+
// since the time-series source must be executed with a single driver at the shard level.
341+
{
342+
var sourceOperator = physicalOperationProviders.timeSeriesSourceOperation(ts, context);
343+
var sinkOperator = sourceOperator.withSink(
344+
new ExchangeSinkOperatorFactory(exchangeBuffer::newExchangeSink),
345+
sourceOperator.layout
346+
);
347+
final TimeValue statusInterval = configuration.pragmas().statusInterval();
348+
context.addDriverFactory(
349+
new DriverFactory(
350+
new DriverSupplier(
351+
context.description,
352+
ClusterName.CLUSTER_NAME_SETTING.get(settings).value(),
353+
Node.NODE_NAME_SETTING.get(settings),
354+
context.bigArrays,
355+
context.blockFactory,
356+
sinkOperator,
357+
statusInterval,
358+
settings
359+
),
360+
DriverParallelism.SINGLE
361+
)
362+
);
363+
context.driverParallelism.set(DriverParallelism.SINGLE);
364+
}
365+
var exchangeSource = new ExchangeSourceExec(ts.source(), ts.output(), false);
366+
return planExchangeSource(exchangeSource, exchangeBuffer::newExchangeSource, context);
337367
}
338368

339369
private PhysicalOperation planEsStats(EsStatsQueryExec statsQuery, LocalExecutionPlannerContext context) {
@@ -410,7 +440,11 @@ private PhysicalOperation planExchangeSink(ExchangeSinkExec exchangeSink, LocalE
410440
return source.withSink(new ExchangeSinkOperatorFactory(exchangeSinkSupplier), source.layout);
411441
}
412442

413-
private PhysicalOperation planExchangeSource(ExchangeSourceExec exchangeSource, LocalExecutionPlannerContext context) {
443+
private PhysicalOperation planExchangeSource(
444+
ExchangeSourceExec exchangeSource,
445+
Supplier<ExchangeSource> exchangeSourceSupplier,
446+
LocalExecutionPlannerContext context
447+
) {
414448
Objects.requireNonNull(exchangeSourceSupplier, "ExchangeSourceHandler wasn't provided");
415449

416450
var builder = new Layout.Builder();
@@ -923,6 +957,7 @@ enum Type {
923957
* maintains information how many driver instances should be created for a given driver.
924958
*/
925959
public record LocalExecutionPlannerContext(
960+
String description,
926961
List<DriverFactory> driverFactories,
927962
Holder<DriverParallelism> driverParallelism,
928963
QueryPragmas queryPragmas,

x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java

Lines changed: 25 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,9 @@
3030
import org.elasticsearch.core.Releasables;
3131
import org.elasticsearch.index.IndexMode;
3232
import org.elasticsearch.index.cache.query.TrivialQueryCachingPolicy;
33+
import org.elasticsearch.index.mapper.MappedFieldType;
3334
import org.elasticsearch.index.mapper.MapperServiceTestCase;
35+
import org.elasticsearch.index.query.MatchAllQueryBuilder;
3436
import org.elasticsearch.node.Node;
3537
import org.elasticsearch.plugins.ExtensiblePlugin;
3638
import org.elasticsearch.plugins.Plugin;
@@ -39,13 +41,16 @@
3941
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
4042
import org.elasticsearch.xpack.esql.core.expression.FoldContext;
4143
import org.elasticsearch.xpack.esql.core.expression.Literal;
44+
import org.elasticsearch.xpack.esql.core.expression.MetadataAttribute;
4245
import org.elasticsearch.xpack.esql.core.tree.Source;
4346
import org.elasticsearch.xpack.esql.core.type.DataType;
4447
import org.elasticsearch.xpack.esql.core.type.EsField;
4548
import org.elasticsearch.xpack.esql.core.util.StringUtils;
4649
import org.elasticsearch.xpack.esql.expression.Order;
4750
import org.elasticsearch.xpack.esql.index.EsIndex;
4851
import org.elasticsearch.xpack.esql.plan.physical.EsQueryExec;
52+
import org.elasticsearch.xpack.esql.plan.physical.LimitExec;
53+
import org.elasticsearch.xpack.esql.plan.physical.TimeSeriesSourceExec;
4954
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
5055
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
5156
import org.elasticsearch.xpack.esql.session.Configuration;
@@ -59,8 +64,10 @@
5964
import java.util.Collections;
6065
import java.util.List;
6166
import java.util.Map;
67+
import java.util.Set;
6268

6369
import static org.hamcrest.Matchers.equalTo;
70+
import static org.hamcrest.Matchers.hasSize;
6471
import static org.hamcrest.Matchers.lessThanOrEqualTo;
6572

6673
public class LocalExecutionPlannerTests extends MapperServiceTestCase {
@@ -204,6 +211,24 @@ public void testDriverClusterAndNodeName() throws IOException {
204211
assertThat(supplier.nodeName(), equalTo("node-1"));
205212
}
206213

214+
public void testTimeSeriesSource() throws Exception {
215+
MetadataAttribute tsid = new MetadataAttribute(Source.EMPTY, "_tsid", DataType.KEYWORD, false);
216+
var timeSeriesSource = new TimeSeriesSourceExec(
217+
Source.EMPTY,
218+
List.of(tsid),
219+
new MatchAllQueryBuilder(),
220+
new Literal(Source.EMPTY, between(1, 100), DataType.INTEGER),
221+
MappedFieldType.FieldExtractPreference.NONE,
222+
Set.of(),
223+
Set.of(),
224+
List.of(),
225+
randomEstimatedRowSize(estimatedRowSizeIsHuge)
226+
);
227+
var limitExec = new LimitExec(Source.EMPTY, timeSeriesSource, new Literal(Source.EMPTY, between(1, 100), DataType.INTEGER));
228+
LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan("test", FoldContext.small(), limitExec);
229+
assertThat(plan.driverFactories, hasSize(2));
230+
}
231+
207232
private int randomEstimatedRowSize(boolean huge) {
208233
int hugeBoundary = SourceOperator.MIN_TARGET_PAGE_SIZE * 10;
209234
return huge ? between(hugeBoundary, Integer.MAX_VALUE) : between(1, hugeBoundary);

0 commit comments

Comments
 (0)