ParquetDataFormatPlugin.java

@@ -11,6 +11,14 @@
import com.parquet.parquetdataformat.fields.ArrowSchemaBuilder;
import com.parquet.parquetdataformat.engine.read.ParquetDataSourceCodec;
import com.parquet.parquetdataformat.writer.ParquetWriter;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.index.engine.DataFormatPlugin;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
@@ -20,12 +28,16 @@
import org.opensearch.plugins.DataSourcePlugin;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.plugins.Plugin;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.client.Client;
import org.opensearch.vectorized.execution.search.spi.DataSourceCodec;
import org.opensearch.watcher.ResourceWatcherService;

import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.function.Supplier;

/**
* OpenSearch plugin that provides Parquet data format support for indexing operations.
@@ -58,10 +70,39 @@
*/
public class ParquetDataFormatPlugin extends Plugin implements DataFormatPlugin, DataSourcePlugin {

private Settings settings;

public static final String DEFAULT_MAX_NATIVE_ALLOCATION = "10%";

public static final Setting<String> INDEX_MAX_NATIVE_ALLOCATION = Setting.simpleString(
"index.parquet.max_native_allocation",
DEFAULT_MAX_NATIVE_ALLOCATION,

[Review comment, Member] Please add validation for the syntax of the setting value.

Setting.Property.NodeScope,
Setting.Property.Dynamic
);
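
A validator could address the review comment above. The following is a sketch only, not part of this PR: it assumes the value is either a percentage such as "10%" or a plain byte count, and that the `Setting.simpleString` overload accepting a `Setting.Validator` is used.

```java
// Sketch: validate the syntax of index.parquet.max_native_allocation.
// Accepts a percentage ("10%") or a raw byte count ("1073741824").
public static final Setting.Validator<String> MAX_NATIVE_ALLOCATION_VALIDATOR = value -> {
    try {
        if (value.endsWith("%")) {
            double pct = Double.parseDouble(value.substring(0, value.length() - 1));
            if (pct <= 0.0 || pct > 100.0) {
                throw new IllegalArgumentException(
                    "[index.parquet.max_native_allocation] percentage must be in (0, 100], got [" + value + "]");
            }
        } else {
            long bytes = Long.parseLong(value);
            if (bytes <= 0) {
                throw new IllegalArgumentException(
                    "[index.parquet.max_native_allocation] must be a positive byte count, got [" + value + "]");
            }
        }
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException(
            "[index.parquet.max_native_allocation] is neither a percentage nor a byte count: [" + value + "]", e);
    }
};

// The setting would then be declared via the validating overload:
// Setting.simpleString("index.parquet.max_native_allocation", DEFAULT_MAX_NATIVE_ALLOCATION,
//     MAX_NATIVE_ALLOCATION_VALIDATOR, Setting.Property.NodeScope, Setting.Property.Dynamic);
```
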

@Override
@SuppressWarnings("unchecked")
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath) {
-return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(() -> ArrowSchemaBuilder.getSchema(mapperService), shardPath);
+return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(settings, () -> ArrowSchemaBuilder.getSchema(mapperService), shardPath);
}

@Override
public Collection<Object> createComponents(
Client client,
ClusterService clusterService,
ThreadPool threadPool,
ResourceWatcherService resourceWatcherService,
ScriptService scriptService,
NamedXContentRegistry xContentRegistry,
Environment environment,
NodeEnvironment nodeEnvironment,
NamedWriteableRegistry namedWriteableRegistry,
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
this.settings = clusterService.getSettings();
return super.createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, xContentRegistry, environment, nodeEnvironment, namedWriteableRegistry, indexNameExpressionResolver, repositoriesServiceSupplier);
}
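
One caveat: the setting is declared `Dynamic`, but `createComponents` captures `clusterService.getSettings()` once, which only observes the value at node startup. The usual pattern for picking up runtime updates is a settings-update consumer. A sketch, assuming a hypothetical resize hook on the buffer pool (this PR shows no such hook):

```java
// Sketch: react to dynamic updates of the allocation limit.
// updateMaxNativeAllocation is a hypothetical method, not part of this PR.
clusterService.getClusterSettings().addSettingsUpdateConsumer(
    INDEX_MAX_NATIVE_ALLOCATION,
    newValue -> arrowBufferPool.updateMaxNativeAllocation(newValue)
);
```
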

private Class<? extends DataFormat> getDataFormatType() {
@@ -83,6 +124,11 @@ public Optional<Map<org.opensearch.vectorized.execution.search.DataFormat, DataS
// return Optional.empty();
}

@Override
public List<Setting<?>> getSettings() {
return List.of(INDEX_MAX_NATIVE_ALLOCATION);
}

// for testing locally only
public void indexDataToParquetEngine() throws IOException {
//Create Engine (take Schema as Input)
RustBridge.java

@@ -86,6 +86,7 @@ private static void loadNativeLibrary() {
public static native void write(String file, long arrayAddress, long schemaAddress) throws IOException;
public static native void closeWriter(String file) throws IOException;
public static native void flushToDisk(String file) throws IOException;
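/**
 * Reports the native (off-heap) bytes currently held by Rust-side Arrow writers
 * whose file paths start with the given prefix, so callers can scope memory
 * accounting to a single shard's data path (used by ParquetExecutionEngine.getNativeBytesUsed).
 */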
public static native long getFilteredNativeBytesUsed(String pathPrefix);

// State and metrics methods handled on Rust side
public static native boolean writerExists(String file);
ParquetExecutionEngine.java

@@ -1,11 +1,17 @@
package com.parquet.parquetdataformat.engine;

import com.parquet.parquetdataformat.bridge.RustBridge;
import com.parquet.parquetdataformat.memory.ArrowBufferPool;
import com.parquet.parquetdataformat.merge.CompactionStrategy;
import com.parquet.parquetdataformat.merge.ParquetMergeExecutor;
import com.parquet.parquetdataformat.merge.ParquetMerger;
import com.parquet.parquetdataformat.writer.ParquetDocumentInput;
import com.parquet.parquetdataformat.writer.ParquetWriter;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
import org.opensearch.index.engine.exec.Merger;
@@ -15,6 +21,7 @@
import org.opensearch.index.engine.exec.WriterFileSet;
import org.opensearch.index.shard.ShardPath;

import java.io.Closeable;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
@@ -59,18 +66,22 @@
*/
public class ParquetExecutionEngine implements IndexingExecutionEngine<ParquetDataFormat> {

private static final Logger logger = LogManager.getLogger(ParquetExecutionEngine.class);

+public static final String FILE_NAME_PREFIX = "_parquet_file_generation";
private static final Pattern FILE_PATTERN = Pattern.compile(".*_(\\d+)\\.parquet$", Pattern.CASE_INSENSITIVE);
-private static final String FILE_NAME_PREFIX = "_parquet_file_generation";
private static final String FILE_NAME_EXT = ".parquet";

private final Supplier<Schema> schema;
private final List<WriterFileSet> filesWrittenAlready = new ArrayList<>();
private final ShardPath shardPath;
private final ParquetMerger parquetMerger = new ParquetMergeExecutor(CompactionStrategy.RECORD_BATCH);
private final ArrowBufferPool arrowBufferPool;

-public ParquetExecutionEngine(Supplier<Schema> schema, ShardPath shardPath) {
+public ParquetExecutionEngine(Settings settings, Supplier<Schema> schema, ShardPath shardPath) {
this.schema = schema;
this.shardPath = shardPath;
this.arrowBufferPool = new ArrowBufferPool(settings);
}

@Override
@@ -98,7 +109,7 @@ public List<String> supportedFieldTypes() {
@Override
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) throws IOException {
String fileName = Path.of(shardPath.getDataPath().toString(), FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
-return new ParquetWriter(fileName, schema.get(), writerGeneration);
+return new ParquetWriter(fileName, schema.get(), writerGeneration, arrowBufferPool);
}

@Override
@@ -121,4 +132,19 @@ public RefreshResult refresh(RefreshInput refreshInput) throws IOException {
public DataFormat getDataFormat() {
return new ParquetDataFormat();
}

@Override
public long getNativeBytesUsed() {
long vsrMemory = arrowBufferPool.getTotalAllocatedBytes();
String shardDataPath = shardPath.getDataPath().toString();
long filteredArrowWriterMemory = RustBridge.getFilteredNativeBytesUsed(shardDataPath);
logger.info("Native memory used by VSR Buffer Pool: {}", vsrMemory);
logger.info("Native memory used by ArrowWriters in shard path {}: {}", shardDataPath, filteredArrowWriterMemory);
return vsrMemory + filteredArrowWriterMemory;
}

@Override
public void close() throws IOException {
arrowBufferPool.close();
}
}
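
ArrowBufferPool is referenced throughout this diff but its implementation is not part of the changeset. For orientation, a minimal sketch of what such a pool might look like, assuming it wraps an Arrow RootAllocator sized from index.parquet.max_native_allocation; the percentage resolution against the JVM heap is an illustration, not the PR's actual logic:

```java
package com.parquet.parquetdataformat.memory;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.opensearch.common.settings.Settings;

import java.io.Closeable;

// Hypothetical sketch of the ArrowBufferPool collaborator used above.
public class ArrowBufferPool implements Closeable {

    private final BufferAllocator allocator;

    public ArrowBufferPool(Settings settings) {
        String max = settings.get("index.parquet.max_native_allocation", "10%");
        this.allocator = new RootAllocator(resolveLimitBytes(max));
    }

    // Resolves "10%" against the JVM max heap as a stand-in; a real
    // implementation would likely size against available native memory.
    private static long resolveLimitBytes(String value) {
        if (value.endsWith("%")) {
            double pct = Double.parseDouble(value.substring(0, value.length() - 1));
            return (long) (Runtime.getRuntime().maxMemory() * pct / 100.0);
        }
        return Long.parseLong(value);
    }

    // Total bytes currently allocated from this pool
    // (summed with the Rust-side total in getNativeBytesUsed).
    public long getTotalAllocatedBytes() {
        return allocator.getAllocatedMemory();
    }

    @Override
    public void close() {
        allocator.close();
    }
}
```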