@@ -11,12 +11,20 @@
import com.parquet.parquetdataformat.fields.ArrowSchemaBuilder;
import com.parquet.parquetdataformat.engine.read.ParquetDataSourceCodec;
import com.parquet.parquetdataformat.writer.ParquetWriter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.DataFormatPlugin;
import org.opensearch.index.engine.exec.DataFormat;
import org.opensearch.index.engine.exec.IndexingExecutionEngine;
import com.parquet.parquetdataformat.bridge.RustBridge;
import com.parquet.parquetdataformat.engine.ParquetExecutionEngine;
import org.opensearch.index.shard.ShardPath;
import org.opensearch.index.store.FormatStoreDirectory;
import org.opensearch.index.store.GenericStoreDirectory;
import org.opensearch.plugins.DataSourcePlugin;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.plugins.Plugin;
@@ -26,6 +34,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
* OpenSearch plugin that provides Parquet data format support for indexing operations.
@@ -58,16 +67,17 @@
*/
public class ParquetDataFormatPlugin extends Plugin implements DataFormatPlugin, DataSourcePlugin {

/**
 * Set of file extensions that the Parquet format handles
*/
private static final Set<String> PARQUET_EXTENSIONS = Set.of(".parquet", ".pqt");

@Override
@SuppressWarnings("unchecked")
public <T extends DataFormat> IndexingExecutionEngine<T> indexingEngine(MapperService mapperService, ShardPath shardPath) {
return (IndexingExecutionEngine<T>) new ParquetExecutionEngine(() -> ArrowSchemaBuilder.getSchema(mapperService), shardPath);
}

private Class<? extends DataFormat> getDataFormatType() {
return ParquetDataFormat.class;
}

@Override
public DataFormat getDataFormat() {
return new ParquetDataFormat();
@@ -83,6 +93,28 @@ public Optional<Map<org.opensearch.vectorized.execution.search.DataFormat, DataS
// return Optional.empty();
}


@Override
public FormatStoreDirectory<?> createFormatStoreDirectory(
IndexSettings indexSettings,
ShardPath shardPath
) throws IOException {
Logger logger = LogManager.getLogger("index.store.parquet." + shardPath.getShardId());

return new GenericStoreDirectory<>(
new ParquetDataFormat(),
shardPath.getDataPath(),
logger
);
}

@Override
public BlobContainer createBlobContainer(BlobStore blobStore, BlobPath baseBlobPath) throws IOException {
BlobPath formatPath = baseBlobPath.add(getDataFormat().name().toLowerCase());
return blobStore.blobContainer(formatPath);
}

// for testing locally only
public void indexDataToParquetEngine() throws IOException {
//Create Engine (take Schema as Input)
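For context on the two new hooks: `createFormatStoreDirectory` gives the shard a per-format local directory, and `createBlobContainer` scopes remote blobs under the lowercased format name. A minimal caller-side sketch; the `BlobStore` wiring, path segments, and blob name are illustrative assumptions, not part of this PR:

```java
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class FormatBlobContainerExample {
    // Writes one blob into the format-scoped container, i.e. under .../indices/<uuid>/0/parquet/
    static void upload(ParquetDataFormatPlugin plugin, BlobStore blobStore) throws IOException {
        BlobPath shardBlobPath = new BlobPath().add("indices").add("my-index-uuid").add("0");
        BlobContainer container = plugin.createBlobContainer(blobStore, shardBlobPath);

        byte[] payload = {1, 2, 3};
        try (InputStream in = new ByteArrayInputStream(payload)) {
            // failIfAlreadyExists = true guards against clobbering an existing generation
            container.writeBlob("parquet_0.parquet", in, payload.length, true);
        }
    }
}
```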
@@ -97,7 +97,7 @@ public List<String> supportedFieldTypes() {

@Override
public Writer<ParquetDocumentInput> createWriter(long writerGeneration) throws IOException {
-        String fileName = Path.of(shardPath.getDataPath().toString(), FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
+        String fileName = Path.of(shardPath.getDataPath().toString(), getDataFormat().name(), FILE_NAME_PREFIX + "_" + writerGeneration + FILE_NAME_EXT).toString();
return new ParquetWriter(fileName, schema.get(), writerGeneration);
}

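The writer now nests its output one level deeper, in a subdirectory named after the data format, matching the directory the DataFusion reader resolves below. A sketch of the resulting path logic; the Files.createDirectories call is a defensive assumption (Path.of alone does not create the parent directory), and the prefix/extension values are assumed:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class WriterPathExample {
    // Mirrors the createWriter path construction; constant values are assumed.
    static Path writerPath(Path dataPath, String formatName, long writerGeneration) throws IOException {
        Path formatDir = dataPath.resolve(formatName); // e.g. <shard>/index/parquet
        Files.createDirectories(formatDir);            // no-op when the directory already exists
        return formatDir.resolve("parquet_" + writerGeneration + ".parquet");
    }

    public static void main(String[] args) throws IOException {
        // Prints /tmp/shard0/index/parquet/parquet_3.parquet
        System.out.println(writerPath(Path.of("/tmp/shard0/index"), "parquet", 3L));
    }
}
```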
@@ -56,6 +56,7 @@ public MergeResult mergeParquetFiles(Collection<FileMetadata> files) {
Map<RowId, Long> rowIdMapping = new HashMap<>();

FileMetadata mergedFileMetadata = new FileMetadata(
"",
outputDirectory,
mergedFileName
);
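The merge path now passes a leading, here empty, data-format argument to FileMetadata. Judging by the accessors used elsewhere in this diff (fileMetadata.file(), fileMetadata.dataFormat()), the holder is record-shaped; a minimal sketch of the implied shape, not the actual class in org.opensearch.index.engine.exec:

```java
// Sketch only: the real FileMetadata may carry more fields and validation.
public record FileMetadata(String dataFormat, String directory, String file) {
}
```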
2 changes: 1 addition & 1 deletion plugins/engine-datafusion/jni/src/lib.rs
@@ -360,7 +360,7 @@ pub extern "system" fn Java_org_opensearch_datafusion_DataFusionQueryJNI_execute

let substrait_plan = match Plan::decode(plan_bytes_vec.as_slice()) {
Ok(plan) => {
-            // println!("SUBSTRAIT rust: Decoding is successful, Plan has {} relations", plan.relations.len());
+            println!("SUBSTRAIT rust: Decoding is successful, Plan has {} relations", plan.relations.len());
plan
},
Err(e) => {
@@ -66,8 +66,8 @@ public class DatafusionEngine extends SearchExecEngine<DatafusionContext, Datafu

public DatafusionEngine(DataFormat dataFormat, Collection<FileMetadata> formatCatalogSnapshot, DataFusionService dataFusionService, ShardPath shardPath) throws IOException {
this.dataFormat = dataFormat;

-        this.datafusionReaderManager = new DatafusionReaderManager(shardPath.getDataPath().toString(), formatCatalogSnapshot, dataFormat.getName());
+        String path = shardPath.getDataPath().resolve(dataFormat.getName()).toString();
+        this.datafusionReaderManager = new DatafusionReaderManager(path, formatCatalogSnapshot, dataFormat.getName());
this.datafusionService = dataFusionService;
}

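This is the read-side counterpart of the writer change above: previously the reader manager scanned the shard data path itself, so files written under the per-format subdirectory were invisible to it. A small illustration of the two resolutions, with the format name assumed to be "parquet":

```java
import java.nio.file.Path;

public class ReaderPathExample {
    public static void main(String[] args) {
        Path dataPath = Path.of("/tmp/shard0/index");
        String formatName = "parquet"; // assumed result of dataFormat.getName()

        Path before = dataPath;                    // old reader root: missed .../parquet files
        Path after = dataPath.resolve(formatName); // new reader root: matches the writer

        System.out.println("before: " + before);
        System.out.println("after:  " + after);
    }
}
```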
@@ -23,6 +23,7 @@
import org.opensearch.core.index.Index;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.engine.exec.FileMetadata;
import org.opensearch.index.remote.RemoteStoreUtils;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
@@ -209,7 +210,7 @@ private Map<String, Map<String, Object>> getSegmentMetadata(
RemoteSegmentMetadata segmentMetadata = entry.getValue();

Map<String, Object> segmentMetadataMap = new HashMap<>();
-        Map<String, Object> filesMap = new HashMap<>();
+        Map<FileMetadata, Object> filesMap = new HashMap<>();
segmentMetadata.getMetadata().forEach((file, meta) -> {
Map<String, Object> metaMap = new HashMap<>();
metaMap.put("original_name", meta.getOriginalFilename());
@@ -34,6 +34,7 @@

import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
@@ -50,6 +51,7 @@
*
* @opensearch.internal
*/
@PublicApi(since = "1.0.0")
public interface BlobContainer {

/**
@@ -276,6 +278,7 @@ default void writeBlobAtomicWithMetadata(

/**
* The type representing sort order of blob names
* @opensearch.api
*/
enum BlobNameSortOrder {

@@ -32,6 +32,8 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.annotation.PublicApi;

/**
* An interface for providing basic metadata about a blob.
*
@@ -32,11 +32,14 @@

package org.opensearch.common.blobstore;

import org.opensearch.common.annotation.PublicApi;

/**
* The result of deleting multiple blobs from a {@link BlobStore}.
*
- * @opensearch.internal
+ * @opensearch.api
*/
@PublicApi(since = "1.0.0")
public final class DeleteResult {

public static final DeleteResult ZERO = new DeleteResult(0, 0);
@@ -34,6 +34,8 @@
import java.util.zip.CRC32;

import com.jcraft.jzlib.JZlib;
import org.opensearch.index.engine.exec.FileMetadata;
import org.opensearch.indices.replication.common.ReplicationLuceneIndex;

/**
* RemoteTransferContainer is an encapsulation for managing file transfers.
@@ -57,11 +59,12 @@ public class RemoteTransferContainer implements Closeable {
private final boolean isRemoteDataIntegritySupported;
private final AtomicBoolean readBlock = new AtomicBoolean();
private final Map<String, String> metadata;
private final String dataFormat;

private static final Logger log = LogManager.getLogger(RemoteTransferContainer.class);

/**
-     * Construct a new RemoteTransferContainer object
+     * Construct a new RemoteTransferContainer object (legacy overload without metadata or data format)
*
* @param fileName Name of the local file
* @param remoteFileName Name of the remote file
@@ -91,10 +94,35 @@ public RemoteTransferContainer(
offsetRangeInputStreamSupplier,
expectedChecksum,
isRemoteDataIntegritySupported,
null,
null
);
}

public RemoteTransferContainer(
FileMetadata fileMetadata,
String remoteFileName,
long contentLength,
boolean failTransferIfFileExists,
WritePriority writePriority,
OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier,
Long expectedChecksum,
boolean isRemoteDataIntegritySupported
) {
this(
fileMetadata.file(),
remoteFileName,
contentLength,
failTransferIfFileExists,
writePriority,
offsetRangeInputStreamSupplier,
expectedChecksum,
isRemoteDataIntegritySupported,
null,
fileMetadata.dataFormat()
);
}

/**
* Construct a new RemoteTransferContainer object with metadata.
*
@@ -117,7 +145,8 @@ public RemoteTransferContainer(
OffsetRangeInputStreamSupplier offsetRangeInputStreamSupplier,
Long expectedChecksum,
boolean isRemoteDataIntegritySupported,
-        Map<String, String> metadata
+        Map<String, String> metadata,
+        String dataFormat
) {
this.fileName = fileName;
this.remoteFileName = remoteFileName;
@@ -128,6 +157,7 @@
this.expectedChecksum = expectedChecksum;
this.isRemoteDataIntegritySupported = isRemoteDataIntegritySupported;
this.metadata = metadata;
this.dataFormat = dataFormat;
}

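A hedged usage sketch of the new FileMetadata-based overload: the local file name and data format now travel together, while the remote name, supplier, and checksum values below are placeholders:

```java
import org.opensearch.common.blobstore.stream.write.WritePriority;
import org.opensearch.index.engine.exec.FileMetadata;

public class RemoteTransferExample {
    static RemoteTransferContainer containerFor(
        FileMetadata fileMetadata,
        long contentLength,
        RemoteTransferContainer.OffsetRangeInputStreamSupplier supplier
    ) {
        return new RemoteTransferContainer(
            fileMetadata,                     // supplies fileName and dataFormat
            fileMetadata.file() + "__remote", // remote name: illustrative only
            contentLength,
            true,                             // fail the transfer if the remote file exists
            WritePriority.NORMAL,
            supplier,
            null,                             // expectedChecksum: none yet (see the checksum TODOs below)
            false                             // remote data integrity support not assumed
        );
    }
}
```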
@@ -61,7 +61,9 @@ public VersionedCodecStreamWrapper(
public T readStream(IndexInput indexInput) throws IOException {
logger.debug("Reading input stream [{}] of length - [{}]", indexInput.toString(), indexInput.length());
try {
-            CodecUtil.checksumEntireFile(indexInput);
+            // TODO (@kamal): the footer has no checksum yet, so this check fails with:
+            // Caused by: org.apache.lucene.index.CorruptIndexException: codec footer mismatch (file truncated?): actual footer=2 vs expected footer=-1071082520 (resource=BufferedChecksumIndexInput(metadata file))
+            // CodecUtil.checksumEntireFile(indexInput);
int readStreamVersion = checkHeader(indexInput);
return getHandlerForVersion(readStreamVersion).readContent(indexInput);
} catch (CorruptIndexException cie) {
@@ -84,7 +86,8 @@ public T readStream(IndexInput indexInput) throws IOException {
public void writeStream(IndexOutput indexOutput, T content) throws IOException {
this.writeHeader(indexOutput);
getHandlerForVersion(this.currentVersion).writeContent(indexOutput, content);
-        this.writeFooter(indexOutput);
+        // TODO (@Kamal): this API does not work with Parquet files, which carry their own footer.
+        // this.writeFooter(indexOutput);
}

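Both commented-out calls disable Lucene footer handling for every stream that passes through this wrapper, not only Parquet metadata. One possible direction, sketched purely as an assumption rather than this PR's stated plan, is to gate footer handling on a per-format flag so Lucene-codec streams keep their integrity checks:

```java
// Sketch of a conditional variant inside VersionedCodecStreamWrapper; the
// hasCodecFooter flag and its constructor wiring are assumptions.
public T readStream(IndexInput indexInput) throws IOException {
    if (hasCodecFooter) {
        CodecUtil.checksumEntireFile(indexInput); // keep verification where Lucene footers exist
    }
    int readStreamVersion = checkHeader(indexInput);
    return getHandlerForVersion(readStreamVersion).readContent(indexInput);
}

public void writeStream(IndexOutput indexOutput, T content) throws IOException {
    this.writeHeader(indexOutput);
    getHandlerForVersion(this.currentVersion).writeContent(indexOutput, content);
    if (hasCodecFooter) {
        this.writeFooter(indexOutput); // skip only for formats with their own footer, e.g. Parquet
    }
}
```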
@@ -8,16 +8,18 @@

package org.opensearch.common.util;

import org.opensearch.index.engine.exec.FileMetadata;

/**
* A tracker class that is fed to FileUploader.
*
* @opensearch.internal
*/
public interface UploadListener {

-    void beforeUpload(String file);
+    void beforeUpload(FileMetadata fileMetadata);

-    void onSuccess(String file);
+    void onSuccess(FileMetadata fileMetadata);

-    void onFailure(String file);
+    void onFailure(FileMetadata fileMetadata);
}
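With the interface now taking FileMetadata instead of a bare file name, implementers see the data format alongside the file. A minimal listener against the new signatures, assuming the file() and dataFormat() accessors used elsewhere in this diff:

```java
import org.opensearch.common.util.UploadListener;
import org.opensearch.index.engine.exec.FileMetadata;

public class LoggingUploadListener implements UploadListener {
    @Override
    public void beforeUpload(FileMetadata fileMetadata) {
        System.out.println("uploading " + fileMetadata.file() + " [" + fileMetadata.dataFormat() + "]");
    }

    @Override
    public void onSuccess(FileMetadata fileMetadata) {
        System.out.println("uploaded " + fileMetadata.file());
    }

    @Override
    public void onFailure(FileMetadata fileMetadata) {
        System.err.println("failed " + fileMetadata.file());
    }
}
```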
28 changes: 25 additions & 3 deletions server/src/main/java/org/opensearch/index/IndexModule.java
@@ -77,6 +77,7 @@
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.index.store.CompositeStoreDirectoryFactory;
import org.opensearch.index.store.DefaultCompositeDirectoryFactory;
import org.opensearch.index.store.FsDirectoryFactory;
import org.opensearch.index.store.Store;
@@ -267,6 +268,7 @@ public final class IndexModule {
private final Map<String, TriFunction<Settings, Version, ScriptService, Similarity>> similarities = new HashMap<>();
private final Map<String, IndexStorePlugin.DirectoryFactory> directoryFactories;
private final Map<String, IndexStorePlugin.CompositeDirectoryFactory> compositeDirectoryFactories;
private final Map<String, CompositeStoreDirectoryFactory> compositeStoreDirectoryFactories;
private final SetOnce<BiFunction<IndexSettings, IndicesQueryCache, QueryCache>> forceQueryCacheProvider = new SetOnce<>();
private final List<SearchOperationListener> searchOperationListeners = new ArrayList<>();
private final List<IndexingOperationListener> indexOperationListeners = new ArrayList<>();
@@ -300,7 +302,8 @@ public IndexModule(
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories,
final Map<String, IndexStorePlugin.StoreFactory> storeFactories,
final FileCache fileCache,
-        final CompositeIndexSettings compositeIndexSettings
+        final CompositeIndexSettings compositeIndexSettings,
+        final Map<String, CompositeStoreDirectoryFactory> compositeStoreDirectoryFactories
) {
this.indexSettings = indexSettings;
this.analysisRegistry = analysisRegistry;
Expand All @@ -310,6 +313,7 @@ public IndexModule(
this.indexOperationListeners.add(new IndexingSlowLog(indexSettings));
this.directoryFactories = Collections.unmodifiableMap(directoryFactories);
this.compositeDirectoryFactories = Collections.unmodifiableMap(compositeDirectoryFactories);
this.compositeStoreDirectoryFactories = Collections.unmodifiableMap(compositeStoreDirectoryFactories);
this.allowExpensiveQueries = allowExpensiveQueries;
this.expressionResolver = expressionResolver;
this.recoveryStateFactories = recoveryStateFactories;
@@ -341,7 +345,8 @@ public IndexModule(
recoveryStateFactories,
Collections.emptyMap(),
null,
-            null
+            null,
+            Collections.emptyMap()
);
}

@@ -762,6 +767,10 @@ public IndexService newIndexService(
indexSettings,
compositeDirectoryFactories
);
final CompositeStoreDirectoryFactory compositeStoreDirectoryFactory = getCompositeStoreDirectoryFactory(
indexSettings,
compositeStoreDirectoryFactories
);
final IndexStorePlugin.RecoveryStateFactory recoveryStateFactory = getRecoveryStateFactory(indexSettings, recoveryStateFactories);
QueryCache queryCache = null;
IndexAnalyzers indexAnalyzers = null;
@@ -825,7 +834,8 @@ public IndexService newIndexService(
segmentReplicationStatsProvider,
clusterDefaultMaxMergeAtOnceSupplier,
searchEnginePlugin,
-            pluginsService
+            pluginsService,
+            compositeStoreDirectoryFactory
);
success = true;
return indexService;
@@ -884,6 +894,18 @@ private static IndexStorePlugin.CompositeDirectoryFactory getCompositeDirectoryF
return factory;
}

private static CompositeStoreDirectoryFactory getCompositeStoreDirectoryFactory(
final IndexSettings indexSettings,
final Map<String, CompositeStoreDirectoryFactory> compositeStoreDirectoryFactories
) {
if (compositeStoreDirectoryFactories.isEmpty()) {
return null;
}
// For now, return the default factory if available
// In future, could add index setting to select specific factory type
return compositeStoreDirectoryFactories.get("default");
}
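getCompositeStoreDirectoryFactory only ever looks up the "default" key for now. A sketch of how the map handed into IndexModule might be assembled on the plugin side; the concrete factory implementation is assumed:

```java
import java.util.HashMap;
import java.util.Map;

import org.opensearch.index.store.CompositeStoreDirectoryFactory;

public class CompositeStoreFactorySetup {
    // Builds the factory map consumed by IndexModule; only "default" is read today.
    static Map<String, CompositeStoreDirectoryFactory> factories(CompositeStoreDirectoryFactory defaultFactory) {
        Map<String, CompositeStoreDirectoryFactory> factories = new HashMap<>();
        factories.put("default", defaultFactory);
        return factories;
    }
}
```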

private static IndexStorePlugin.RecoveryStateFactory getRecoveryStateFactory(
final IndexSettings indexSettings,
final Map<String, IndexStorePlugin.RecoveryStateFactory> recoveryStateFactories