Skip to content

Commit

Permalink
Support catalog type property and setting catalog name (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored Jul 10, 2023
1 parent 06e6cec commit 73e55f4
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 25 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The zip archive will be found under `./kafka-connect-runtime/build/distributions
| iceberg.control.commitIntervalMs | Commit interval in msec, default is 300,000 (5 min) |
| iceberg.control.commitTimeoutMs | Commit timeout interval in msec, default is 30,000 (30 sec) |
| iceberg.control.commitThreads | Number of threads to use for commits, default is (cores * 2) |
| iceberg.catalog | Name of the catalog, default is `iceberg` |
| iceberg.catalog.* | Properties passed through to Iceberg catalog initialization |
| iceberg.kafka.* | Properties passed through to control topic Kafka client initialization |

Expand All @@ -59,7 +60,7 @@ otherwise you will need to include that yourself.

### REST example
```
"iceberg.catalog": "org.apache.iceberg.rest.RESTCatalog",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://catalog-service",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse>",
Expand All @@ -69,7 +70,7 @@ otherwise you will need to include that yourself.
NOTE: Use the distribution that includes the HMS client (or include the HMS client yourself). Use `S3FileIO` when
using S3 for storage (the default is `HadoopFileIO` with `HiveCatalog`).
```
"iceberg.catalog":"org.apache.iceberg.hive.HiveCatalog",
"iceberg.catalog.tyoe":"hive",
"iceberg.catalog.uri":"thrift://hive:9083",
"iceberg.catalog.io-impl":"org.apache.iceberg.aws.s3.S3FileIO",
"iceberg.catalog.warehouse":"s3a://bucket/warehouse",
Expand Down Expand Up @@ -131,7 +132,7 @@ This example config connects to a Iceberg REST catalog.
"tasks.max": "2",
"topics": "events",
"iceberg.tables": "default.events",
"iceberg.catalog": "org.apache.iceberg.rest.RESTCatalog",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse name>"
Expand Down Expand Up @@ -173,7 +174,7 @@ PARTITIONED BY (hours(ts));
"iceberg.tables.routeField": "type",
"iceberg.table.default.events_list.routeRegex": "list",
"iceberg.table.default.events_create.routeRegex": "create",
"iceberg.catalog": "org.apache.iceberg.rest.RESTCatalog",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse name>"
Expand All @@ -199,7 +200,7 @@ See above for creating two tables.
"topics": "events",
"iceberg.tables.dynamic.enabled": "true",
"iceberg.tables.routeField": "db_table",
"iceberg.catalog": "org.apache.iceberg.rest.RESTCatalog",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse name>"
Expand All @@ -226,7 +227,7 @@ See above for creating the table
"topics": "events",
"iceberg.tables": "default.events",
"iceberg.tables.cdcField": "_cdc_op",
"iceberg.catalog": "org.apache.iceberg.rest.RESTCatalog",
"iceberg.catalog.type": "rest",
"iceberg.catalog.uri": "https://localhost",
"iceberg.catalog.credential": "<credential>",
"iceberg.catalog.warehouse": "<warehouse name>"
Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ subprojects {
apply plugin: "maven-publish"

group "io.tabular.connect"
version "0.4.3-SNAPSHOT"
version "0.4.3"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.PartitionSpec;
Expand All @@ -44,7 +45,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -105,7 +105,9 @@ public void testIcebergSink() throws Exception {
.config("iceberg.tables.cdcField", "op")
.config("iceberg.control.commitIntervalMs", 1000)
.config("iceberg.control.commitTimeoutMs", Integer.MAX_VALUE)
.config("iceberg.catalog", RESTCatalog.class.getName())
.config(
"iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE,
CatalogUtil.ICEBERG_CATALOG_TYPE_REST)
.config("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:8181")
.config("iceberg.catalog." + S3FileIOProperties.ENDPOINT, "http://minio:9000")
.config("iceberg.catalog." + S3FileIOProperties.ACCESS_KEY_ID, AWS_ACCESS_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand All @@ -38,7 +39,6 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -99,7 +99,9 @@ public void testIcebergSink() {
.config("iceberg.tables.routeField", "payload")
.config("iceberg.control.commitIntervalMs", 1000)
.config("iceberg.control.commitTimeoutMs", Integer.MAX_VALUE)
.config("iceberg.catalog", RESTCatalog.class.getName())
.config(
"iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE,
CatalogUtil.ICEBERG_CATALOG_TYPE_REST)
.config("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:8181")
.config("iceberg.catalog." + S3FileIOProperties.ENDPOINT, "http://minio:9000")
.config("iceberg.catalog." + S3FileIOProperties.ACCESS_KEY_ID, AWS_ACCESS_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand All @@ -38,7 +39,6 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -103,7 +103,9 @@ public void testIcebergSink() {
.config(format("iceberg.table.%s.%s.routeRegex", TEST_DB, TEST_TABLE2), "type2")
.config("iceberg.control.commitIntervalMs", 1000)
.config("iceberg.control.commitTimeoutMs", Integer.MAX_VALUE)
.config("iceberg.catalog", RESTCatalog.class.getName())
.config(
"iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE,
CatalogUtil.ICEBERG_CATALOG_TYPE_REST)
.config("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:8181")
.config("iceberg.catalog." + S3FileIOProperties.ENDPOINT, "http://minio:9000")
.config("iceberg.catalog." + S3FileIOProperties.ACCESS_KEY_ID, AWS_ACCESS_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
Expand All @@ -39,7 +40,6 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -96,7 +96,9 @@ public void testIcebergSink() throws Exception {
.config("iceberg.tables", format("%s.%s", TEST_DB, TEST_TABLE))
.config("iceberg.control.commitIntervalMs", 1000)
.config("iceberg.control.commitTimeoutMs", Integer.MAX_VALUE)
.config("iceberg.catalog", RESTCatalog.class.getName())
.config(
"iceberg.catalog." + CatalogUtil.ICEBERG_CATALOG_TYPE,
CatalogUtil.ICEBERG_CATALOG_TYPE_REST)
.config("iceberg.catalog." + CatalogProperties.URI, "http://iceberg:8181")
.config("iceberg.catalog." + S3FileIOProperties.ENDPOINT, "http://minio:9000")
.config("iceberg.catalog." + S3FileIOProperties.ACCESS_KEY_ID, AWS_ACCESS_KEY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String KAFKA_PROP_PREFIX = "iceberg.kafka.";
private static final String TABLE_PROP_PREFIX = "iceberg.table.";

private static final String CATALOG_IMPL_PROP = "iceberg.catalog";
private static final String CATALOG_NAME_PROP = "iceberg.catalog";
private static final String TABLES_PROP = "iceberg.tables";
private static final String TABLES_DYNAMIC_PROP = "iceberg.tables.dynamic.enabled";
private static final String TABLES_ROUTE_FIELD_PROP = "iceberg.tables.routeField";
Expand All @@ -75,6 +75,7 @@ public class IcebergSinkConfig extends AbstractConfig {
private static final String NAME_PROP = "name";
private static final String BOOTSTRAP_SERVERS_PROP = "bootstrap.servers";

private static final String DEFAULT_CATALOG_NAME = "iceberg";
private static final String DEFAULT_CONTROL_TOPIC = "control-iceberg";
public static final String DEFAULT_CONTROL_GROUP_PREFIX = "cg-control-";

Expand Down Expand Up @@ -125,7 +126,12 @@ private static ConfigDef newConfigDef() {
false,
Importance.MEDIUM,
"Set to true to treat all appends as upserts, false otherwise");
configDef.define(CATALOG_IMPL_PROP, Type.STRING, Importance.HIGH, "Iceberg catalog class name");
configDef.define(
CATALOG_NAME_PROP,
Type.STRING,
DEFAULT_CATALOG_NAME,
Importance.MEDIUM,
"Iceberg catalog name");
configDef.define(
CONTROL_TOPIC_PROP,
Type.STRING,
Expand Down Expand Up @@ -186,6 +192,7 @@ public IcebergSinkConfig(Map<String, String> originalProps) {
}

private void validate() {
checkState(!getCatalogProps().isEmpty(), "Must specify Iceberg catalog properties");
if (getTables() != null) {
checkState(!getDynamicTablesEnabled(), "Cannot specify both static and dynamic table names");
} else if (getDynamicTablesEnabled()) {
Expand Down Expand Up @@ -223,8 +230,8 @@ public Map<String, String> getKafkaProps() {
return kafkaProps;
}

public String getCatalogImpl() {
return getString(CATALOG_IMPL_PROP);
public String getCatalogName() {
return getString(CATALOG_NAME_PROP);
}

public List<String> getTables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ public class Utilities {
private static final Logger LOG = LoggerFactory.getLogger(Utilities.class.getName());

public static Catalog loadCatalog(IcebergSinkConfig config) {
return CatalogUtil.loadCatalog(
config.getCatalogImpl(), "iceberg", config.getCatalogProps(), getHadoopConfig());
return CatalogUtil.buildIcebergCatalog(
config.getCatalogName(), config.getCatalogProps(), getHadoopConfig());
}

private static Object getHadoopConfig() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@

import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.kafka.common.config.ConfigException;
import org.junit.jupiter.api.Test;

Expand All @@ -47,7 +46,7 @@ public void testInvalid() {
Map<String, String> props =
ImmutableMap.of(
"topics", "source-topic",
"iceberg.catalog", RESTCatalog.class.getName(),
"iceberg.catalog.type", "rest",
"iceberg.tables", "db.landing",
"iceberg.tables.dynamic.enabled", "true");
assertThatExceptionOfType(ConfigException.class).isThrownBy(() -> new IcebergSinkConfig(props));
Expand All @@ -57,7 +56,7 @@ public void testInvalid() {
public void testGetDefault() {
Map<String, String> props =
ImmutableMap.of(
"iceberg.catalog", RESTCatalog.class.getName(),
"iceberg.catalog.type", "rest",
"topics", "source-topic",
"iceberg.tables", "db.landing");
IcebergSinkConfig config = new IcebergSinkConfig(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void testIsLeader() {
task.start(
ImmutableMap.of(
"topics", "topic1, topic2",
"iceberg.catalog", "catalog",
"iceberg.catalog.type", "rest",
"iceberg.tables", "table"));

List<TopicPartition> assignments =
Expand Down

0 comments on commit 73e55f4

Please sign in to comment.