Description
Context
ByteBlockPool is used in FreqProxTermsWriter to store terms across fields until a flush is triggered. [The docValues ByteBlockPool and the term vectors ByteBlockPool exist alongside it, for reference.] Since each buffer in ByteBlockPool is 32 KB and the pool's running offset is tracked as an int, the pool can address at most 2 GB: once it advances past buffer index 65535, the next buffer's start offset (65536 * 32 KB = 2^31) no longer fits in an int and the allocation overflows.
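A minimal sketch of the arithmetic (assuming Lucene's 32 KB BYTE_BLOCK_SIZE; the class name here is just for illustration):

public class ByteBlockPoolOverflowDemo {
  public static void main(String[] args) {
    final int blockSize = 1 << 15;  // 32 KB, ByteBlockPool.BYTE_BLOCK_SIZE
    int offset = 65535 * blockSize; // 2_147_450_880: start of buffer index 65535, still fits in an int
    System.out.println("last representable buffer offset: " + offset);
    // Buffer index 65536 would start at 2^31, one past Integer.MAX_VALUE; this is
    // the Math.addExact call that fails in the stack trace below:
    int next = Math.addExact(offset, blockSize); // throws ArithmeticException: integer overflow
  }
}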
We have past issues such as #9660, which was handled by throwing an exception on overflow in PR #12392.
But the failure still occurs: during indexing, if a field produces a very large number of tokens, the pool overflows and the exception is thrown as below:
message [shard failure, reason [index id[3458764570588151359] origin[LOCAL_TRANSLOG_RECOVERY] seq#[53664468]]], failure [NotSerializableExceptionWrapper[arithmetic_exception: integer overflow]], markAsStale [true]]
NotSerializableExceptionWrapper[arithmetic_exception: integer overflow]
at java.lang.Math.addExact(Math.java:883)
at org.apache.lucene.util.ByteBlockPool.nextBuffer(ByteBlockPool.java:199)
at org.apache.lucene.index.ByteSlicePool.allocKnownSizeSlice(ByteSlicePool.java:118)
at org.apache.lucene.index.ByteSlicePool.allocSlice(ByteSlicePool.java:98)
at org.apache.lucene.index.TermsHashPerField.writeByte(TermsHashPerField.java:226)
at org.apache.lucene.index.TermsHashPerField.writeVInt(TermsHashPerField.java:266)
at org.apache.lucene.index.FreqProxTermsWriterPerField.writeProx(FreqProxTermsWriterPerField.java:86)
at org.apache.lucene.index.FreqProxTermsWriterPerField.addTerm(FreqProxTermsWriterPerField.java:197)
at org.apache.lucene.index.TermsHashPerField.positionStreamSlice(TermsHashPerField.java:214)
at org.apache.lucene.index.TermsHashPerField.add(TermsHashPerField.java:202)
at org.apache.lucene.index.IndexingChain$PerField.invertTokenStream(IndexingChain.java:1287)
at org.apache.lucene.index.IndexingChain$PerField.invert(IndexingChain.java:1183)
at org.apache.lucene.index.IndexingChain.processField(IndexingChain.java:731)
at org.apache.lucene.index.IndexingChain.processDocument(IndexingChain.java:609)
at org.apache.lucene.index.DocumentsWriterPerThread.updateDocuments(DocumentsWriterPerThread.java:263)
at org.apache.lucene.index.DocumentsWriter.updateDocuments(DocumentsWriter.java:425)
at org.apache.lucene.index.IndexWriter.updateDocuments(IndexWriter.java:1558)
at org.apache.lucene.index.IndexWriter.addDocuments(IndexWriter.java:1516)
at org.opensearch.index.engine.InternalEngine.addStaleDocs(InternalEngine.java:1291)
at org.opensearch.index.engine.InternalEngine.indexIntoLucene(InternalEngine.java:1210)
at org.opensearch.index.engine.InternalEngine.index(InternalEngine.java:1011)
at org.opensearch.index.shard.IndexShard.index(IndexShard.java:1226)
at org.opensearch.index.shard.IndexShard.applyIndexOperation(IndexShard.java:1171)
at org.opensearch.index.shard.IndexShard.applyTranslogOperation(IndexShard.java:2418)
at org.opensearch.index.shard.IndexShard.runTranslogRecovery(IndexShard.java:2474)
at org.opensearch.index.shard.IndexShard.lambda$recoverLocallyUpToGlobalCheckpoint$14(IndexShard.java:2291)
at org.opensearch.index.engine.InternalEngine.recoverFromTranslogInternal(InternalEngine.java:571)
at org.opensearch.index.engine.InternalEngine.recoverFromTranslog(InternalEngine.java:546)
at org.opensearch.index.engine.InternalEngine.recoverFromTranslog(InternalEngine.java:146)
at org.opensearch.index.shard.IndexShard.recoverLocallyUpToGlobalCheckpoint(IndexShard.java:2301)
at org.opensearch.index.shard.IndexShard.recoverLocallyAndFetchStartSeqNo(IndexShard.java:2331)
at org.opensearch.indices.recovery.PeerRecoveryTargetService.doRecovery(PeerRecoveryTargetService.java:267)
at org.opensearch.indices.recovery.PeerRecoveryTargetService$RecoveryRunner.doRun(PeerRecoveryTargetService.java:618)
at org.opensearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:922)
at org.opensearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:52)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.lang.Thread.run(Thread.java:840)
Proposal
Can we flush the DWPT if the ByteBlockPool is approaching its buffer limit?
In DocumentsWriterFlushControl, in doAfterDocument, similar to how we mark the DWPT as flushPending when perThread.ramBytesUsed() > hardMaxBytesPerDWPT, we can propagate an "approaching the buffer limit" signal from ByteBlockPool -> IndexingChain -> DWPT -> DocumentsWriterFlushControl and mark the DWPT flushPending there.
POC code: 23fb34f
DocumentsWriterPerThread doAfterDocument(DocumentsWriterPerThread perThread) {
  ...
  ...
        activeBytes += delta;
        assert updatePeaks(delta);
        flushPolicy.onChange(this, perThread);
        if (!perThread.isFlushPending() && perThread.ramBytesUsed() > hardMaxBytesPerDWPT) {
          // Safety check to prevent a single DWPT exceeding its RAM limit. This
          // is super important since we can not address more than 2048 MB per DWPT
          setFlushPending(perThread);
        }
        // Buffer count limit check: mark the DWPT flushPending before the pool's
        // int offset can overflow
        if (!perThread.isFlushPending() && perThread.isApproachingBufferLimit()) {
          if (infoStream.isEnabled("DWFC")) {
            infoStream.message("DWFC",
                "force flush due to buffer count approaching limit in DWPT "
                    + perThread.getSegmentInfo().name + ", limit: " + 65000);
          }
          setFlushPending(perThread);
        }
      }
      return checkout(perThread, false);
    } finally {
      boolean stalled = updateStallState();
      assert assertNumDocsSinceStalled(stalled) && assertMemory();
    }
  }
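The POC's isApproachingBufferLimit() check on the DWPT is not part of the snippet above; a minimal sketch of what that plumbing could look like follows. All names here (ByteBlockPoolSketch, getAllocatedBufferCount, BUFFER_COUNT_SOFT_LIMIT, DwptSketch) are hypothetical, not existing Lucene API: the real ByteBlockPool tracks its current buffer index internally but would need to expose it up the chain.

// Hypothetical sketch only: illustrates the ByteBlockPool -> DWPT signal the
// proposal describes, not actual Lucene code.
final class ByteBlockPoolSketch {
  static final int BYTE_BLOCK_SIZE = 1 << 15; // 32 KB, matching Lucene
  private int bufferUpto = -1;                // index of the buffer currently being written

  // nextBuffer() would increment bufferUpto and advance the int byteOffset;
  // that addition is what overflows once buffer index 65536 is requested.
  int getAllocatedBufferCount() {
    return bufferUpto + 1;
  }
}

final class DwptSketch {
  // Soft limit kept below the 65536-buffer ceiling so the flush is marked
  // pending before Math.addExact can overflow.
  private static final int BUFFER_COUNT_SOFT_LIMIT = 65000;

  private final ByteBlockPoolSketch termsPool;

  DwptSketch(ByteBlockPoolSketch termsPool) {
    this.termsPool = termsPool;
  }

  boolean isApproachingBufferLimit() {
    return termsPool.getAllocatedBufferCount() >= BUFFER_COUNT_SOFT_LIMIT;
  }
}

Marking the DWPT flushPending reuses the existing flush back-pressure path, so no new flush mechanism is needed; the segment is simply flushed early, before the pool reaches its 2 GB addressing limit.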