Skip to content

Commit f21cb86

Browse files
Use new source loader when lower docId is accessed (#128320) (#128411)
When using synthetic source, runtime field data may come from doc values. Doc value iterators can only be read once, and only in increasing docId order. But if a runtime field is referenced multiple times in a query, the same doc value iterator is currently reused for every reference. This causes an error, because the second field reference attempts to read the same iterator starting from a lower docId than the one previously read. The fix is to create a new source loader, and thus a new doc value iterator, whenever the requested docId is lower than the last seen docId.
1 parent 49d28fc commit f21cb86

File tree

3 files changed

+127
-0
lines changed

3 files changed

+127
-0
lines changed

docs/changelog/128320.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 128320
2+
summary: Use new source loader when lower `docId` is accessed
3+
area: Codec
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/search/lookup/SyntheticSourceProvider.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,22 @@ public Source getSource(LeafReaderContext ctx, int doc) throws IOException {
3838
provider = new SyntheticSourceLeafLoader(ctx);
3939
var existing = leaves.put(id, provider);
4040
assert existing == null : "unexpected source provider [" + existing + "]";
41+
} else if (doc < provider.lastSeenDocId) {
42+
// When queries reference the same runtime field in multiple clauses, each clause re-reads the values from the source in
43+
// increasing docId order. So the last docId accessed by the first clause is higher than the first docId read by the second
44+
// clause. This is okay for stored source, as stored fields do not restrict the order in which docIds can be accessed.
45+
// But with synthetic source, field values may come from doc values, which require that docIds only be read in increasing order.
46+
// To handle this, we detect lower docIds and create a new doc value reader for each clause.
47+
provider = new SyntheticSourceLeafLoader(ctx);
48+
leaves.put(id, provider);
4149
}
4250
return provider.getSource(doc);
4351
}
4452

4553
private class SyntheticSourceLeafLoader {
4654
private final LeafStoredFieldLoader leafLoader;
4755
private final SourceLoader.Leaf leaf;
56+
int lastSeenDocId = -1;
4857

4958
SyntheticSourceLeafLoader(LeafReaderContext ctx) throws IOException {
5059
this.leafLoader = (sourceLoader.requiredStoredFields().isEmpty())
@@ -54,6 +63,7 @@ private class SyntheticSourceLeafLoader {
5463
}
5564

5665
Source getSource(int doc) throws IOException {
66+
this.lastSeenDocId = doc;
5767
leafLoader.advanceTo(doc);
5868
return leaf.source(leafLoader, doc);
5969
}

x-pack/plugin/logsdb/src/javaRestTest/java/org/elasticsearch/xpack/logsdb/LogsdbRestIT.java

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.elasticsearch.common.settings.Settings;
1313
import org.elasticsearch.common.time.DateFormatter;
1414
import org.elasticsearch.common.time.FormatNames;
15+
import org.elasticsearch.common.xcontent.support.XContentMapValues;
1516
import org.elasticsearch.test.cluster.ElasticsearchCluster;
1617
import org.elasticsearch.test.cluster.local.distribution.DistributionType;
1718
import org.elasticsearch.test.rest.ESRestTestCase;
@@ -221,6 +222,117 @@ public void testEsqlRuntimeFields() throws IOException {
221222
assertThat(sumLength, equalTo(20 * numDocs));
222223
}
223224

225+
public void testSyntheticSourceRuntimeFieldQueries() throws IOException {
226+
String mappings = """
227+
{
228+
"runtime": {
229+
"message_length": {
230+
"type": "long"
231+
}
232+
},
233+
"dynamic": false,
234+
"properties": {
235+
"@timestamp": {
236+
"type": "date"
237+
},
238+
"log" : {
239+
"properties": {
240+
"level": {
241+
"type": "keyword"
242+
}
243+
}
244+
}
245+
}
246+
}
247+
""";
248+
String indexName = "test-foo";
249+
createIndex(indexName, Settings.builder().put("index.mode", "logsdb").build(), mappings);
250+
251+
int numDocs = 1000;
252+
var sb = new StringBuilder();
253+
var now = Instant.now();
254+
for (int i = 0; i < numDocs; i++) {
255+
String level = randomBoolean() ? "info" : randomBoolean() ? "warning" : randomBoolean() ? "error" : "fatal";
256+
String msg = randomAlphaOfLength(20);
257+
String messageLength = Integer.toString(msg.length());
258+
sb.append("{ \"create\": {} }").append('\n');
259+
if (randomBoolean()) {
260+
sb.append("""
261+
{"@timestamp":"$now","message":"$msg","message_length":$l,"log":{"level":"$level"}}
262+
""".replace("$now", formatInstant(now)).replace("$level", level).replace("$msg", msg).replace("$l", messageLength));
263+
} else {
264+
sb.append("""
265+
{"@timestamp": "$now", "message": "$msg", "message_length": $l}
266+
""".replace("$now", formatInstant(now)).replace("$msg", msg).replace("$l", messageLength));
267+
}
268+
sb.append('\n');
269+
if (i != numDocs - 1) {
270+
now = now.plusSeconds(1);
271+
}
272+
}
273+
274+
var bulkRequest = new Request("POST", "/" + indexName + "/_bulk");
275+
bulkRequest.setJsonEntity(sb.toString());
276+
bulkRequest.addParameter("refresh", "true");
277+
var bulkResponse = client().performRequest(bulkRequest);
278+
var bulkResponseBody = responseAsMap(bulkResponse);
279+
assertThat(bulkResponseBody, Matchers.hasEntry("errors", false));
280+
281+
var forceMergeRequest = new Request("POST", "/" + indexName + "/_forcemerge");
282+
var forceMergeResponse = client().performRequest(forceMergeRequest);
283+
assertOK(forceMergeResponse);
284+
285+
var searchRequest = new Request("POST", "/" + indexName + "/_search");
286+
287+
searchRequest.setJsonEntity("""
288+
{
289+
"size": 1,
290+
"query": {
291+
"bool": {
292+
"should": [
293+
{
294+
"range": {
295+
"message_length": {
296+
"gte": 1,
297+
"lt": 900000
298+
}
299+
}
300+
},
301+
{
302+
"range": {
303+
"message_length": {
304+
"gte": 900000,
305+
"lt": 1000000
306+
}
307+
}
308+
}
309+
],
310+
"minimum_should_match": "1",
311+
"must_not": [
312+
{
313+
"range": {
314+
"message_length": {
315+
"lt": 0
316+
}
317+
}
318+
}
319+
]
320+
}
321+
}
322+
}
323+
""");
324+
var searchResponse = client().performRequest(searchRequest);
325+
assertOK(searchResponse);
326+
var searchResponseBody = responseAsMap(searchResponse);
327+
int totalHits = (int) XContentMapValues.extractValue("hits.total.value", searchResponseBody);
328+
assertThat(totalHits, equalTo(numDocs));
329+
330+
var shardsHeader = (Map<?, ?>) searchResponseBody.get("_shards");
331+
assertThat(shardsHeader.get("failed"), equalTo(0));
332+
assertThat(shardsHeader.get("successful"), equalTo(1));
333+
assertThat(shardsHeader.get("skipped"), equalTo(0));
334+
}
335+
224336
static String formatInstant(Instant instant) {
225337
return DateFormatter.forPattern(FormatNames.STRICT_DATE_OPTIONAL_TIME.getName()).format(instant);
226338
}

0 commit comments

Comments
 (0)