diff --git a/docs/changelog/128320.yaml b/docs/changelog/128320.yaml new file mode 100644 index 0000000000000..ecd575d1ce93f --- /dev/null +++ b/docs/changelog/128320.yaml @@ -0,0 +1,5 @@ +pr: 128320 +summary: Use new source loader when lower `docId` is accessed +area: Codec +type: bug +issues: [] diff --git a/server/src/main/java/org/elasticsearch/search/lookup/SyntheticSourceProvider.java b/server/src/main/java/org/elasticsearch/search/lookup/SyntheticSourceProvider.java index 8078f4cb9cb8e..763c97635a75e 100644 --- a/server/src/main/java/org/elasticsearch/search/lookup/SyntheticSourceProvider.java +++ b/server/src/main/java/org/elasticsearch/search/lookup/SyntheticSourceProvider.java @@ -38,6 +38,14 @@ public Source getSource(LeafReaderContext ctx, int doc) throws IOException { provider = new SyntheticSourceLeafLoader(ctx); var existing = leaves.put(id, provider); assert existing == null : "unexpected source provider [" + existing + "]"; + } else if (doc < provider.lastSeenDocId) { + // When queries reference the same runtime field in multiple clauses, each clause re-reads the values from the source in + // increasing docId order. So the last docId accessed by the first clause is higher than the first docId read by the second + // clause. This is okay for stored source, as stored fields do not restrict the order in which docIds can be accessed. + // But with synthetic source, field values may come from doc values, which require that docIds only be read in increasing order. + // To handle this, we detect lower docIds and create a new doc value reader for each clause. 
+ provider = new SyntheticSourceLeafLoader(ctx); + leaves.put(id, provider); } return provider.getSource(doc); } @@ -45,6 +53,7 @@ public Source getSource(LeafReaderContext ctx, int doc) throws IOException { private class SyntheticSourceLeafLoader { private final LeafStoredFieldLoader leafLoader; private final SourceLoader.Leaf leaf; + int lastSeenDocId = -1; SyntheticSourceLeafLoader(LeafReaderContext ctx) throws IOException { this.leafLoader = (sourceLoader.requiredStoredFields().isEmpty()) @@ -54,6 +63,7 @@ private class SyntheticSourceLeafLoader { } Source getSource(int doc) throws IOException { + this.lastSeenDocId = doc; leafLoader.advanceTo(doc); return leaf.source(leafLoader, doc); } diff --git a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java index b4ebab693b591..9491fdf4684e2 100644 --- a/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java +++ b/x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.time.DateFormatter; import org.elasticsearch.common.time.FormatNames; +import org.elasticsearch.common.xcontent.support.XContentMapValues; import org.elasticsearch.index.IndexSettings; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.test.cluster.local.distribution.DistributionType; @@ -340,4 +341,115 @@ public void testLogsdbDefaultWithRecoveryUseSyntheticSource() throws IOException assertNull(settings.get("index.mapping.source.mode")); assertEquals("true", settings.get(IndexSettings.LOGSDB_SORT_ON_HOST_NAME.getKey())); } + + public void testSyntheticSourceRuntimeFieldQueries() throws IOException { + String mappings = """ + { + "runtime": { + "message_length": { + "type": "long" + } + }, + "dynamic": false, + 
"properties": { + "@timestamp": { + "type": "date" + }, + "log" : { + "properties": { + "level": { + "type": "keyword" + } + } + } + } + } + """; + String indexName = "test-foo"; + createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings); + + int numDocs = 1000; + var sb = new StringBuilder(); + var now = Instant.now(); + for (int i = 0; i < numDocs; i++) { + String level = randomBoolean() ? "info" : randomBoolean() ? "warning" : randomBoolean() ? "error" : "fatal"; + String msg = randomAlphaOfLength(20); + String messageLength = Integer.toString(msg.length()); + sb.append("{ \"create\": {} }").append('\n'); + if (randomBoolean()) { + sb.append(""" + {"@timestamp":"$now","message":"$msg","message_length":$l,"log":{"level":"$level"}} + """.replace("$now", formatInstant(now)).replace("$level", level).replace("$msg", msg).replace("$l", messageLength)); + } else { + sb.append(""" + {"@timestamp": "$now", "message": "$msg", "message_length": $l} + """.replace("$now", formatInstant(now)).replace("$msg", msg).replace("$l", messageLength)); + } + sb.append('\n'); + if (i != numDocs - 1) { + now = now.plusSeconds(1); + } + } + + var bulkRequest = new Request("POST", "/" + indexName + "/_bulk"); + bulkRequest.setJsonEntity(sb.toString()); + bulkRequest.addParameter("refresh", "true"); + var bulkResponse = client().performRequest(bulkRequest); + var bulkResponseBody = responseAsMap(bulkResponse); + assertThat(bulkResponseBody, Matchers.hasEntry("errors", false)); + + var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge"); + var forceMergeResponse = client().performRequest(forceMergeRequest); + assertOK(forceMergeResponse); + + var searchRequest = new Request("POST", "/" + indexName + "/_search"); + + searchRequest.setJsonEntity(""" + { + "size": 1, + "query": { + "bool": { + "should": [ + { + "range": { + "message_length": { + "gte": 1, + "lt": 900000 + } + } + }, + { + "range": { + "message_length": { + "gte": 900000, + 
"lt": 1000000 + } + } + } + ], + "minimum_should_match": "1", + "must_not": [ + { + "range": { + "message_length": { + "lt": 0 + } + } + } + ] + } + } + } + """); + var searchResponse = client().performRequest(searchRequest); + assertOK(searchResponse); + var searchResponseBody = responseAsMap(searchResponse); + int totalHits = (int) XContentMapValues.extractValue("hits.total.value", searchResponseBody); + assertThat(totalHits, equalTo(numDocs)); + + var shardsHeader = (Map) searchResponseBody.get("_shards"); + assertThat(shardsHeader.get("failed"), equalTo(0)); + assertThat(shardsHeader.get("successful"), equalTo(1)); + assertThat(shardsHeader.get("skipped"), equalTo(0)); + } }