Skip to content

Add Connector Name and Task ID in the Table Monitor Thread Name #1509

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: 10.8.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import io.confluent.connect.jdbc.util.TableId;
import io.confluent.connect.jdbc.util.Version;

import static io.confluent.connect.jdbc.source.JdbcSourceTaskConfig.TASK_ID_CONFIG;

/**
* JdbcConnector is a Kafka Connect Connector implementation that watches a JDBC database and
* generates tasks to ingest database contents.
Expand All @@ -56,6 +58,8 @@ public class JdbcSourceConnector extends SourceConnector {
private static final long MAX_TIMEOUT = 10000L;

private Map<String, String> configProperties;

private JdbcSourceTaskConfig taskConfig;
private JdbcSourceConnectorConfig config;
private CachedConnectionProvider cachedConnectionProvider;
private TableMonitorThread tableMonitorThread;
Expand Down Expand Up @@ -131,7 +135,8 @@ public void start(Map<String, String> properties) throws ConnectException {
tablePollMs,
whitelistSet,
blacklistSet,
Time.SYSTEM
Time.SYSTEM,
taskConfig.getTaskID()
);
if (query.isEmpty()) {
tableMonitorThread.start();
Expand Down Expand Up @@ -202,13 +207,15 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
List<List<TableId>> tablesGrouped =
ConnectorUtils.groupPartitions(currentTables, numGroups);
taskConfigs = new ArrayList<>(tablesGrouped.size());
int count = 0;
for (List<TableId> taskTables : tablesGrouped) {
Map<String, String> taskProps = new HashMap<>(configProperties);
ExpressionBuilder builder = dialect.expressionBuilder();
builder.appendList().delimitedBy(",").of(taskTables);
taskProps.put(JdbcSourceTaskConfig.TABLES_CONFIG, builder.toString());
taskProps.put(JdbcSourceTaskConfig.TABLES_FETCHED, "true");
log.trace("Assigned tables {} to task with tablesFetched=true", taskTables);
taskProps.put(TASK_ID_CONFIG, count++ + "");
taskConfigs.add(taskProps);
}
log.info("Current Tables size: {}", currentTables.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1062,11 +1062,14 @@ public static int get(TransactionIsolationMode mode) {
}
}


protected JdbcSourceConnectorConfig(ConfigDef subclassConfigDef, Map<String, String> props) {
super(subclassConfigDef, props);
}

public String connectorName() {
return originalsStrings().get("name");
}

public NumericMapping numericMapping() {
return NumericMapping.get(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,20 @@ public class JdbcSourceTaskConfig extends JdbcSourceConnectorConfig {
private static final String TABLES_DOC = "List of tables for this task to watch for changes.";
public static final String TABLES_FETCHED = "tables.fetched";

public static final String TASK_ID_CONFIG = "task.id";
private static final String TASK_ID_DOC = "Task's id";

static ConfigDef config = baseConfigDef()
.define(TABLES_CONFIG, Type.LIST, Importance.HIGH, TABLES_DOC)
.defineInternal(TABLES_FETCHED, Type.BOOLEAN, false, Importance.HIGH);
.defineInternal(TABLES_FETCHED, Type.BOOLEAN, false, Importance.HIGH)
.defineInternal(TABLES_FETCHED, Type.BOOLEAN, false, Importance.HIGH)
.define(TASK_ID_CONFIG, Type.STRING, Importance.HIGH, TASK_ID_DOC);

public JdbcSourceTaskConfig(Map<String, String> props) {
super(config, props);
}

public String getTaskID() {
return getString(TASK_ID_CONFIG);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public TableMonitorThread(DatabaseDialect dialect,
long pollMs,
Set<String> whitelist,
Set<String> blacklist,
Time time
Time time,
String taskId

) {
this.dialect = dialect;
this.connectionProvider = connectionProvider;
Expand All @@ -75,6 +77,7 @@ public TableMonitorThread(DatabaseDialect dialect,
this.blacklist = blacklist;
this.tables = new AtomicReference<>();
this.time = time;
this.setName(taskId + "-TableMonitorThread");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public class TableMonitorThreadTest {
private static final long STARTUP_LIMIT = 50;
private static final long POLL_INTERVAL = 100;

private static final String connectorName = "test-connector";
private static final String connectorTaskId = "test-task-id";

private final static TableId FOO = new TableId(null, null, "foo");
private final static TableId BAR = new TableId(null, null, "bar");
private final static TableId BAZ = new TableId(null, null, "baz");
Expand Down Expand Up @@ -92,7 +95,7 @@ public class TableMonitorThreadTest {
public void testSingleLookup() throws Exception {
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorTaskId);
expectTableNames(LIST_FOO, shutdownThread());
EasyMock.replay(connectionProvider, dialect);

Expand All @@ -107,7 +110,7 @@ public void testSingleLookup() throws Exception {
public void testTablesBlockingTimeoutOnUpdateThread() throws Exception {
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, 0, null, null, time);
STARTUP_LIMIT, 0, null, null, time, connectorTaskId);

CountDownLatch connectionRequested = new CountDownLatch(1);
CountDownLatch connectionCompleted = new CountDownLatch(1);
Expand Down Expand Up @@ -158,7 +161,7 @@ public void testTablesBlockingTimeoutOnUpdateThread() throws Exception {
public void testTablesBlockingWithDeadlineOnUpdateThread() throws Exception {
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create());
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, null, time);
STARTUP_LIMIT, POLL_INTERVAL, null, null, time, connectorTaskId);

EasyMock.expect(dialect.tableIds(EasyMock.eq(connection))).andReturn(Collections.emptyList());
EasyMock.expect(connectionProvider.getConnection()).andReturn(connection);
Expand All @@ -185,7 +188,7 @@ public void testWhitelist() throws Exception {
Set<String> whitelist = new HashSet<>(Arrays.asList("foo", "bar"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorTaskId);
expectTableNames(LIST_FOO_BAR, shutdownThread());
EasyMock.replay(connectionProvider, dialect);

Expand All @@ -201,7 +204,7 @@ public void testBlacklist() throws Exception {
Set<String> blacklist = new HashSet<>(Arrays.asList("bar", "baz"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorTaskId);
expectTableNames(LIST_FOO_BAR_BAZ, shutdownThread());
EasyMock.replay(connectionProvider, dialect);

Expand All @@ -216,7 +219,7 @@ public void testBlacklist() throws Exception {
public void testReconfigOnUpdate() throws Exception {
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorTaskId);
expectTableNames(LIST_FOO);
expectTableNames(LIST_FOO, checkTableNames("foo"));
context.requestTaskReconfiguration();
Expand Down Expand Up @@ -244,7 +247,7 @@ public void testReconfigOnUpdate() throws Exception {
@Test
public void testInvalidConnection() throws Exception {
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorTaskId);
EasyMock.expect(connectionProvider.getConnection()).andThrow(new ConnectException("Simulated error with the db."));

CountDownLatch errorLatch = new CountDownLatch(1);
Expand All @@ -267,7 +270,7 @@ public void testInvalidConnection() throws Exception {
public void testDuplicates() throws Exception {
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, null, MockTime.SYSTEM, connectorTaskId);
expectTableNames(LIST_DUP_WITH_ALL, shutdownThread());
context.requestTaskReconfiguration();
EasyMock.expectLastCall();
Expand All @@ -285,7 +288,7 @@ public void testDuplicateWithUnqualifiedWhitelist() throws Exception {
Set<String> whitelist = new HashSet<>(Arrays.asList("dup"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorTaskId);
expectTableNames(LIST_DUP_ONLY, shutdownThread());
context.requestTaskReconfiguration();
EasyMock.expectLastCall();
Expand All @@ -304,7 +307,7 @@ public void testDuplicateWithUnqualifiedBlacklist() throws Exception {
Set<String> blacklist = new HashSet<>(Arrays.asList("foo"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorTaskId);
expectTableNames(LIST_DUP_WITH_ALL, shutdownThread());
context.requestTaskReconfiguration();
EasyMock.expectLastCall();
Expand All @@ -323,7 +326,7 @@ public void testDuplicateWithQualifiedWhitelist() throws Exception {
Set<String> whitelist = new HashSet<>(Arrays.asList("dup1.dup", "foo"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, whitelist, null, MockTime.SYSTEM, connectorTaskId);
expectTableNames(LIST_DUP_WITH_ALL, shutdownThread());
EasyMock.replay(connectionProvider, dialect);

Expand All @@ -338,7 +341,7 @@ public void testDuplicateWithQualifiedBlacklist() throws Exception {
Set<String> blacklist = new HashSet<>(Arrays.asList("dup1.dup", "foo"));
EasyMock.expect(dialect.expressionBuilder()).andReturn(ExpressionBuilder.create()).anyTimes();
tableMonitorThread = new TableMonitorThread(dialect, connectionProvider, context,
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM);
STARTUP_LIMIT, POLL_INTERVAL, null, blacklist, MockTime.SYSTEM, connectorTaskId);
expectTableNames(LIST_DUP_WITH_ALL, shutdownThread());
EasyMock.replay(connectionProvider, dialect);

Expand Down