36 changes: 35 additions & 1 deletion src/main/java/io/confluent/connect/jdbc/sink/JdbcDbWriter.java
@@ -17,6 +17,7 @@

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.header.Header;

import java.sql.Connection;
import java.sql.SQLException;
Expand All @@ -31,6 +32,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.TABLE_NAME_FORMAT;
import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.TABLE_NAME_FORMAT_RECORD_HEADER;

public class JdbcDbWriter {
private static final Logger log = LoggerFactory.getLogger(JdbcDbWriter.class);

@@ -68,7 +72,7 @@ void write(final Collection<SinkRecord> records)
try {
final Map<TableId, BufferedRecords> bufferByTable = new HashMap<>();
for (SinkRecord record : records) {
final TableId tableId = destinationTable(record.topic(), schemaName, catalogName);
final TableId tableId = determineTableId(record, schemaName, catalogName);
BufferedRecords buffer = bufferByTable.get(tableId);
if (buffer == null) {
buffer = new BufferedRecords(config, tableId, dbDialect, dbStructure, connection);
@@ -140,4 +144,34 @@ private Optional<String> getCatalogSafe(Connection connection) {
return Optional.empty();
}
}

private TableId determineTableId(SinkRecord sinkRecord, String schemaName, String catalogName) {
// Only try to get table name from header if config.tableNameFormat is __RECORD_HEADER__
if (TABLE_NAME_FORMAT_RECORD_HEADER.equals(config.tableNameFormat)) {
String headerTableName = extractTableNameFormatFromHeader(sinkRecord);
if (headerTableName != null && !headerTableName.trim().isEmpty()) {
log.debug("Using table name from header: {} for record from topic: {}",
headerTableName, sinkRecord.topic());
return new TableId(catalogName, schemaName, headerTableName.trim());
} else {
throw new ConnectException(String.format(
"Header '%s' is not set or empty for record from topic '%s'. "
+ "Please ensure the header is set correctly.",
TABLE_NAME_FORMAT, sinkRecord.topic()
));
}
}

// Fall back to default behavior (topic-based table naming)
return destinationTable(sinkRecord.topic(), schemaName, catalogName);
}

private String extractTableNameFormatFromHeader(SinkRecord sinkRecord) {
Header header = sinkRecord.headers().lastWithName(TABLE_NAME_FORMAT);
log.debug("Extracting table name format from header: {}", header);
if (header != null && header.value() != null) {
return header.value().toString();
}
return null;
}
}
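
For context on how the new `determineTableId` path is exercised end to end, here is a minimal producer-side sketch. The broker address, topic name, table names, and serializers are illustrative assumptions; the only requirement the writer imposes is that the `table.name.format` header carries a non-empty string. Whether the sink sees the header as a string depends on the worker's `header.converter`; with the default `SimpleHeaderConverter`, these UTF-8 bytes should surface as a string value.

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class HeaderRoutingProducerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");            // assumed broker address
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // All records go to one topic; the sink routes them by the 'table.name.format' header.
      ProducerRecord<String, String> toUsers =
          new ProducerRecord<>("source_topic", "key-1", "{\"firstName\":\"Alice\"}");
      toUsers.headers().add("table.name.format", "users".getBytes(StandardCharsets.UTF_8));

      ProducerRecord<String, String> toEmployees =
          new ProducerRecord<>("source_topic", "key-2", "{\"firstName\":\"Bob\"}");
      toEmployees.headers().add("table.name.format", "employees".getBytes(StandardCharsets.UTF_8));

      producer.send(toUsers);
      producer.send(toEmployees);
    }
  }
}
```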
src/main/java/io/confluent/connect/jdbc/sink/JdbcSinkConfig.java
@@ -126,8 +126,16 @@ public enum TimestampPrecisionMode {
"A format string for the destination table name, which may contain '${topic}' as a "
+ "placeholder for the originating topic name.\n"
+ "For example, ``kafka_${topic}`` for the topic 'orders' will map to the table name "
+ "'kafka_orders'.";
+ "'kafka_orders'.\n\n"
+ "Special value: If set to ``__RECORD_HEADER__``, the table name will be dynamically "
+ "determined from the Kafka message headers. In this mode, the connector will look for "
+ "a header key ('table.name.format')"
+ "that contains the actual table name. This allows routing messages from a single topic "
+ "to different tables based on the message headers.\n"
+ "Example header: ``{\"table.name.format\": \"user_events\"}`` will route the message"
+ " to the 'user_events' table.";
private static final String TABLE_NAME_FORMAT_DISPLAY = "Table Name Format";
public static final String TABLE_NAME_FORMAT_RECORD_HEADER = "__RECORD_HEADER__";

public static final String MAX_RETRIES = "max.retries";
private static final int MAX_RETRIES_DEFAULT = 10;
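
To make the new config value concrete, the sketch below mirrors the test setup and shows the minimal sink properties needed to enable header-based routing. The JDBC URL is a placeholder, and the `pk.mode`/`pk.fields` choices are illustrative, taken from the kafka-coordinates test case rather than being required by the feature.

```java
import java.util.HashMap;
import java.util.Map;

public class RecordHeaderRoutingConfigExample {
  public static Map<String, String> sinkProps() {
    Map<String, String> props = new HashMap<>();
    props.put("connection.url", "jdbc:sqlite:/tmp/test.db");    // placeholder JDBC URL
    // Enable header-based routing: the destination table name is read from the
    // 'table.name.format' record header instead of being derived from the topic.
    props.put("table.name.format", "__RECORD_HEADER__");
    props.put("auto.create", "true");                           // create destination tables on demand
    props.put("pk.mode", "kafka");                              // illustrative choice, mirrors the test
    props.put("pk.fields", "kafka_topic,kafka_partition,kafka_offset");
    return props;
  }
}
```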
211 changes: 211 additions & 0 deletions src/test/java/io/confluent/connect/jdbc/sink/JdbcSinkTaskTest.java
@@ -15,6 +15,8 @@

package io.confluent.connect.jdbc.sink;

import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.TABLE_NAME_FORMAT;
import static io.confluent.connect.jdbc.sink.JdbcSinkConfig.TABLE_NAME_FORMAT_RECORD_HEADER;
import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
Expand All @@ -41,6 +43,7 @@
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Timestamp;
@@ -519,6 +522,214 @@ private List<SinkRecord> createRecordsList(int batchSize) {
return records;
}

@Test
public void putWithMultipleTableRoutingWithPkModeKafka() throws Exception {
Map<String, String> props = new HashMap<>();
props.put("connection.url", sqliteHelper.sqliteUri());
props.put(TABLE_NAME_FORMAT, TABLE_NAME_FORMAT_RECORD_HEADER);
props.put("auto.create", "true");
props.put("pk.mode", "kafka");
props.put("pk.fields", "kafka_topic,kafka_partition,kafka_offset");

JdbcSinkTask task = new JdbcSinkTask();
task.initialize(mock(SinkTaskContext.class));
task.start(props);

final Struct struct1 = new Struct(SCHEMA)
.put("firstName", "Alice")
.put("lastName", "Johnson")
.put("age", 28)
.put("modified", new Date(1474661402123L));

final Struct struct2 = new Struct(SCHEMA)
.put("firstName", "Bob")
.put("lastName", "Williams")
.put("age", 35)
.put("modified", new Date(1474661402123L));

final String topic = "source_topic";

// Create records with different target tables
SinkRecord record1 = new SinkRecord(topic, 1, null, null, SCHEMA, struct1, 44);
record1.headers().add(TABLE_NAME_FORMAT, new SchemaAndValue(Schema.STRING_SCHEMA, "users"));

SinkRecord record2 = new SinkRecord(topic, 1, null, null, SCHEMA, struct2, 45);
record2.headers().add(TABLE_NAME_FORMAT, new SchemaAndValue(Schema.STRING_SCHEMA, "employees"));

List<SinkRecord> records = new ArrayList<>();
records.add(record1);
records.add(record2);

task.put(records);

// Verify first record went to 'users' table
assertEquals(
1,
sqliteHelper.select(
"SELECT * FROM users",
new SqliteHelper.ResultSetReadCallback() {
@Override
public void read(ResultSet rs) throws SQLException {
assertEquals(struct1.getString("firstName"), rs.getString("firstName"));
assertEquals(struct1.getString("lastName"), rs.getString("lastName"));
assertEquals(44, rs.getLong("kafka_offset"));
}
}
)
);

// Verify second record went to 'employees' table
assertEquals(
1,
sqliteHelper.select(
"SELECT * FROM employees",
new SqliteHelper.ResultSetReadCallback() {
@Override
public void read(ResultSet rs) throws SQLException {
assertEquals(struct2.getString("firstName"), rs.getString("firstName"));
assertEquals(struct2.getString("lastName"), rs.getString("lastName"));
assertEquals(45, rs.getLong("kafka_offset"));
}
}
)
);
}

@Test
public void putWithMultipleTableRoutingWithPkModeRecordKey() throws Exception {
Map<String, String> props = new HashMap<>();
props.put("connection.url", sqliteHelper.sqliteUri());
props.put(TABLE_NAME_FORMAT, TABLE_NAME_FORMAT_RECORD_HEADER);
props.put("auto.create", "true");
props.put("pk.mode", "record_key");
props.put("pk.fields", ""); // Empty for record_key mode

JdbcSinkTask task = new JdbcSinkTask();
task.initialize(mock(SinkTaskContext.class));
task.start(props);

final Struct struct1 = new Struct(SCHEMA)
.put("firstName", "Alice")
.put("lastName", "Johnson")
.put("age", 28)
.put("modified", new Date(1474661402123L));

final Struct struct2 = new Struct(SCHEMA)
.put("firstName", "Bob")
.put("lastName", "Williams")
.put("age", 35)
.put("modified", new Date(1474661402123L));

final String topic = "source_topic";

// Define key schemas for record keys
final Schema keySchema = SchemaBuilder.struct()
.field("id", Schema.INT32_SCHEMA)
.field("type", Schema.STRING_SCHEMA)
.build();

// Create record keys
final Struct key1 = new Struct(keySchema)
.put("id", 1001)
.put("type", "user");

final Struct key2 = new Struct(keySchema)
.put("id", 2001)
.put("type", "employee");

// Create records with record keys and different target tables
SinkRecord record1 = new SinkRecord(topic, 1, keySchema, key1, SCHEMA, struct1, 44);
record1.headers().add(TABLE_NAME_FORMAT, new SchemaAndValue(Schema.STRING_SCHEMA, "users"));

SinkRecord record2 = new SinkRecord(topic, 1, keySchema, key2, SCHEMA, struct2, 45);
record2.headers().add(TABLE_NAME_FORMAT, new SchemaAndValue(Schema.STRING_SCHEMA, "employees"));

List<SinkRecord> records = new ArrayList<>();
records.add(record1);
records.add(record2);

task.put(records);

// Verify first record went to 'users' table with record key as PK
assertEquals(
1,
sqliteHelper.select(
"SELECT * FROM users",
new SqliteHelper.ResultSetReadCallback() {
@Override
public void read(ResultSet rs) throws SQLException {
// Verify data fields
assertEquals(struct1.getString("firstName"), rs.getString("firstName"));
assertEquals(struct1.getString("lastName"), rs.getString("lastName"));
assertEquals(struct1.getInt32("age").intValue(), rs.getInt("age"));

// Verify record key fields are used as primary key
assertEquals(key1.getInt32("id").intValue(), rs.getInt("id"));
assertEquals(key1.getString("type"), rs.getString("type"));
}
}
)
);

// Verify second record went to 'employees' table with record key as PK
assertEquals(
1,
sqliteHelper.select(
"SELECT * FROM employees",
new SqliteHelper.ResultSetReadCallback() {
@Override
public void read(ResultSet rs) throws SQLException {
// Verify data fields
assertEquals(struct2.getString("firstName"), rs.getString("firstName"));
assertEquals(struct2.getString("lastName"), rs.getString("lastName"));
assertEquals(struct2.getInt32("age").intValue(), rs.getInt("age"));

// Verify record key fields are used as primary key
assertEquals(key2.getInt32("id").intValue(), rs.getInt("id"));
assertEquals(key2.getString("type"), rs.getString("type"));
}
}
)
);
}

@Test
public void putWithInvalidHeaderShouldFail() throws ConnectException {
Map<String, String> props = new HashMap<>();
props.put("connection.url", sqliteHelper.sqliteUri());
props.put(TABLE_NAME_FORMAT, TABLE_NAME_FORMAT_RECORD_HEADER);
props.put("auto.create", "true");

JdbcSinkTask task = new JdbcSinkTask();
task.initialize(mock(SinkTaskContext.class));
task.start(props);

final Struct struct = new Struct(SCHEMA)
.put("firstName", "Test")
.put("lastName", "User")
.put("age", 30)
.put("modified", new Date(1474661402123L));

// Test case 1: Missing header
SinkRecord recordWithoutHeader = new SinkRecord("source_topic", 1, null, null, SCHEMA, struct, 46);
try {
task.put(Collections.singleton(recordWithoutHeader));
fail("Expected ConnectException for missing header");
} catch (ConnectException e) {
assertTrue("Exception should mention header issue", e.getMessage().contains("Header 'table.name.format'"));
}

// Test case 2: Empty header value
SinkRecord recordWithEmptyHeader = new SinkRecord("source_topic", 1, null, null, SCHEMA, struct, 47);
recordWithEmptyHeader.headers().add(TABLE_NAME_FORMAT, new SchemaAndValue(Schema.STRING_SCHEMA, ""));
try {
task.put(Collections.singleton(recordWithEmptyHeader));
fail("Expected ConnectException for empty header value");
} catch (ConnectException e) {
assertTrue("Exception should mention header issue", e.getMessage().contains("Header 'table.name.format'"));
}
}

private Map<String, String> setupBasicProps(int maxRetries, long retryBackoffMs) {
Map<String, String> props = new HashMap<>();
props.put(JdbcSinkConfig.CONNECTION_URL, "stub");