Skip to content

Commit

Permalink
Allow unmapped fields for schematized records (#54)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanck authored Jul 21, 2023
1 parent f9762f1 commit 19bed4d
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 7 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ subprojects {
apply plugin: "maven-publish"

group "io.tabular.connect"
version "0.4.7-SNAPSHOT"
version "0.4.7"

repositories {
mavenCentral()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package io.tabular.iceberg.connect.data;

import static java.lang.String.format;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import java.io.Closeable;
import java.io.IOException;
Expand All @@ -29,6 +31,7 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.WriteResult;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.sink.SinkRecord;

public class IcebergWriter implements Closeable {
Expand Down Expand Up @@ -59,8 +62,12 @@ public void write(SinkRecord record) {
writer.write(new RecordWrapper(row, op));
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
} catch (Exception e) {
throw new DataException(
format(
"An error occurred converting record, topic: %s, partition, %d, offset: %d",
record.topic(), record.kafkaPartition(), record.kafkaOffset()),
e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.types.Types.TimestampType;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;

Expand Down Expand Up @@ -164,7 +165,12 @@ private Object getFieldValue(Object value, String fieldName) {
if (value instanceof Map) {
return ((Map<?, ?>) value).get(fieldName);
} else if (value instanceof Struct) {
return ((Struct) value).get(fieldName);
Struct struct = (Struct) value;
Field field = struct.schema().field(fieldName);
if (field == null) {
return null;
}
return struct.get(field);
}
throw new IllegalArgumentException("Cannot convert to struct: " + value.getClass().getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ public class RecordConverterTest {
Types.NestedField.required(
36,
"ma",
Types.MapType.ofRequired(37, 38, Types.StringType.get(), Types.StringType.get())));
Types.MapType.ofRequired(37, 38, Types.StringType.get(), Types.StringType.get())),
Types.NestedField.optional(39, "extra", Types.StringType.get()));

// we have 1 unmapped column so exclude that from the count
private static final int MAPPED_CNT = SCHEMA.columns().size() - 1;

private static final org.apache.iceberg.Schema NESTED_SCHEMA =
new org.apache.iceberg.Schema(
Expand Down Expand Up @@ -169,7 +173,7 @@ public void testMapToString() throws Exception {

String str = (String) record.getField("st");
Map<String, Object> map = (Map<String, Object>) MAPPER.readValue(str, Map.class);
assertEquals(SCHEMA.columns().size(), map.size());
assertEquals(MAPPED_CNT, map.size());
}

@Test
Expand Down Expand Up @@ -206,7 +210,7 @@ public void testStructToString() throws Exception {

String str = (String) record.getField("st");
Map<String, Object> map = (Map<String, Object>) MAPPER.readValue(str, Map.class);
assertEquals(SCHEMA.columns().size(), map.size());
assertEquals(MAPPED_CNT, map.size());
}

@Test
Expand Down

0 comments on commit 19bed4d

Please sign in to comment.