> sourceProviders;
+
+ /**
+ * Constructs a ConversionService instance with required dependencies.
+ *
+ * This constructor initializes the ConversionService using the provided service configuration.
+ * It retrieves the Hadoop configuration, creates a new ConversionController with the Hadoop
+ * configuration, and initializes conversion source providers based on the Hadoop configuration.
+ *
+ * @param serviceConfig the conversion service configuration
+ */
+ @Inject
+ public ConversionService(ConversionServiceConfig serviceConfig) {
+ this.serviceConfig = serviceConfig;
+ this.hadoopConf = getHadoopConf();
+ this.conversionController = new ConversionController(hadoopConf);
+ this.sourceProviders = initSourceProviders(hadoopConf);
+ }
+
+ /**
+ * Retrieves the Hadoop configuration.
+ *
+ *
This method creates a new {@code Configuration} instance, reads the Hadoop configuration
+ * file path from the service configuration, and attempts to load the configuration from the
+ * specified XML file. If no resources are loaded, it logs a warning. If an error occurs during
+ * configuration loading, it logs an error message.
+ *
+ * @return the initialized Hadoop {@code Configuration}
+ */
+ private Configuration getHadoopConf() {
+ Configuration conf = new Configuration();
+ String hadoopConfigPath = serviceConfig.getHadoopConfigPath();
+ try {
+ // Load configuration from the specified XML file
+ conf.addResource(hadoopConfigPath);
+
+ // If the resource wasn’t found, log a warning
+ if (conf.size() == 0) {
+ log.warn(
+ "Could not load Hadoop configuration from: {}. Using default Hadoop configuration.",
+ hadoopConfigPath);
+ }
+ } catch (Exception e) {
+ log.error(
+ "Error loading Hadoop configuration from: {}. Exception: {}",
+ hadoopConfigPath,
+ e.getMessage(),
+ e);
+ }
+ return conf;
+ }
+
+ /**
+ * Initializes conversion source providers for different table formats using the provided Hadoop
+ * configuration.
+ *
+ *
This method creates and initializes source providers for HUDI, DELTA, and ICEBERG formats.
+ * Each provider is initialized with the given Hadoop configuration and then mapped to its
+ * respective table format identifier.
+ *
+ * @param hadoopConf the Hadoop configuration used to initialize the source providers
+ * @return a map mapping table format identifiers to their corresponding initialized conversion
+ * source providers
+ */
+ private Map> initSourceProviders(Configuration hadoopConf) {
+ Map> sourceProviders = new HashMap<>();
+ ConversionSourceProvider hudiConversionSourceProvider =
+ new HudiConversionSourceProvider();
+ ConversionSourceProvider deltaConversionSourceProvider =
+ new DeltaConversionSourceProvider();
+ ConversionSourceProvider icebergConversionSourceProvider =
+ new IcebergConversionSourceProvider();
+
+ hudiConversionSourceProvider.init(hadoopConf);
+ deltaConversionSourceProvider.init(hadoopConf);
+ icebergConversionSourceProvider.init(hadoopConf);
+
+ sourceProviders.put(HUDI, hudiConversionSourceProvider);
+ sourceProviders.put(DELTA, deltaConversionSourceProvider);
+ sourceProviders.put(ICEBERG, icebergConversionSourceProvider);
+
+ return sourceProviders;
+ }
+
+ /**
+ * Constructs a new ConversionService instance for testing purposes.
+ *
+ * This constructor is visible for testing using dependency injection. It allows the injection
+ * of a preconfigured ConversionController, Hadoop configuration, and source providers.
+ *
+ * @param serviceConfig the conversion service configuration
+ * @param conversionController a preconfigured conversion controller
+ * @param hadoopConf the Hadoop configuration to be used for initializing resources
+ * @param sourceProviders a map of conversion source providers keyed by table format
+ */
+ @VisibleForTesting
+ public ConversionService(
+ ConversionServiceConfig serviceConfig,
+ ConversionController conversionController,
+ Configuration hadoopConf,
+ Map> sourceProviders) {
+ this.serviceConfig = serviceConfig;
+ this.conversionController = conversionController;
+ this.hadoopConf = hadoopConf;
+ this.sourceProviders = sourceProviders;
+ }
+
+ /**
+ * Converts a source table to one or more target table formats.
+ *
+ * @param convertTableRequest the conversion request containing source table details and target
+ * formats
+ * @return a ConvertTableResponse containing details of the converted target tables
+ */
+ public ConvertTableResponse convertTable(ConvertTableRequest convertTableRequest) {
+ SourceTable sourceTable =
+ SourceTable.builder()
+ .name(convertTableRequest.getSourceTableName())
+ .basePath(convertTableRequest.getSourceTablePath())
+ .formatName(convertTableRequest.getSourceFormat())
+ .build();
+
+ List targetTables = new ArrayList<>();
+ for (String targetFormat : convertTableRequest.getTargetFormats()) {
+ TargetTable targetTable =
+ TargetTable.builder()
+ .name(convertTableRequest.getSourceTableName())
+ .basePath(convertTableRequest.getSourceTablePath())
+ .formatName(targetFormat)
+ .build();
+ targetTables.add(targetTable);
+ }
+
+ ConversionConfig conversionConfig =
+ ConversionConfig.builder().sourceTable(sourceTable).targetTables(targetTables).build();
+
+ conversionController.sync(
+ conversionConfig, sourceProviders.get(convertTableRequest.getSourceFormat()));
+
+ List convertedTables = new ArrayList<>();
+ for (TargetTable targetTable : targetTables) {
+ InternalTable internalTable =
+ sourceProviders
+ .get(targetTable.getFormatName())
+ .getConversionSourceInstance(convertToSourceTable(targetTable))
+ .getCurrentTable();
+ String schemaString = extractSchemaString(targetTable, internalTable);
+ convertedTables.add(
+ ConvertedTable.builder()
+ .targetFormat(internalTable.getName())
+ .targetSchema(schemaString)
+ .targetMetadataPath(internalTable.getLatestMetdataPath())
+ .build());
+ }
+ return new ConvertTableResponse(convertedTables);
+ }
+
+ /**
+ * Extracts the schema string from the given internal table based on the target table format.
+ *
+ * This method supports the following table formats:
+ *
+ *
+ * - HUDI: Converts the internal schema to an Avro schema and returns its string
+ * representation.
+ *
- ICEBERG: Converts the internal schema to an Iceberg schema and returns its JSON
+ * representation.
+ *
- DELTA: Converts the internal schema to a Spark schema and returns its JSON
+ * representation.
+ *
+ *
+ * @param targetTable the target table containing the desired format information
+ * @param internalTable the internal table from which the schema is read
+ * @return the string representation of the converted schema
+ * @throws UnsupportedOperationException if the target table format is not supported
+ */
+ private String extractSchemaString(TargetTable targetTable, InternalTable internalTable) {
+ switch (targetTable.getFormatName()) {
+ case TableFormat.HUDI:
+ return AvroSchemaConverter.getInstance()
+ .fromInternalSchema(internalTable.getReadSchema())
+ .toString();
+ case TableFormat.ICEBERG:
+ org.apache.iceberg.Schema iceSchema =
+ IcebergSchemaExtractor.getInstance().toIceberg(internalTable.getReadSchema());
+ return SchemaParser.toJson(iceSchema);
+ case TableFormat.DELTA:
+ return SparkSchemaExtractor.getInstance()
+ .fromInternalSchema(internalTable.getReadSchema())
+ .json();
+ default:
+ throw new UnsupportedOperationException(
+ "Unsupported table format: " + targetTable.getFormatName());
+ }
+ }
+}
diff --git a/xtable-service/src/main/java/org/apache/xtable/service/ConversionServiceConfig.java b/xtable-service/src/main/java/org/apache/xtable/service/ConversionServiceConfig.java
new file mode 100644
index 000000000..1da7c0594
--- /dev/null
+++ b/xtable-service/src/main/java/org/apache/xtable/service/ConversionServiceConfig.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.service;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+import jakarta.enterprise.context.ApplicationScoped;
+
+@ApplicationScoped
+public class ConversionServiceConfig {
+
+ public static final String HADOOP_DEFAULTS_XML = "xtable-hadoop-defaults.xml";
+
+ @ConfigProperty(name = "xtable.hadoop-config-path", defaultValue = HADOOP_DEFAULTS_XML)
+ private String hadoopConfigPath;
+
+ public String getHadoopConfigPath() {
+ return hadoopConfigPath;
+ }
+}
diff --git a/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableRequest.java b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableRequest.java
new file mode 100644
index 000000000..465c3c0c0
--- /dev/null
+++ b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableRequest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.service.models;
+
+import java.util.List;
+import java.util.Map;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@Getter
+@Builder
+public class ConvertTableRequest {
+ @JsonProperty("source-format")
+ private String sourceFormat;
+
+ @JsonProperty("source-table-name")
+ private String sourceTableName;
+
+ @JsonProperty("source-table-path")
+ private String sourceTablePath;
+
+ @JsonProperty("target-formats")
+ private List targetFormats;
+
+ @JsonProperty("configurations")
+ private Map configurations;
+
+ public ConvertTableRequest() {}
+
+ @JsonCreator
+ public ConvertTableRequest(
+ @JsonProperty("source-format") String sourceFormat,
+ @JsonProperty("source-table-name") String sourceTableName,
+ @JsonProperty("source-table-path") String sourceTablePath,
+ @JsonProperty("target-format") List targetFormat,
+ @JsonProperty("configurations") Map configurations) {
+
+ this.sourceFormat = sourceFormat;
+ this.sourceTableName = sourceTableName;
+ this.sourceTablePath = sourceTablePath;
+ this.targetFormats = targetFormat;
+ this.configurations = configurations;
+ }
+}
diff --git a/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableResponse.java b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableResponse.java
new file mode 100644
index 000000000..1581ea19c
--- /dev/null
+++ b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertTableResponse.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.service.models;
+
+import java.util.List;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@Getter
+@Builder
+public class ConvertTableResponse {
+ @JsonProperty("convertedTables")
+ private List convertedTables;
+
+ @JsonCreator
+ public ConvertTableResponse(@JsonProperty List convertedTables) {
+ this.convertedTables = convertedTables;
+ }
+}
diff --git a/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertedTable.java b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertedTable.java
new file mode 100644
index 000000000..12bc915e2
--- /dev/null
+++ b/xtable-service/src/main/java/org/apache/xtable/service/models/ConvertedTable.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.service.models;
+
+import lombok.Builder;
+import lombok.Getter;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+@Getter
+@Builder
+public class ConvertedTable {
+ @JsonProperty("target-format")
+ private String targetFormat;
+
+ @JsonProperty("target-metadata-path")
+ private String targetMetadataPath;
+
+ @JsonProperty("target-schema")
+ private String targetSchema;
+
+ @JsonCreator
+ public ConvertedTable(
+ @JsonProperty String targetFormat,
+ @JsonProperty String targetMetadataPath,
+ @JsonProperty String targetSchema) {
+ this.targetFormat = targetFormat;
+ this.targetMetadataPath = targetMetadataPath;
+ this.targetSchema = targetSchema;
+ }
+}
diff --git a/xtable-service/src/main/resources/application.properties b/xtable-service/src/main/resources/application.properties
new file mode 100644
index 000000000..f00ee7c8c
--- /dev/null
+++ b/xtable-service/src/main/resources/application.properties
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+quarkus.log.level=INFO
\ No newline at end of file
diff --git a/xtable-service/src/main/resources/xtable-hadoop-defaults.xml b/xtable-service/src/main/resources/xtable-hadoop-defaults.xml
new file mode 100644
index 000000000..0262bd83b
--- /dev/null
+++ b/xtable-service/src/main/resources/xtable-hadoop-defaults.xml
@@ -0,0 +1,91 @@
+
+
+
+
+
+
+ fs.file.impl
+ org.apache.hadoop.fs.LocalFileSystem
+
+
+
+
+ fs.azure.account.auth.type
+ OAuth
+
+
+ fs.azure.account.oauth.provider.type
+ org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
+
+
+
+
+
+
+ fs.s3.impl
+ org.apache.hadoop.fs.s3a.S3AFileSystem
+
+
+ fs.s3.aws.credentials.provider
+ software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
+
+
+ fs.s3a.impl
+ org.apache.hadoop.fs.s3a.S3AFileSystem
+
+
+ fs.s3a.aws.credentials.provider
+ software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider
+
+
+
+
+ fs.gs.impl
+ com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
+
+
+ fs.AbstractFileSystem.gs.impl
+ com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
+
+
+
+
+ spark.master
+ local[2]
+
+
+
+
+ parquet.avro.write-old-list-structure
+ false
+
+
+
diff --git a/xtable-service/src/test/java/org/apache/xtable/service/TestConversionResource.java b/xtable-service/src/test/java/org/apache/xtable/service/TestConversionResource.java
new file mode 100644
index 000000000..5591db869
--- /dev/null
+++ b/xtable-service/src/test/java/org/apache/xtable/service/TestConversionResource.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.service;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.service.models.ConvertTableRequest;
+import org.apache.xtable.service.models.ConvertTableResponse;
+import org.apache.xtable.service.models.ConvertedTable;
+
+@ExtendWith(MockitoExtension.class)
+class TestConversionResource {
+
+ private static final String SOURCE_TABLE_NAME = "users";
+ private static final String SOURCE_TABLE_BASE_PATH = "s3://bucket/tables/users";
+ private static final String TARGET_ICEBERG_METADATA_PATH = "s3://bucket/tables/users/metadata";
+
+ @Mock private ConversionService conversionService;
+
+ @InjectMocks private ConversionResource resource;
+
+ @Test
+ void testConvertTableResource() {
+ ConvertTableRequest req =
+ ConvertTableRequest.builder()
+ .sourceFormat(TableFormat.DELTA)
+ .sourceTableName(SOURCE_TABLE_NAME)
+ .sourceTablePath(SOURCE_TABLE_BASE_PATH)
+ .targetFormats(Arrays.asList(TableFormat.ICEBERG))
+ .build();
+
+ ConvertedTable icebergTable =
+ ConvertedTable.builder()
+ .targetFormat(TableFormat.ICEBERG)
+ .targetMetadataPath(TARGET_ICEBERG_METADATA_PATH)
+ .build();
+
+ ConvertTableResponse expected =
+ ConvertTableResponse.builder().convertedTables(Arrays.asList(icebergTable)).build();
+ when(conversionService.convertTable(req)).thenReturn(expected);
+ ConvertTableResponse actual = resource.convertTable(req);
+ verify(conversionService).convertTable(req);
+
+ assertNotNull(actual);
+ assertSame(expected, actual, "Resource should return the exact response from the service");
+
+ assertEquals(1, actual.getConvertedTables().size());
+ assertEquals(TableFormat.ICEBERG, actual.getConvertedTables().get(0).getTargetFormat());
+ assertEquals(
+ TARGET_ICEBERG_METADATA_PATH, actual.getConvertedTables().get(0).getTargetMetadataPath());
+ }
+}
diff --git a/xtable-service/src/test/java/org/apache/xtable/service/TestConversionService.java b/xtable-service/src/test/java/org/apache/xtable/service/TestConversionService.java
new file mode 100644
index 000000000..d22b561cf
--- /dev/null
+++ b/xtable-service/src/test/java/org/apache/xtable/service/TestConversionService.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.service;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.sql.types.StructType;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.MockedStatic;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import org.apache.iceberg.SchemaParser;
+
+import org.apache.xtable.avro.AvroSchemaConverter;
+import org.apache.xtable.conversion.ConversionController;
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.iceberg.IcebergSchemaExtractor;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.schema.SparkSchemaExtractor;
+import org.apache.xtable.service.models.ConvertTableRequest;
+import org.apache.xtable.service.models.ConvertTableResponse;
+import org.apache.xtable.service.models.ConvertedTable;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@ExtendWith(MockitoExtension.class)
+class TestConversionService {
+ private static final String SOURCE_NAME = "users";
+ private static final String SOURCE_PATH = "s3://bucket/tables/users";
+ private static final String HUDI_META_PATH = "s3://bucket/tables/users/.hoodie";
+ private static final String ICEBERG_META_PATH =
+ "s3://bucket/tables/users/metadata/v1.metadata.json";
+ private static final String DELTA_META_PATH = "s3://bucket/tables/users/delta_log";
+
+ private static final String HUDI_SCHEMA_JSON =
+ "{\n"
+ + " \"type\":\"record\",\n"
+ + " \"name\":\"Users\",\n"
+ + " \"fields\":[{\"name\":\"id\",\"type\":\"string\"}]\n"
+ + "}";
+
+ private static final String ICEBERG_JSON =
+ "{\"type\":\"record\",\"name\":\"Users\","
+ + "\"fields\":[{\"name\":\"id\",\"type\":\"string\",\"field-id\":1}]}";
+
+ private static final String DELTA_JSON =
+ "{\"type\":\"struct\",\"fields\":["
+ + "{\"name\":\"id\",\"type\":\"string\",\"nullable\":false,\"metadata\":{}}]}";
+
+ @Mock private ConversionServiceConfig serviceConfig;
+
+ @Mock private ConversionController controller;
+
+ @Mock ConversionSourceProvider provider;
+
+ @Mock ConversionSource conversionSrc;
+
+ @Mock InternalTable internalTbl;
+
+ @Mock InternalSchema internalSchema;
+
+ private ConversionService service;
+ private Configuration conf;
+
+ @BeforeEach
+ void setUp() {
+ this.conf = new Configuration();
+ Map> providers = new HashMap<>();
+ providers.put(TableFormat.DELTA, provider);
+ providers.put(TableFormat.HUDI, provider);
+ providers.put(TableFormat.ICEBERG, provider);
+ service = new ConversionService(serviceConfig, controller, this.conf, providers);
+ }
+
+ @Test
+ void convertToTargetHudi() {
+ ConvertTableRequest req =
+ ConvertTableRequest.builder()
+ .sourceFormat(TableFormat.DELTA)
+ .sourceTableName(SOURCE_NAME)
+ .sourceTablePath(SOURCE_PATH)
+ .targetFormats(Collections.singletonList(TableFormat.HUDI))
+ .build();
+
+ Schema avroSchema = new Schema.Parser().parse(HUDI_SCHEMA_JSON);
+ try (MockedStatic avroConv = mockStatic(AvroSchemaConverter.class)) {
+ when(controller.sync(any(), eq(provider))).thenReturn(null);
+ when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
+ when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);
+
+ when(internalTbl.getName()).thenReturn(TableFormat.HUDI);
+ when(internalTbl.getLatestMetdataPath()).thenReturn(HUDI_META_PATH);
+ when(internalTbl.getReadSchema()).thenReturn(internalSchema);
+
+ AvroSchemaConverter converter = mock(AvroSchemaConverter.class);
+ avroConv.when(AvroSchemaConverter::getInstance).thenReturn(converter);
+ when(converter.fromInternalSchema(internalSchema)).thenReturn(avroSchema);
+
+ ConvertTableResponse resp = service.convertTable(req);
+
+ verify(controller).sync(any(), eq(provider));
+ assertEquals(1, resp.getConvertedTables().size());
+ ConvertedTable ct = resp.getConvertedTables().get(0);
+ assertEquals(TableFormat.HUDI, ct.getTargetFormat());
+ assertEquals(HUDI_META_PATH, ct.getTargetMetadataPath());
+ assertEquals(avroSchema.toString(), ct.getTargetSchema());
+ }
+ }
+
+ @Test
+ void convertToTargetIceberg() {
+ ConvertTableRequest req =
+ ConvertTableRequest.builder()
+ .sourceFormat(TableFormat.DELTA)
+ .sourceTableName(SOURCE_NAME)
+ .sourceTablePath(SOURCE_PATH)
+ .targetFormats(Collections.singletonList(TableFormat.ICEBERG))
+ .build();
+
+ org.apache.iceberg.Schema icebergSchema = mock(org.apache.iceberg.Schema.class);
+ try (MockedStatic iceExt = mockStatic(IcebergSchemaExtractor.class);
+ MockedStatic parserMock = mockStatic(SchemaParser.class)) {
+
+ when(controller.sync(any(), eq(provider))).thenReturn(null);
+ when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
+ when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);
+
+ when(internalTbl.getName()).thenReturn(TableFormat.ICEBERG);
+ when(internalTbl.getLatestMetdataPath()).thenReturn(ICEBERG_META_PATH);
+ when(internalTbl.getReadSchema()).thenReturn(internalSchema);
+
+ IcebergSchemaExtractor extractor = mock(IcebergSchemaExtractor.class);
+ iceExt.when(IcebergSchemaExtractor::getInstance).thenReturn(extractor);
+ when(extractor.toIceberg(internalSchema)).thenReturn(icebergSchema);
+
+ parserMock.when(() -> SchemaParser.toJson(icebergSchema)).thenReturn(ICEBERG_JSON);
+
+ ConvertTableResponse resp = service.convertTable(req);
+
+ verify(controller).sync(any(), eq(provider));
+ assertEquals(1, resp.getConvertedTables().size());
+ ConvertedTable ct = resp.getConvertedTables().get(0);
+ assertEquals(TableFormat.ICEBERG, ct.getTargetFormat());
+ assertEquals(ICEBERG_META_PATH, ct.getTargetMetadataPath());
+ assertEquals(ICEBERG_JSON, ct.getTargetSchema());
+ }
+ }
+
+ @Test
+ void convertToTargetDelta() {
+ ConvertTableRequest req =
+ ConvertTableRequest.builder()
+ .sourceFormat(TableFormat.ICEBERG)
+ .sourceTableName(SOURCE_NAME)
+ .sourceTablePath(SOURCE_PATH)
+ .targetFormats(Collections.singletonList(TableFormat.DELTA))
+ .build();
+
+ StructType structType = mock(StructType.class);
+ try (MockedStatic sparkExt = mockStatic(SparkSchemaExtractor.class)) {
+ when(controller.sync(any(), eq(provider))).thenReturn(null);
+ when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
+ when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);
+
+ when(internalTbl.getName()).thenReturn(TableFormat.DELTA);
+ when(internalTbl.getLatestMetdataPath()).thenReturn(DELTA_META_PATH);
+ when(internalTbl.getReadSchema()).thenReturn(internalSchema);
+
+ SparkSchemaExtractor extractor = mock(SparkSchemaExtractor.class);
+ sparkExt.when(SparkSchemaExtractor::getInstance).thenReturn(extractor);
+ when(extractor.fromInternalSchema(internalSchema)).thenReturn(structType);
+ when(structType.json()).thenReturn(DELTA_JSON);
+
+ ConvertTableResponse resp = service.convertTable(req);
+
+ verify(controller).sync(any(), eq(provider));
+ assertEquals(1, resp.getConvertedTables().size());
+ ConvertedTable ct = resp.getConvertedTables().get(0);
+ assertEquals(TableFormat.DELTA, ct.getTargetFormat());
+ assertEquals(DELTA_META_PATH, ct.getTargetMetadataPath());
+ assertEquals(DELTA_JSON, ct.getTargetSchema());
+ }
+ }
+}