Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,20 @@ && delta < ramBufferGranularity()) {
// is super important since we can not address more than 2048 MB per DWPT
setFlushPending(perThread);
}

// Buffer count limit check
if (!perThread.isFlushPending() && perThread.isApproachingBufferLimit()) {
if (infoStream.isEnabled("DWFC")) {
infoStream.message(
"DWFC",
"force flush due to buffer count limit approaching in DWPT "
+ perThread.getSegmentInfo().name
+ ": "
+ ", limit: "
+ 65000);
}
setFlushPending(perThread);
}
}
return checkout(perThread, false);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,19 @@ long getCommitLastBytesUsedDelta() {
return delta;
}

/**
* Returns true if any of the ByteBlockPools used by this DWPT are approaching their buffer limit.
* This is used to trigger a flush before integer overflow occurs in ByteBlockPool.byteOffset.
*
* @return true if buffer count is approaching the limit
*/
boolean isApproachingBufferLimit() {
assert isHeldByCurrentThread();
// Use a threshold of 65000 to provide some safety margin before the actual limit of 65535
final int threshold = 65000;
return indexingChain.isApproachingBufferLimit(threshold);
}

@Override
public void lock() {
lock.lock();
Expand Down
25 changes: 25 additions & 0 deletions lucene/core/src/java/org/apache/lucene/index/IndexingChain.java
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,31 @@ public Collection<Accountable> getChildResources() {
vectorValuesConsumer.getAccountable());
}

/**
* Returns true if any of the ByteBlockPools used by this indexing chain are approaching their
* buffer limit.
*
* @param threshold the threshold below ByteBlockPool.MAX_BUFFER_COUNT to trigger the warning
* @return true if any ByteBlockPool buffer count >= threshold
*/
boolean isApproachingBufferLimit(int threshold) {
// Check the main byte pool used by terms hash
if (termsHash.bytePool.isApproachingBufferLimit(threshold)) {
return true;
}
// Check the doc values byte pool
if (docValuesBytePool.isApproachingBufferLimit(threshold)) {
return true;
}
// Check term vectors byte pool if it exists and is different from the main pool
if (termsHash.termBytePool != null
&& termsHash.termBytePool != termsHash.bytePool
&& termsHash.termBytePool.isApproachingBufferLimit(threshold)) {
return true;
}
return false;
}

/** NOTE: not static: accesses at least docState, termsHash. */
private final class PerField implements Comparable<PerField> {
final String fieldName;
Expand Down
23 changes: 23 additions & 0 deletions lucene/core/src/java/org/apache/lucene/util/ByteBlockPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ public void recycleByteBlocks(byte[][] blocks, int start, int end) {
*/
public int byteOffset = -BYTE_BLOCK_SIZE;

/**
* Maximum number of buffers before integer overflow occurs in byteOffset calculation. Since
* byteOffset = bufferUpto * BYTE_BLOCK_SIZE, and BYTE_BLOCK_SIZE = 32768, the maximum safe value
* is Integer.MAX_VALUE / BYTE_BLOCK_SIZE = 65535.
*/
public static final int MAX_BUFFER_COUNT = Integer.MAX_VALUE / BYTE_BLOCK_SIZE;

private final Allocator allocator;

public ByteBlockPool(Allocator allocator) {
Expand Down Expand Up @@ -353,6 +360,22 @@ public long getPosition() {
return bufferUpto * allocator.blockSize + byteUpto;
}

/** Returns the current number of allocated buffers. */
public int getBufferCount() {
return bufferUpto + 1;
}

/**
* Returns true if the buffer count is approaching the limit that would cause integer overflow.
* This should be used to trigger a flush before the overflow occurs.
*
* @param threshold the threshold below MAX_BUFFER_COUNT to trigger the warning
* @return true if buffer count >= threshold
*/
public boolean isApproachingBufferLimit(int threshold) {
return getBufferCount() >= threshold;
}

/** Retrieve the buffer at the specified index from the buffer pool. */
public byte[] getBuffer(int bufferIndex) {
return buffers[bufferIndex];
Expand Down
138 changes: 138 additions & 0 deletions lucene/core/src/test/org/apache/lucene/index/TestBufferLimitFlush.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.index;

import java.io.IOException;
import java.util.Iterator;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.TextField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.util.LuceneTestCase;
import org.apache.lucene.util.ByteBlockPool;

/** Test for the buffer limit flush functionality that prevents ByteBlockPool integer overflow. */
public class TestBufferLimitFlush extends LuceneTestCase {

  public void testBufferLimitConstants() {
    // Verify the buffer limit calculation is correct.
    assertEquals(32768, ByteBlockPool.BYTE_BLOCK_SIZE);
    assertEquals(65535, ByteBlockPool.MAX_BUFFER_COUNT);

    // MAX_BUFFER_COUNT * BYTE_BLOCK_SIZE is the largest byte offset that still fits in an int.
    int maxSafeOffset = ByteBlockPool.MAX_BUFFER_COUNT * ByteBlockPool.BYTE_BLOCK_SIZE;
    assertTrue("Max safe offset should be positive", maxSafeOffset > 0);

    // Adding one more block beyond the limit must overflow a 32-bit int.
    ArithmeticException exception =
        expectThrows(
            ArithmeticException.class,
            () -> Math.addExact(maxSafeOffset, ByteBlockPool.BYTE_BLOCK_SIZE));
    assertNotNull("Should have thrown ArithmeticException due to overflow", exception);
  }

  public void testByteBlockPoolBufferLimitDetection() {
    ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
    pool.nextBuffer();

    // One buffer has been allocated so far.
    assertEquals(1, pool.getBufferCount());

    // Detection against thresholds both far above and at/below the current count.
    assertFalse(
        "Should not be approaching limit with threshold 65000",
        pool.isApproachingBufferLimit(65000));
    assertFalse(
        "Should not be approaching limit with threshold 2", pool.isApproachingBufferLimit(2));
    assertTrue("Should be approaching limit with threshold 1", pool.isApproachingBufferLimit(1));
    assertTrue("Should be approaching limit with threshold 0", pool.isApproachingBufferLimit(0));
  }

  public void testDWPTBufferLimitCheck() throws IOException {
    Directory dir = newDirectory();
    IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
    // Disable other flush triggers to isolate buffer limit testing.
    iwc.setRAMBufferSizeMB(256.0);
    iwc.setMaxBufferedDocs(10000);

    IndexWriter writer = new IndexWriter(dir, iwc);

    // Add a document to initialize the DWPT.
    Document doc = new Document();
    doc.add(new TextField("content", "test content", TextField.Store.NO));
    writer.addDocument(doc);

    DocumentsWriter docsWriter = writer.getDocsWriter();
    DocumentsWriterFlushControl flushControl = docsWriter.flushControl;

    // Grab the first active DWPT (the original for/break loop was equivalent to this).
    Iterator<DocumentsWriterPerThread> it = flushControl.allActiveWriters();
    DocumentsWriterPerThread dwpt = it.hasNext() ? it.next() : null;
    assertNotNull("Should have at least one active DWPT", dwpt);

    // isApproachingBufferLimit asserts the DWPT lock is held, so take it around the call.
    dwpt.lock();
    try {
      // With a single small document we should be nowhere near the limit.
      assertFalse(
          "Should not be approaching buffer limit with normal documents",
          dwpt.isApproachingBufferLimit());
    } finally {
      dwpt.unlock();
    }

    writer.close();
    dir.close();
  }

  public void testFlushControlBufferLimitIntegration() throws IOException {
    Directory dir = newDirectory();
    IndexWriterConfig iwc = newIndexWriterConfig(new MockAnalyzer(random()));
    // Disable other flush triggers.
    iwc.setRAMBufferSizeMB(256.0);
    iwc.setMaxBufferedDocs(10000);

    IndexWriter writer = new IndexWriter(dir, iwc);
    DocumentsWriter docsWriter = writer.getDocsWriter();
    DocumentsWriterFlushControl flushControl = docsWriter.flushControl;

    // Add a document to create a DWPT.
    Document doc = new Document();
    doc.add(new TextField("content", "test", TextField.Store.NO));
    writer.addDocument(doc);

    // The buffer limit check runs inside flush control's doAfterDocument path; this test
    // exercises that path indirectly by indexing without exceptions and confirming state.
    assertTrue("Should have active bytes after adding document", flushControl.activeBytes() > 0);

    writer.close();
    dir.close();
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.analysis.MockAnalyzer;
import org.apache.lucene.tests.store.MockDirectoryWrapper;
Expand Down Expand Up @@ -376,4 +378,46 @@ static void findPending(
}
}
}

public void testFlushByBufferLimit() throws IOException {
// Test that DWPT is flushed when buffer count approaches limit
Directory dir = newDirectory();
MockAnalyzer analyzer = new MockAnalyzer(random());
analyzer.setMaxTokenLength(TestUtil.nextInt(random(), 1, IndexWriter.MAX_TERM_LENGTH));

IndexWriterConfig iwc = newIndexWriterConfig(analyzer);
// Disable RAM and doc count based flushing to isolate buffer limit testing
iwc.setRAMBufferSizeMB(256.0);
iwc.setMaxBufferedDocs(10000);

IndexWriter writer = new IndexWriter(dir, iwc);
DocumentsWriter docsWriter = writer.getDocsWriter();
DocumentsWriterFlushControl flushControl = docsWriter.flushControl;

// Create a document with many unique terms to force buffer allocation
Document doc = new Document();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 1000; i++) {
sb.append("term").append(i).append(" ");
}
doc.add(new TextField("content", sb.toString(), Field.Store.NO));

// Add documents until we approach buffer limit
// Note: This is a simplified test - in practice, reaching 65000 buffers would require
// an enormous amount of data, so we test the mechanism rather than the actual limit
writer.addDocument(doc);

writer.close();
dir.close();

// Verify that the buffer limit checking mechanism is in place
Iterator<DocumentsWriterPerThread> activeWriters = flushControl.allActiveWriters();
if (activeWriters.hasNext()) {
DocumentsWriterPerThread dwpt = activeWriters.next();
// The method should exist and not throw an exception
boolean approaching = dwpt.isApproachingBufferLimit();
// With normal document sizes, we shouldn't be approaching the limit
assertFalse("Should not be approaching buffer limit with normal documents", approaching);
}
}
}
17 changes: 17 additions & 0 deletions lucene/core/src/test/org/apache/lucene/util/TestByteBlockPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,21 @@ public byte[] getByteBlock() {
assertTrue(throwsException);
assertTrue(pool.byteOffset + ByteBlockPool.BYTE_BLOCK_SIZE < pool.byteOffset);
}

public void testBufferLimitDetection() {
// Test the new buffer limit detection functionality
ByteBlockPool pool = new ByteBlockPool(new ByteBlockPool.DirectAllocator());
pool.nextBuffer();

// Initially should not be approaching limit
assertFalse(pool.isApproachingBufferLimit(65000));
assertEquals(1, pool.getBufferCount());

// Test with a lower threshold
assertTrue(pool.isApproachingBufferLimit(1));
assertTrue(pool.isApproachingBufferLimit(0));

// Verify MAX_BUFFER_COUNT constant
assertEquals(65535, ByteBlockPool.MAX_BUFFER_COUNT);
}
}
Loading