Commit 0201ce6

ESQL: Speed loading stored fields (#127348) (#127721)
This speeds up loading from stored fields by opting more blocks into the "sequential" strategy. This really kicks in when loading stored fields like `text`, and when you need less than 100% of documents but more than, say, 10%. It is most useful when you need something like 99.9% of field documents. Here are the perf numbers:

```
%100.0 {"took":  403 ->  401, "documents_found": 1000000}
%099.9 {"took": 3990 ->  436, "documents_found":  999000}
%099.0 {"took": 4069 ->  440, "documents_found":  990000}
%090.0 {"took": 3468 ->  421, "documents_found":  900000}
%030.0 {"took": 1213 ->  152, "documents_found":  300000}
%020.0 {"took":  766 ->  104, "documents_found":  200000}
%010.0 {"took":  397 ->   55, "documents_found":  100000}
%009.0 {"took":  352 ->  375, "documents_found":   90000}
%008.0 {"took":  304 ->  317, "documents_found":   80000}
%007.0 {"took":  273 ->  287, "documents_found":   70000}
%005.0 {"took":  199 ->  204, "documents_found":   50000}
%001.0 {"took":   46 ->   46, "documents_found":   10000}
```

Let's explain this with an example. First, jump to `main` and load a million documents:

```
rm -f /tmp/bulk
for a in {1..1000}; do
  echo '{"index":{}}' >> /tmp/bulk
  echo '{"text":"text '$(printf %04d $a)'"}' >> /tmp/bulk
done
curl -s -uelastic:password -HContent-Type:application/json -XDELETE localhost:9200/test
for a in {1..1000}; do
  echo -n $a:
  curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_bulk?pretty --data-binary @/tmp/bulk | grep errors
done
curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_forcemerge?max_num_segments=1
curl -s -uelastic:password -HContent-Type:application/json -XPOST localhost:9200/test/_refresh
echo
```

Now query them all. Run this a few times until it's stable:

```
echo -n "%100.0 "
curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
  "query": "FROM test | STATS SUM(LENGTH(text))",
  "pragma": {
    "data_partitioning": "shard"
  }
}' | jq -c '{took, documents_found}'
```

Now fetch 99.9% of documents:

```
echo -n "%099.9 "
curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
  "query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))",
  "pragma": {
    "data_partitioning": "shard"
  }
}' | jq -c '{took, documents_found}'
```

This should spit out something like:

```
%100.0 {"took":  403, "documents_found": 1000000}
%099.9 {"took": 4098, "documents_found":  999000}
```

We're loading *fewer* documents but it's slower! What in the world?! If you dig into the profile you'll see that it's value loading:

```
$ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
  "query": "FROM test | STATS SUM(LENGTH(text))",
  "pragma": {
    "data_partitioning": "shard"
  },
  "profile": true
}' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))'
{
  "operator": "ValuesSourceReaderOperator[fields = [text]]",
  "status": {
    "readers_built": {
      "stored_fields[requires_source:true, fields:0, sequential: true]": 222,
      "text:column_at_a_time:null": 222,
      "text:row_stride:BlockSourceReader.Bytes": 1
    },
    "values_loaded": 1000000,
    "process_nanos": 370687157,
    "pages_processed": 222,
    "rows_received": 1000000,
    "rows_emitted": 1000000
  }
}
$ curl -s -uelastic:password -HContent-Type:application/json -XPOST 'localhost:9200/_query?pretty' -d'{
  "query": "FROM test | WHERE NOT text.keyword IN (\"text 0998\") | STATS SUM(LENGTH(text))",
  "pragma": {
    "data_partitioning": "shard"
  },
  "profile": true
}' | jq '.profile.drivers[].operators[] | select(.operator | contains("ValuesSourceReaderOperator"))'
{
  "operator": "ValuesSourceReaderOperator[fields = [text]]",
  "status": {
    "readers_built": {
      "stored_fields[requires_source:true, fields:0, sequential: false]": 222,
      "text:column_at_a_time:null": 222,
      "text:row_stride:BlockSourceReader.Bytes": 1
    },
    "values_loaded": 999000,
    "process_nanos": 3965803793,
    "pages_processed": 222,
    "rows_received": 999000,
    "rows_emitted": 999000
  }
}
```

It jumps from 370ms to almost four seconds, while loading fewer values! The second big difference is the `stored_fields` marker: in the second one it's `sequential: false` and in the first it's `sequential: true`. `sequential: true` uses Lucene's "merge" stored fields reader instead of the default one, and that reader is much more optimized at decoding sequences of documents.

Previously we only enabled this reader when loading compact sequences of documents - when the entire block looks like:

```
1, 2, 3, 4, 5, ... 1230, 1231
```

If there were any gaps we wouldn't enable it. That was a very conservative choice made long ago without doing any experiments. We knew it was faster without any gaps, but not otherwise. It turns out it's a lot faster in a lot more cases: I've measured it as faster even with 99% gaps, at least on simple documents. I'm a bit worried that this is too aggressive, so I've made it configurable, with the default being to use the "merge" loader with 10% gaps. So we'd use the merge loader with a block like:

```
1, 11, 21, 31, ..., 1231, 1241
```

ESQL: Fix test locale (#127566)

A test was formatting a string without `Locale.ROOT`, so sometimes the string would use the Arabic ٩ instead of 9, and JSON doesn't parse those.

Closes #127562
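The locale bug fixed by #127566 is the classic `String.format` pitfall. A minimal illustration (not the actual failing test; `LocaleFormatExample` and `safeFormat` are names invented here):

```java
import java.util.Locale;

// Minimal illustration of the Locale.ROOT pitfall (not the actual failing
// test). String.format without an explicit locale uses the JVM default;
// under an Arabic locale, CLDR-based JDKs can render digits as
// Arabic-Indic (e.g. ٩ for 9), which a JSON parser then rejects.
class LocaleFormatExample {
    static String safeFormat(int n) {
        // Locale.ROOT guarantees ASCII digits regardless of the default locale.
        return String.format(Locale.ROOT, "%04d", n);
    }
}
```

Passing `Locale.ROOT` explicitly makes the output stable no matter what locale the test JVM happens to run under.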
1 parent 740c86e commit 0201ce6

File tree

18 files changed (+397, -33 lines changed)


benchmarks/src/main/java/org/elasticsearch/benchmark/compute/operator/ValuesSourceReaderBenchmark.java

+3-1
```diff
@@ -25,6 +25,7 @@
 import org.apache.lucene.util.NumericUtils;
 import org.elasticsearch.common.breaker.NoopCircuitBreaker;
 import org.elasticsearch.common.lucene.Lucene;
+import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.compute.data.BlockFactory;
 import org.elasticsearch.compute.data.BytesRefBlock;
@@ -50,6 +51,7 @@
 import org.elasticsearch.index.mapper.MappedFieldType;
 import org.elasticsearch.index.mapper.NumberFieldMapper;
 import org.elasticsearch.search.lookup.SearchLookup;
+import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -296,7 +298,7 @@ public void benchmark() {
             fields(name),
             List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
                 throw new UnsupportedOperationException("can't load _source here");
-            })),
+            }, EsqlPlugin.STORED_FIELDS_SEQUENTIAL_PROPORTION.getDefault(Settings.EMPTY))),
             0
         );
         long sum = 0;
```

docs/changelog/127348.yaml

+5
```diff
@@ -0,0 +1,5 @@
+pr: 127348
+summary: Speed loading stored fields
+area: ES|QL
+type: enhancement
+issues: []
```

docs/reference/esql/functions/functionNamedParams/qstr.asciidoc

+1-1
Some generated files are not rendered by default.

docs/reference/esql/functions/kibana/definition/qstr.json

+2-2
Some generated files are not rendered by default.

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

+10-5
```diff
@@ -105,7 +105,7 @@ public String describe() {
      */
     public record FieldInfo(String name, ElementType type, IntFunction<BlockLoader> blockLoader) {}

-    public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader) {}
+    public record ShardContext(IndexReader reader, Supplier<SourceLoader> newSourceLoader, double storedFieldsSequentialProportion) {}

     private final FieldWork[] fields;
     private final List<ShardContext> shardContexts;
@@ -241,8 +241,9 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa
         }

         SourceLoader sourceLoader = null;
+        ShardContext shardContext = shardContexts.get(shard);
         if (storedFieldsSpec.requiresSource()) {
-            sourceLoader = shardContexts.get(shard).newSourceLoader.get();
+            sourceLoader = shardContext.newSourceLoader.get();
             storedFieldsSpec = storedFieldsSpec.merge(new StoredFieldsSpec(true, false, sourceLoader.requiredStoredFields()));
         }

@@ -255,7 +256,7 @@ private void loadFromSingleLeaf(Block[] blocks, int shard, int segment, BlockLoa
             );
         }
         StoredFieldLoader storedFieldLoader;
-        if (useSequentialStoredFieldsReader(docs)) {
+        if (useSequentialStoredFieldsReader(docs, shardContext.storedFieldsSequentialProportion())) {
             storedFieldLoader = StoredFieldLoader.fromSpecSequential(storedFieldsSpec);
             trackStoredFields(storedFieldsSpec, true);
         } else {
@@ -432,9 +433,13 @@ public void close() {
      * Is it more efficient to use a sequential stored field reader
      * when reading stored fields for the documents contained in {@code docIds}?
      */
-    private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs) {
+    private boolean useSequentialStoredFieldsReader(BlockLoader.Docs docs, double storedFieldsSequentialProportion) {
         int count = docs.count();
-        return count >= SEQUENTIAL_BOUNDARY && docs.get(count - 1) - docs.get(0) == count - 1;
+        if (count < SEQUENTIAL_BOUNDARY) {
+            return false;
+        }
+        int range = docs.get(count - 1) - docs.get(0);
+        return range * storedFieldsSequentialProportion <= count;
     }

     private void trackStoredFields(StoredFieldsSpec spec, boolean sequential) {
```
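The rewritten check can be exercised standalone. Here is a simplified sketch of the new gap-tolerance heuristic (the class name and the boundary value are illustrative assumptions, not the exact Elasticsearch code):

```java
// Simplified sketch of the "sequential reader" heuristic. Names and the
// boundary value are illustrative assumptions, not the real implementation.
class SequentialHeuristicSketch {
    // Minimum number of docs before sequential reading is even considered
    // (the real SEQUENTIAL_BOUNDARY value may differ).
    static final int SEQUENTIAL_BOUNDARY = 10;

    static boolean useSequential(int[] docs, double proportion) {
        int count = docs.length;
        if (count < SEQUENTIAL_BOUNDARY) {
            return false;
        }
        // The old check required a perfectly dense block:
        //   docs[count - 1] - docs[0] == count - 1
        // The new check tolerates gaps: the block qualifies when the docs
        // cover at least `proportion` of the doc-id range they span.
        int range = docs[count - 1] - docs[0];
        return range * proportion <= count;
    }
}
```

For example, with `proportion = 0.2` (the value the tests above pass in), 12 contiguous doc ids qualify for the merge reader, while 12 docs spread one-in-ten across the same segment do not.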

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

+1-1
```diff
@@ -193,7 +193,7 @@ public String toString() {
         operators.add(
             new OrdinalsGroupingOperator(
                 shardIdx -> new KeywordFieldMapper.KeywordFieldType("g").blockLoader(null),
-                List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE)),
+                List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> SourceLoader.FROM_STORED_SOURCE, 0.2)),
                 ElementType.BYTES_REF,
                 0,
                 gField,
```

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryEvaluatorTests.java

+1-1
```diff
@@ -207,7 +207,7 @@ private List<Page> runQuery(Set<String> values, Query query, boolean shuffleDocs
                 ),
                 List.of(new ValuesSourceReaderOperator.ShardContext(reader, () -> {
                     throw new UnsupportedOperationException();
-                })),
+                }, 0.2)),
                 0
             )
         );
```
