Skip to content

Commit e389179

Browse files
committed
IndexingMemoryController Integration for VSRs and ArrowWriters for parquet-data-format module
- Integrated native memory tracking for VSRs and ArrowWriters in IndexingMemoryController
- Fixed ArrowBufferPool allocator creation logic to have a single RootAllocator per shard and ChildAllocators for each ParquetWriter
- Fixed VSR rotation bugs in ParquetDocumentInput.addToWriter code path

Signed-off-by: Raghuvansh Raj <[email protected]>
1 parent 73975ab commit e389179

File tree

16 files changed

+360
-232
lines changed

16 files changed

+360
-232
lines changed

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/ParquetDataFormatPlugin.java

Lines changed: 50 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,14 @@
1111
import com.parquet.parquetdataformat.fields.ArrowSchemaBuilder;
1212
import com.parquet.parquetdataformat.engine.read.ParquetDataSourceCodec;
1313
import com.parquet.parquetdataformat.writer.ParquetWriter;
14+
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
15+
import org.opensearch.cluster.service.ClusterService;
16+
import org.opensearch.common.settings.Setting;
17+
import org.opensearch.common.settings.Settings;
18+
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
19+
import org.opensearch.core.xcontent.NamedXContentRegistry;
20+
import org.opensearch.env.Environment;
21+
import org.opensearch.env.NodeEnvironment;
1422
import org.opensearch.index.engine.DataFormatPlugin;
1523
import org.opensearch.index.engine.exec.DataFormat;
1624
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
@@ -20,12 +28,16 @@
2028
import org.opensearch.plugins.DataSourcePlugin;
2129
import org.opensearch.index.mapper.MapperService;
2230
import org.opensearch.plugins.Plugin;
31+
import org.opensearch.repositories.RepositoriesService;
32+
import org.opensearch.script.ScriptService;
33+
import org.opensearch.threadpool.ThreadPool;
34+
import org.opensearch.transport.client.Client;
2335
import org.opensearch.vectorized.execution.search.spi.DataSourceCodec;
36+
import org.opensearch.watcher.ResourceWatcherService;
2437

2538
import java.io.IOException;
26-
import java.util.HashMap;
27-
import java.util.Map;
28-
import java.util.Optional;
39+
import java.util.*;
40+
import java.util.function.Supplier;
2941

3042
/**
3143
* OpenSearch plugin that provides Parquet data format support for indexing operations.
@@ -58,10 +70,39 @@
5870
*/
5971
public class ParquetDataFormatPlugin extends Plugin implements DataFormatPlugin, DataSourcePlugin {
6072

73+
private Settings settings;
74+
75+
public static String DEFAULT_MAX_NATIVE_ALLOCATION = "10%";
76+
77+
public static final Setting<String> INDEX_MAX_NATIVE_ALLOCATION = Setting.simpleString(
78+
"index.parquet.max_native_allocation",
79+
DEFAULT_MAX_NATIVE_ALLOCATION,
80+
Setting.Property.NodeScope,
81+
Setting.Property.Dynamic
82+
);
83+
6184
@Override
6285
@SuppressWarnings("unchecked")
6386
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath) {
64-
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(() -> ArrowSchemaBuilder.getSchema(mapperService), shardPath);
87+
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(settings, () -> ArrowSchemaBuilder.getSchema(mapperService), shardPath);
88+
}
89+
90+
@Override
91+
public Collection<Object> createComponents(
92+
Client client,
93+
ClusterService clusterService,
94+
ThreadPool threadPool,
95+
ResourceWatcherService resourceWatcherService,
96+
ScriptService scriptService,
97+
NamedXContentRegistry xContentRegistry,
98+
Environment environment,
99+
NodeEnvironment nodeEnvironment,
100+
NamedWriteableRegistry namedWriteableRegistry,
101+
IndexNameExpressionResolver indexNameExpressionResolver,
102+
Supplier<RepositoriesService> repositoriesServiceSupplier
103+
) {
104+
this.settings = clusterService.getSettings();
105+
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry, indexNameExpressionResolver, repositoriesServiceSupplier);
65106
}
66107

67108
private Class<? extends DataFormat> getDataFormatType() {
@@ -83,6 +124,11 @@ public Optional<Map<org.opensearch.vectorized.execution.search.DataFormat, DataS
83124
// return Optional.empty();
84125
}
85126

127+
@Override
128+
public List<Setting<?>> getSettings() {
129+
return List.of(INDEX_MAX_NATIVE_ALLOCATION);
130+
}
131+
86132
// for testing locally only
87133
public void indexDataToParquetEngine() throws IOException {
88134
//Create Engine (take Schema as Input)

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/bridge/RustBridge.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -86,6 +86,7 @@ private static void loadNativeLibrary() {
8686
public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException;
8787
public static native void closeWriter(String file) throws IOException;
8888
public static native void flushToDisk(String file) throws IOException;
89+
public static native long getFilteredNativeBytesUsed(String pathPrefix);
8990

9091
// State and metrics methods handled on Rust side
9192
public static native boolean writerExists(String file);

modules/parquet-data-format/src/main/java/com/parquet/parquetdataformat/engine/ParquetExecutionEngine.java

Lines changed: 29 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -1,11 +1,17 @@
11
package com.parquet.parquetdataformat.engine;
22

3+
import com.parquet.parquetdataformat.bridge.RustBridge;
4+
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
35
import com.parquet.parquetdataformat.merge.CompactionStrategy;
46
import com.parquet.parquetdataformat.merge.ParquetMergeExecutor;
57
import com.parquet.parquetdataformat.merge.ParquetMerger;
68
import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
79
import com.parquet.parquetdataformat.writer.ParquetWriter;
810
import org.apache.arrow.vector.types.pojo.Schema;
11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
13+
import org.opensearch.common.settings.Setting;
14+
import org.opensearch.common.settings.Settings;
915
import org.opensearch.index.engine.exec.DataFormat;
1016
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
1117
import org.opensearch.index.engine.exec.Merger;
@@ -15,6 +21,7 @@
1521
import org.opensearch.index.engine.exec.WriterFileSet;
1622
import org.opensearch.index.shard.ShardPath;
1723

24+
import java.io.Closeable;
1825
import java.io.IOException;
1926
import java.nio.file.DirectoryStream;
2027
import java.nio.file.Files;
@@ -59,18 +66,22 @@
5966
*/
6067
public class ParquetExecutionEngine implements IndexingExecutionEngine<ParquetDataFormat> {
6168

69+
private static final Logger logger = LogManager.getLogger(ParquetExecutionEngine.class);
70+
71+
public static final String FILE_NAME_PREFIX = "_parquet_file_generation";
6272
private static final Pattern FILE_PATTERN = Pattern.compile(".*_(\\d+)\\.parquet$", Pattern.CASE_INSENSITIVE);
63-
private static final String FILE_NAME_PREFIX = "_parquet_file_generation";
6473
private static final String FILE_NAME_EXT = ".parquet";
6574

6675
private final Supplier<Schema> schema;
6776
private final List<WriterFileSet> filesWrittenAlready = new ArrayList<>();
6877
private final ShardPath shardPath;
6978
private final ParquetMerger parquetMerger = new ParquetMergeExecutor(CompactionStrategy.RECORD_BATCH);
79+
private final ArrowBufferPool arrowBufferPool;
7080

71-
public ParquetExecutionEngine(Supplier<Schema> schema, ShardPath shardPath) {
81+
public ParquetExecutionEngine(Settings settings, Supplier<Schema> schema, ShardPath shardPath) {
7282
this.schema = schema;
7383
this.shardPath = shardPath;
84+
this.arrowBufferPool = new ArrowBufferPool(settings);
7485
}
7586

7687
@Override
@@ -98,7 +109,7 @@ public List<String> supportedFieldTypes() {
98109
@Override
99110
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) throws IOException {
100111
String fileName = Path.of(shardPath.getDataPath().toString(), FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
101-
return new ParquetWriter(fileName, schema.get(), writerGeneration);
112+
return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool);
102113
}
103114

104115
@Override
@@ -121,4 +132,19 @@ public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
121132
public DataFormat getDataFormat() {
122133
return new ParquetDataFormat();
123134
}
135+
136+
@Override
137+
public long getNativeBytesUsed() {
138+
long vsrMemory = arrowBufferPool.getTotalAllocatedBytes();
139+
String shardDataPath = shardPath.getDataPath().toString();
140+
long filteredArrowWriterMemory = RustBridge.getFilteredNativeBytesUsed(shardDataPath);
141+
logger.info("Native memory used by VSR Buffer Pool: {}", vsrMemory);
142+
logger.info("Native memory used by ArrowWriters in shard path {}: {}", shardDataPath, filteredArrowWriterMemory);
143+
return vsrMemory + filteredArrowWriterMemory;
144+
}
145+
146+
@Override
147+
public void close() throws IOException {
148+
arrowBufferPool.close();
149+
}
124150
}

0 commit comments

Comments (0)