Skip to content

Commit c32b447

Browse files
Use new source loader when lower docId is accessed (#128320) (#128408)
When using synthetic source, runtime fields data may come from doc values. Doc values iterators can only be read once, and in increasing docId order. But if a runtime field is referenced multiple times in a query, currently the same doc value iterator will be used. This causes an error, as the second field reference will attempt to read the same iterator from a lower docId than was previously used. The fix is to create a new source loader, and thus a new doc value iterator, if the requested docId is lower than the last seen docId.
1 parent 2f32aab commit c32b447

File tree

3 files changed

+129
-2
lines changed

3 files changed

+129
-2
lines changed

docs/changelog/128320.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128320
2+
summary: Use new source loader when lower `docId` is accessed
3+
area: Codec
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/search/lookup/ConcurrentSegmentSourceProvider.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@ class ConcurrentSegmentSourceProvider implements SourceProvider {
2929
private final SourceLoader sourceLoader;
3030
private final StoredFieldLoader storedFieldLoader;
3131
private final Map<Object, Leaf> leaves = ConcurrentCollections.newConcurrentMap();
32+
private final boolean isStoredSource;
3233

33-
ConcurrentSegmentSourceProvider(SourceLoader loader, boolean loadSource) {
34+
ConcurrentSegmentSourceProvider(SourceLoader loader, boolean isStoredSource) {
3435
this.sourceLoader = loader;
3536
// we force a sequential reader here since it is used during query execution where documents are scanned sequentially
36-
this.storedFieldLoader = StoredFieldLoader.create(loadSource, sourceLoader.requiredStoredFields(), true);
37+
this.storedFieldLoader = StoredFieldLoader.create(isStoredSource, sourceLoader.requiredStoredFields(), true);
38+
this.isStoredSource = isStoredSource;
3739
}
3840

3941
@Override
@@ -44,6 +46,14 @@ public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
4446
leaf = new Leaf(sourceLoader.leaf(ctx.reader(), null), storedFieldLoader.getLoader(ctx, null));
4547
var existing = leaves.put(id, leaf);
4648
assert existing == null : "unexpected source provider [" + existing + "]";
49+
} else if (isStoredSource == false && doc < leaf.doc) {
50+
// When queries reference the same runtime field in multiple clauses, each clause re-reads the values from the source in
51+
// increasing docId order. So the last docId accessed by the first clause is higher than the first docId read by the second
52+
// clause. This is okay for stored source, as stored fields do not restrict the order that docIds that can be accessed.
53+
// But with synthetic source, field values may come from doc values, which require than docIds only be read in increasing order.
54+
// To handle this, we detect lower docIds and create a new doc value reader for each clause.
55+
leaf = new Leaf(sourceLoader.leaf(ctx.reader(), null), storedFieldLoader.getLoader(ctx, null));
56+
leaves.put(id, leaf);
4757
}
4858
return leaf.getSource(ctx, doc);
4959
}

x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.settings.Settings;
1313
import org.elasticsearch.common.time.DateFormatter;
1414
import org.elasticsearch.common.time.FormatNames;
15+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1516
import org.elasticsearch.index.IndexSettings;
1617
import org.elasticsearch.test.cluster.ElasticsearchCluster;
1718
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
@@ -340,4 +341,115 @@ public void testLogsdbDefaultWithRecoveryUseSyntheticSource() throws IOException
340341
assertNull(settings.get("index.mapping.source.mode"));
341342
assertEquals("true", settings.get(IndexSettings.LOGSDB_SORT_ON_HOST_NAME.getKey()));
342343
}
344+
345+
public void testSyntheticSourceRuntimeFieldQueries() throws IOException {
346+
String mappings = """
347+
{
348+
"runtime": {
349+
"message_length": {
350+
"type": "long"
351+
}
352+
},
353+
"dynamic": false,
354+
"properties": {
355+
"@timestamp": {
356+
"type": "date"
357+
},
358+
"log" : {
359+
"properties": {
360+
"level": {
361+
"type": "keyword"
362+
}
363+
}
364+
}
365+
}
366+
}
367+
""";
368+
String indexName = "test-foo";
369+
createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings);
370+
371+
int numDocs = 1000;
372+
var sb = new StringBuilder();
373+
var now = Instant.now();
374+
for (int i = 0; i < numDocs; i++) {
375+
String level = randomBoolean() ? "info" : randomBoolean() ? "warning" : randomBoolean() ? "error" : "fatal";
376+
String msg = randomAlphaOfLength(20);
377+
String messageLength = Integer.toString(msg.length());
378+
sb.append("{ \"create\": {} }").append('\n');
379+
if (randomBoolean()) {
380+
sb.append("""
381+
{"@timestamp":"$now","message":"$msg","message_length":$l,"log":{"level":"$level"}}
382+
""".replace("$now", formatInstant(now)).replace("$level", level).replace("$msg", msg).replace("$l", messageLength));
383+
} else {
384+
sb.append("""
385+
{"@timestamp": "$now", "message": "$msg", "message_length": $l}
386+
""".replace("$now", formatInstant(now)).replace("$msg", msg).replace("$l", messageLength));
387+
}
388+
sb.append('\n');
389+
if (i != numDocs - 1) {
390+
now = now.plusSeconds(1);
391+
}
392+
}
393+
394+
var bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
395+
bulkRequest.setJsonEntity(sb.toString());
396+
bulkRequest.addParameter("refresh", "true");
397+
var bulkResponse = client().performRequest(bulkRequest);
398+
var bulkResponseBody = responseAsMap(bulkResponse);
399+
assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));
400+
401+
var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge");
402+
var forceMergeResponse = client().performRequest(forceMergeRequest);
403+
assertOK(forceMergeResponse);
404+
405+
var searchRequest = new Request("POST", "/" + indexName + "/_search");
406+
407+
searchRequest.setJsonEntity("""
408+
{
409+
"size": 1,
410+
"query": {
411+
"bool": {
412+
"should": [
413+
{
414+
"range": {
415+
"message_length": {
416+
"gte": 1,
417+
"lt": 900000
418+
}
419+
}
420+
},
421+
{
422+
"range": {
423+
"message_length": {
424+
"gte": 900000,
425+
"lt": 1000000
426+
}
427+
}
428+
}
429+
],
430+
"minimum_should_match": "1",
431+
"must_not": [
432+
{
433+
"range": {
434+
"message_length": {
435+
"lt": 0
436+
}
437+
}
438+
}
439+
]
440+
}
441+
}
442+
}
443+
""");
444+
var searchResponse = client().performRequest(searchRequest);
445+
assertOK(searchResponse);
446+
var searchResponseBody = responseAsMap(searchResponse);
447+
int totalHits = (int) XContentMapValues.extractValue("hits.total.value", searchResponseBody);
448+
assertThat(totalHits, equalTo(numDocs));
449+
450+
var shardsHeader = (Map<?, ?>) searchResponseBody.get("_shards");
451+
assertThat(shardsHeader.get("failed"), equalTo(0));
452+
assertThat(shardsHeader.get("successful"), equalTo(1));
453+
assertThat(shardsHeader.get("skipped"), equalTo(0));
454+
}
343455
}

0 commit comments

Comments
 (0)