Skip to content

Lucene as a primary engine for merge in pluggable dataformat engine#21735

Draft
darjisagar7 wants to merge 1 commit into
opensearch-project:mainfrom
darjisagar7:PrimaryMergeLucene
Draft

Lucene as a primary engine for merge in pluggable dataformat engine#21735
darjisagar7 wants to merge 1 commit into
opensearch-project:mainfrom
darjisagar7:PrimaryMergeLucene

Conversation

@darjisagar7
Copy link
Copy Markdown
Contributor

@darjisagar7 darjisagar7 commented May 19, 2026

Description

[Describe what this change achieves]

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Sagar Darji <darsaga@amazon.com>

# Conflicts:
#	sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/index/LuceneIndexingExecutionEngine.java
#	sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/merge/LuceneMerger.java
#	sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/merge/PrimaryLuceneMergeStrategy.java
#	sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/merge/NativeParquetMergeStrategy.java
@darjisagar7 darjisagar7 requested a review from a team as a code owner May 19, 2026 13:47
@darjisagar7 darjisagar7 marked this pull request as draft May 19, 2026 13:48
@github-actions
Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit faaa969.

PathLineSeverityDescription
sandbox/plugins/parquet-data-format/src/main/rust/src/merge/reordered.rs133lowSilent fallback to identity mapping (.unwrap_or(local_row_id as i64)) when get_new_position() returns None. This silently produces incorrect row ordering instead of failing loudly, which could mask data corruption in secondary format output if the mapping is incomplete or mismatched. No malicious intent is evident, but the behavior suppresses a detectable error condition.
sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/merge/NativeParquetMergeStrategy.java88lowIn the else branch, local variables 'mergeMetadata' and 'rowIdMapping' shadow the outer 'mergeMetadata' and 'outputMapping' variables. The result is that on the non-secondary path, outputMapping is never assigned the actual RowIdMapping, causing a null mapping to be returned in MergeResult. This appears to be a scoping bug rather than malicious intent, but could silently break primary-format merge consumers that depend on a valid RowIdMapping.

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 0 | Medium: 0 | Low: 2


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions
Copy link
Copy Markdown
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Unreachable Code

Lines 95-96 declare variables mergeMetadata and outputMapping inside the if (isSecondary) block, shadowing the outer declarations at lines 85-86. The outer variables remain uninitialized when isSecondary is false, causing a compilation error at line 99 where mergeMetadata is used. The code after line 96 is unreachable because the else block is missing its body.

ParquetFileMetadata mergeMetadata;
RowIdMapping outputMapping;

if (isSecondary) {
    // Secondary mode: reorder rows according to external RowIdMapping from primary format
    mergeMetadata = RustBridge.mergeParquetFilesWithMapping(filePaths, mergedFilePath.toString(), indexName, externalMapping);
    outputMapping = null;
} else {
    // Merge files in Rust
    MergeFilesResult merged = RustBridge.mergeParquetFilesInRust(filePaths, mergedFilePath.toString(), indexName, writerGeneration);
    ParquetFileMetadata mergeMetadata = merged.metadata();
    RowIdMapping rowIdMapping = merged.rowIdMapping();
}

assert mergeMetadata.numRows() > 0 : "Merged file should contain at least one row";
Memory Inefficiency

Lines 133-148 load all input Parquet files entirely into memory (all_batches). For large merges with many input files or large row groups, this can exhaust available memory. The two-pass approach (build schedule, then read all data) prevents streaming and forces full materialization before writing begins.

let mut all_batches: Vec<Vec<arrow::array::RecordBatch>> = Vec::with_capacity(input_files.len());
for (file_idx, path) in input_files.iter().enumerate() {
    let file = File::open(path)?;
    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
    let parquet_descr = builder.parquet_schema().clone();
    let schema = builder.schema().clone();
    let projection_indices = projection_indices_excluding_row_id(&schema);
    let projection = parquet::arrow::ProjectionMask::roots(&parquet_descr, projection_indices);
    let reader = builder.with_batch_size(batch_size).with_projection(projection).build()?;

    let mut batches = Vec::new();
    for batch_result in reader {
        batches.push(batch_result?);
    }
    all_batches.push(batches);
}
Potential NPE

Line 79 calls primaryOneMerge.getIndexWriter() which may return null if setIndexWriter was not called. Although line 142 in LuceneMerger.java sets it, if createOneMerge is invoked independently or the merge flow changes, this will throw IllegalStateException at line 79. The null check happens after attempting to use the writer at line 82.

if (indexWriter == null) {
    throw new IllegalStateException("IndexWriter not set on PrimaryOneMerge — cannot read merged segment");
}

try (DirectoryReader dirReader = DirectoryReader.open(indexWriter)) {

@github-actions
Copy link
Copy Markdown
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix variable declaration and assignment

The variable declarations for mergeMetadata and outputMapping in the secondary
branch are correct, but the primary branch declares mergeMetadata again and uses
rowIdMapping instead of outputMapping. This creates a compilation error due to
duplicate variable declaration and inconsistent variable naming. Assign to the
already-declared variables instead.

sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/merge/NativeParquetMergeStrategy.java [84-97]

 if (isSecondary) {
     // Secondary mode: reorder rows according to external RowIdMapping from primary format
     mergeMetadata = RustBridge.mergeParquetFilesWithMapping(filePaths, mergedFilePath.toString(), indexName, externalMapping);
     outputMapping = null;
 } else {
     // Merge files in Rust
     MergeFilesResult merged = RustBridge.mergeParquetFilesInRust(filePaths, mergedFilePath.toString(), indexName, writerGeneration);
-    ParquetFileMetadata mergeMetadata = merged.metadata();
-    RowIdMapping rowIdMapping = merged.rowIdMapping();
+    mergeMetadata = merged.metadata();
+    outputMapping = merged.rowIdMapping();
 }
Suggestion importance[1-10]: 10

__

Why: This is a critical compilation error. The code declares mergeMetadata twice and uses inconsistent variable names (rowIdMapping vs outputMapping), which will prevent the code from compiling. The suggestion correctly identifies that variables should be assigned to the already-declared mergeMetadata and outputMapping.

High
Commit before reading merged segment

Opening a DirectoryReader on an IndexWriter that may have uncommitted changes can
lead to inconsistent reads or missing the merged segment. Ensure the merged segment
is committed or visible before attempting to read it, or use
DirectoryReader.openIfChanged with proper NRT reader handling.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/merge/PrimaryLuceneMergeStrategy.java [82-98]

+indexWriter.commit();
 try (DirectoryReader dirReader = DirectoryReader.open(indexWriter)) {
     LeafReader reader = findMergedSegmentReader(dirReader, mergedInfo);
-    SortedNumericDocValues rowIdValues = reader.getSortedNumericDocValues(DocumentInput.ROW_ID_FIELD);
-    if (rowIdValues == null) {
-        throw new IllegalStateException("Merged segment does not contain " + DocumentInput.ROW_ID_FIELD + " doc values");
-    }
-
-    for (int newPos = 0; newPos < reader.maxDoc(); newPos++) {
-        if (!rowIdValues.advanceExact(newPos)) {
-            throw new IllegalStateException("Doc " + newPos + " missing " + DocumentInput.ROW_ID_FIELD + " value");
-        }
-        long globalRowId = rowIdValues.nextValue();
-        // globalRowId = cumulativeOffset + localRowId (set by PrimaryOneMerge)
-        // mapping[globalRowId] = newPos
-        mappingArray[(int) globalRowId] = newPos;
-    }
+    ...
 }
Suggestion importance[1-10]: 7

__

Why: This is a valid concern about reading uncommitted changes from an IndexWriter. However, the suggestion to add indexWriter.commit() may not be the correct solution in this context, as the merge operation itself handles commits. The code should verify that the merged segment is visible before reading, but forcing a commit here might interfere with the merge lifecycle.

Medium
General
Avoid loading all rows into memory

Loading all input rows into memory before writing can cause out-of-memory errors for
large datasets. Consider a streaming approach that processes rows in chunks,
maintaining a buffer of upcoming rows sorted by output position, or use
memory-mapped files for large merges.

sandbox/plugins/parquet-data-format/src/main/rust/src/merge/reordered.rs [133-148]

-let mut all_batches: Vec<Vec<arrow::array::RecordBatch>> = Vec::with_capacity(input_files.len());
-for (file_idx, path) in input_files.iter().enumerate() {
-    let file = File::open(path)?;
-    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
-    ...
-    let mut batches = Vec::new();
-    for batch_result in reader {
-        batches.push(batch_result?);
-    }
-    all_batches.push(batches);
-}
+// Process in chunks to avoid loading entire dataset into memory
+const MAX_BUFFER_SIZE: usize = 100_000;
+let mut buffer: Vec<(i64, usize, usize)> = Vec::with_capacity(MAX_BUFFER_SIZE);
+// Stream rows in schedule order, maintaining a sliding window buffer
+...
Suggestion importance[1-10]: 6

__

Why: This is a valid performance and scalability concern. Loading all rows into memory can cause OOM errors for large datasets. However, the suggestion is more of an optimization recommendation rather than a critical bug fix. The current implementation may be acceptable for moderate-sized datasets, and the suggested streaming approach would require significant architectural changes.

Low

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for faaa969: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant