Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,24 @@ private void routeRecordStatically(SinkRecord record) {
String routeField = config.tablesRouteField();

if (routeField == null) {
// route to all tables
config
.tables()
.forEach(
tableName -> {
writerForTable(tableName, record, false).write(record);
});
// try topic-based routing: match last segment of topic to last segment of table name
String topicLastSegment = lastSegment(record.topic());
boolean matched = false;
for (String tableName : config.tables()) {
if (topicLastSegment.equalsIgnoreCase(lastSegment(tableName))) {
writerForTable(tableName, record, false).write(record);
matched = true;
}
}
if (!matched) {
// no topic match found, fall back to broadcasting for backward compatibility
config
.tables()
.forEach(
tableName -> {
writerForTable(tableName, record, false).write(record);
});
}

} else {
String routeValue = extractRouteValue(record.value(), routeField);
Expand Down Expand Up @@ -133,6 +144,12 @@ private String extractRouteValue(Object recordValue, String routeField) {
return routeValue == null ? null : routeValue.toString();
}

/** Extracts the last dot-delimited segment from a qualified name. */
private static String lastSegment(String name) {
int lastDot = name.lastIndexOf('.');
return lastDot >= 0 ? name.substring(lastDot + 1) : name;
}

private RecordWriter writerForTable(
String tableName, SinkRecord sample, boolean ignoreMissingTable) {
return writers.computeIfAbsent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ public class TestSinkWriter {

private static final Namespace NAMESPACE = Namespace.of("db");
private static final String TABLE_NAME = "tbl";
private static final String TABLE2_NAME = "tbl2";
private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of(NAMESPACE, TABLE_NAME);
private static final TableIdentifier TABLE2_IDENTIFIER =
TableIdentifier.of(NAMESPACE, TABLE2_NAME);
private static final Schema SCHEMA =
new Schema(
optional(1, "id", Types.LongType.get()),
Expand All @@ -68,6 +71,7 @@ public void before() {
catalog = initInMemoryCatalog();
catalog.createNamespace(NAMESPACE);
catalog.createTable(TABLE_IDENTIFIER, SCHEMA);
catalog.createTable(TABLE2_IDENTIFIER, SCHEMA);
}

@AfterEach
Expand Down Expand Up @@ -167,8 +171,58 @@ public void testDynamicNoRoute() {
assertThat(writerResults).hasSize(0);
}

@Test
public void testTopicRoute() {
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
when(config.tables())
.thenReturn(
ImmutableList.of(TABLE_IDENTIFIER.toString(), TABLE2_IDENTIFIER.toString()));
Map<String, Object> value = ImmutableMap.of();

// topic "src.tbl" should match table "db.tbl" (last segment = "tbl")
List<IcebergWriterResult> writerResults =
sinkWriterTestWithTopic(value, config, "src.tbl");
assertThat(writerResults).hasSize(1);
}

@Test
public void testTopicRouteSecondTable() {
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
when(config.tables())
.thenReturn(
ImmutableList.of(TABLE_IDENTIFIER.toString(), TABLE2_IDENTIFIER.toString()));
Map<String, Object> value = ImmutableMap.of();

// topic "src.tbl2" should match table "db.tbl2" (last segment = "tbl2")
List<IcebergWriterResult> writerResults =
sinkWriterTestWithTopic(value, config, "src.tbl2");
assertThat(writerResults).hasSize(1);
}

@Test
public void testTopicRouteFallbackBroadcast() {
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
when(config.tableConfig(any())).thenReturn(mock(TableSinkConfig.class));
when(config.tables())
.thenReturn(
ImmutableList.of(TABLE_IDENTIFIER.toString(), TABLE2_IDENTIFIER.toString()));
Map<String, Object> value = ImmutableMap.of();

// topic "unmatched" does not match any table, should broadcast to all
List<IcebergWriterResult> writerResults =
sinkWriterTestWithTopic(value, config, "unmatched");
assertThat(writerResults).hasSize(2);
}

private List<IcebergWriterResult> sinkWriterTest(
Map<String, Object> value, IcebergSinkConfig config) {
return sinkWriterTestWithTopic(value, config, "topic");
}

private List<IcebergWriterResult> sinkWriterTestWithTopic(
Map<String, Object> value, IcebergSinkConfig config, String topic) {
IcebergWriterResult writeResult =
new IcebergWriterResult(
TableIdentifier.parse(TABLE_NAME),
Expand All @@ -187,7 +241,7 @@ private List<IcebergWriterResult> sinkWriterTest(
Instant now = Instant.now().truncatedTo(ChronoUnit.MILLIS);
SinkRecord rec =
new SinkRecord(
"topic",
topic,
1,
null,
"key",
Expand All @@ -200,7 +254,7 @@ private List<IcebergWriterResult> sinkWriterTest(

SinkWriterResult result = sinkWriter.completeWrite();

Offset offset = result.sourceOffsets().get(new TopicPartition("topic", 1));
Offset offset = result.sourceOffsets().get(new TopicPartition(topic, 1));
assertThat(offset).isNotNull();
assertThat(offset.offset()).isEqualTo(101L); // should be 1 more than current offset
assertThat(offset.timestamp()).isEqualTo(now.atOffset(ZoneOffset.UTC));
Expand Down
Loading