diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java index df3e6458aed7..4d7c44ea4d7e 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterFlushControl.java @@ -239,6 +239,20 @@ && delta < ramBufferGranularity()) { // is super important since we can not address more than 2048 MB per DWPT setFlushPending(perThread); } + + // Buffer count limit check + if (!perThread.isFlushPending() && perThread.isApproachingBufferLimit()) { + if (infoStream.isEnabled("DWFC")) { + infoStream.message( + "DWFC", + "force flush due to buffer count limit approaching in DWPT " + + perThread.getSegmentInfo().name + + ": " + + ", limit: " + + 65000); + } + setFlushPending(perThread); + } } return checkout(perThread, false); } finally { diff --git a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java index 8b81a6a9b7fa..a2f595e89809 100644 --- a/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java +++ b/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterPerThread.java @@ -753,6 +753,19 @@ long getCommitLastBytesUsedDelta() { return delta; } + /** + * Returns true if any of the ByteBlockPools used by this DWPT are approaching their buffer limit. + * This is used to trigger a flush before integer overflow occurs in ByteBlockPool.byteOffset. + * + * @return true if buffer count is approaching the limit + */ + boolean isApproachingBufferLimit() { + assert isHeldByCurrentThread(); + // Use a threshold of 65000 to provide some safety margin before the actual limit of 65535 + final int threshold = 65000; + return indexingChain.isApproachingBufferLimit(threshold); + } + @Override public void lock() { lock.lock(); diff --git a/lucene/core/src/java/org/apache/lucene/index/IndexingChain.java b/lucene/core/src/java/org/apache/lucene/index/IndexingChain.java index 29e7003012e8..bc6256702e7e 100644 --- a/lucene/core/src/java/org/apache/lucene/index/IndexingChain.java +++ b/lucene/core/src/java/org/apache/lucene/index/IndexingChain.java @@ -1063,6 +1063,31 @@ public Collection getChildResources() { vectorValuesConsumer.getAccountable()); } + /** + * Returns true if any of the ByteBlockPools used by this indexing chain are approaching their + * buffer limit. + * + * @param threshold the threshold below ByteBlockPool.MAX_BUFFER_COUNT to trigger the warning + * @return true if any ByteBlockPool buffer count >= threshold + */ + boolean isApproachingBufferLimit(int threshold) { + // Check the main byte pool used by terms hash + if (termsHash.bytePool.isApproachingBufferLimit(threshold)) { + return true; + } + // Check the doc values byte pool + if (docValuesBytePool.isApproachingBufferLimit(threshold)) { + return true; + } + // Check term vectors byte pool if it exists and is different from the main pool + if (termsHash.termBytePool != null + && termsHash.termBytePool != termsHash.bytePool + && termsHash.termBytePool.isApproachingBufferLimit(threshold)) { + return true; + } + return false; + } + /** NOTE: not static: accesses at least docState, termsHash. */ private final class PerField implements Comparable { final String fieldName; diff --git a/lucene/core/src/java/org/apache/lucene/util/ByteBlockPool.java b/lucene/core/src/java/org/apache/lucene/util/ByteBlockPool.java index 986118acdedf..d8d9e7dace19 100644 --- a/lucene/core/src/java/org/apache/lucene/util/ByteBlockPool.java +++ b/lucene/core/src/java/org/apache/lucene/util/ByteBlockPool.java @@ -125,6 +125,13 @@ public void recycleByteBlocks(byte[][] blocks, int start, int end) { */ public int byteOffset = -BYTE_BLOCK_SIZE; + /** + * Maximum number of buffers before integer overflow occurs in byteOffset calculation. Since + * byteOffset = bufferUpto * BYTE_BLOCK_SIZE, and BYTE_BLOCK_SIZE = 32768, the maximum safe value + * is Integer.MAX_VALUE / BYTE_BLOCK_SIZE = 65535. + */ + public static final int MAX_BUFFER_COUNT = Integer.MAX_VALUE / BYTE_BLOCK_SIZE; + private final Allocator allocator; public ByteBlockPool(Allocator allocator) { @@ -353,6 +360,22 @@ public long getPosition() { return bufferUpto * allocator.blockSize + byteUpto; } + /** Returns the current number of allocated buffers. */ + public int getBufferCount() { + return bufferUpto + 1; + } + + /** + * Returns true if the buffer count is approaching the limit that would cause integer overflow. + * This should be used to trigger a flush before the overflow occurs. + * + * @param threshold the threshold below MAX_BUFFER_COUNT to trigger the warning + * @return true if buffer count >= threshold + */ + public boolean isApproachingBufferLimit(int threshold) { + return getBufferCount() >= threshold; + } + /** Retrieve the buffer at the specified index from the buffer pool. */ public byte[] getBuffer(int bufferIndex) { return buffers[bufferIndex]; diff --git a/lucene/core/src/test/org/apache/lucene/index/TestBufferLimitFlush.java b/lucene/core/src/test/org/apache/lucene/index/TestBufferLimitFlush.java new file mode 100644 index 000000000000..0d77bc4ff771 --- /dev/null +++ b/lucene/core/src/test/org/apache/lucene/index/TestBufferLimitFlush.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.index; + +import java.io.IOException; +import java.util.Iterator; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.TextField; +import org.apache.lucene.store.Directory; +import org.apache.lucene.tests.analysis.MockAnalyzer; +import org.apache.lucene.tests.util.LuceneTestCase; +import org.apache.lucene.util.ByteBlockPool; + +/** Test for the buffer limit flush functionality that prevents ByteBlockPool integer overflow. */ +public class TestBufferLimitFlush extends LuceneTestCase { + + public void testBufferLimitConstants() { + // Verify the buffer limit calculation is correct + assertEquals(32768, ByteBlockPool.BYTE_BLOCK_SIZE); + assertEquals(65535, ByteBlockPool.MAX_BUFFER_COUNT); + + // Verify that MAX_BUFFER_COUNT is indeed the correct limit + // The overflow occurs when we try to allocate beyond this limit + int maxSafeOffset = ByteBlockPool.MAX_BUFFER_COUNT * ByteBlockPool.BYTE_BLOCK_SIZE; + assertTrue("Max safe offset should be positive", maxSafeOffset > 0); + + // The overflow happens when we try to add another BYTE_BLOCK_SIZE beyond the limit + ArithmeticException exception = + expectThrows( + ArithmeticException.class, + () -> { + Math.addExact(maxSafeOffset, ByteBlockPool.BYTE_BLOCK_SIZE); + }); + + // Verify we got the expected overflow exception + assertNotNull("Should have thrown ArithmeticException due to overflow", exception); + } + + public void testByteBlockPoolBufferLimitDetection() { + ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectAllocator()); + pool.nextBuffer(); + + // Test buffer count tracking + assertEquals(1, pool.getBufferCount()); + + // Test approaching limit detection with various thresholds + assertFalse( + "Should not be approaching limit with threshold 65000", + pool.isApproachingBufferLimit(65000)); + assertFalse( + "Should not be approaching limit with threshold 2", pool.isApproachingBufferLimit(2)); + assertTrue("Should be approaching limit with threshold 1", pool.isApproachingBufferLimit(1)); + assertTrue("Should be approaching limit with threshold 0", pool.isApproachingBufferLimit(0)); + } + + public void testDWPTBufferLimitCheck() throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())); + // Disable other flush triggers to isolate buffer limit testing + iwc.setRAMBufferSizeMB(256.0); + iwc.setMaxBufferedDocs(10000); + + IndexWriter writer = new IndexWriter(dir, iwc); + + // Add a document to initialize the DWPT + Document doc = new Document(); + doc.add(new TextField("content", "test content", TextField.Store.NO)); + writer.addDocument(doc); + + // Get the active DWPT and test buffer limit checking + DocumentsWriter docsWriter = writer.getDocsWriter(); + DocumentsWriterFlushControl flushControl = docsWriter.flushControl; + + // Find an active DWPT + DocumentsWriterPerThread dwpt = null; + for (Iterator it = flushControl.allActiveWriters(); it.hasNext(); ) { + DocumentsWriterPerThread activeDwpt = it.next(); + dwpt = activeDwpt; + break; + } + + assertNotNull("Should have at least one active DWPT", dwpt); + + // Test the buffer limit checking method exists and works + dwpt.lock(); + try { + boolean approaching = dwpt.isApproachingBufferLimit(); + // With normal documents, we should not be approaching the limit + assertFalse("Should not be approaching buffer limit with normal documents", approaching); + } finally { + dwpt.unlock(); + } + + writer.close(); + dir.close(); + } + + public void testFlushControlBufferLimitIntegration() throws IOException { + Directory dir = newDirectory(); + IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random())); + // Disable other flush triggers + iwc.setRAMBufferSizeMB(256.0); + iwc.setMaxBufferedDocs(10000); + + IndexWriter writer = new IndexWriter(dir, iwc); + DocumentsWriter docsWriter = writer.getDocsWriter(); + DocumentsWriterFlushControl flushControl = docsWriter.flushControl; + + // Add a document to create a DWPT + Document doc = new Document(); + doc.add(new TextField("content", "test", TextField.Store.NO)); + writer.addDocument(doc); + + // Verify that the flush control integration is working + // The doAfterDocument method should include our buffer limit check + // This is tested indirectly by ensuring no exceptions are thrown + // and the mechanism is in place + + assertTrue("Should have active bytes after adding document", flushControl.activeBytes() > 0); + + writer.close(); + dir.close(); + } +} diff --git a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java index aca1c5e4898b..11b58c995e19 100644 --- a/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java +++ b/lucene/core/src/test/org/apache/lucene/index/TestFlushByRamOrCountsPolicy.java @@ -21,6 +21,8 @@ import java.util.Iterator; import java.util.concurrent.atomic.AtomicInteger; import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.TextField; import org.apache.lucene.store.Directory; import org.apache.lucene.tests.analysis.MockAnalyzer; import org.apache.lucene.tests.store.MockDirectoryWrapper; @@ -376,4 +378,46 @@ static void findPending( } } } + + public void testFlushByBufferLimit() throws IOException { + // Test that DWPT is flushed when buffer count approaches limit + Directory dir = newDirectory(); + MockAnalyzer analyzer = new MockAnalyzer(random()); + analyzer.setMaxTokenLength(TestUtil.nextInt(random(), 1, IndexWriter.MAX_TERM_LENGTH)); + + IndexWriterConfig iwc = newIndexWriterConfig(analyzer); + // Disable RAM and doc count based flushing to isolate buffer limit testing + iwc.setRAMBufferSizeMB(256.0); + iwc.setMaxBufferedDocs(10000); + + IndexWriter writer = new IndexWriter(dir, iwc); + DocumentsWriter docsWriter = writer.getDocsWriter(); + DocumentsWriterFlushControl flushControl = docsWriter.flushControl; + + // Create a document with many unique terms to force buffer allocation + Document doc = new Document(); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 1000; i++) { + sb.append("term").append(i).append(" "); + } + doc.add(new TextField("content", sb.toString(), Field.Store.NO)); + + // Add documents until we approach buffer limit + // Note: This is a simplified test - in practice, reaching 65000 buffers would require + // an enormous amount of data, so we test the mechanism rather than the actual limit + writer.addDocument(doc); + + writer.close(); + dir.close(); + + // Verify that the buffer limit checking mechanism is in place + Iterator activeWriters = flushControl.allActiveWriters(); + if (activeWriters.hasNext()) { + DocumentsWriterPerThread dwpt = activeWriters.next(); + // The method should exist and not throw an exception + boolean approaching = dwpt.isApproachingBufferLimit(); + // With normal document sizes, we shouldn't be approaching the limit + assertFalse("Should not be approaching buffer limit with normal documents", approaching); + } + } } diff --git a/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java b/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java index 4f4226d3fd31..2f93c229c13c 100644 --- a/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java +++ b/lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java @@ -167,4 +167,21 @@ public byte[] getByteBlock() { assertTrue(throwsException); assertTrue(pool.byteOffset + ByteBlockPool.BYTE_BLOCK_SIZE < pool.byteOffset); } + + public void testBufferLimitDetection() { + // Test the new buffer limit detection functionality + ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectAllocator()); + pool.nextBuffer(); + + // Initially should not be approaching limit + assertFalse(pool.isApproachingBufferLimit(65000)); + assertEquals(1, pool.getBufferCount()); + + // Test with a lower threshold + assertTrue(pool.isApproachingBufferLimit(1)); + assertTrue(pool.isApproachingBufferLimit(0)); + + // Verify MAX_BUFFER_COUNT constant + assertEquals(65535, ByteBlockPool.MAX_BUFFER_COUNT); + } }