KAFKA-15995: Adding KIP-877 support to Connect #17804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged: 2 commits, Feb 11, 2025
3 changes: 3 additions & 0 deletions checkstyle/suppressions.xml
@@ -183,6 +183,9 @@
<suppress checks="JavaNCSS"
files="(DistributedHerder|Worker)Test.java"/>

<suppress checks="ParameterNumber"
files="WorkerSinkTaskTest.java"/>

<!-- Raft -->
<suppress checks="NPathComplexity"
files="(DynamicVoter|RecordsIterator).java"/>
ConnectorContext.java (org.apache.kafka.connect.connector)
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.connect.connector;

import org.apache.kafka.common.metrics.PluginMetrics;

/**
* ConnectorContext allows {@link Connector}s to proactively interact with the Kafka Connect runtime.
*/
@@ -33,4 +35,26 @@ public interface ConnectorContext {
* @param e Exception to be raised.
*/
void raiseError(Exception e);

/**
* Get a {@link PluginMetrics} that can be used to define metrics.
*
* <p>This method was added in Apache Kafka 4.1. Connectors that use it but also need to run on
* older Connect runtimes should guard the call with a try-catch block, since calling it on a
* Connect runtime older than Kafka 4.1 results in a {@link NoSuchMethodError} or
* {@link NoClassDefFoundError}. For example:
* <pre>
* PluginMetrics pluginMetrics;
* try {
* pluginMetrics = context.pluginMetrics();
* } catch (NoSuchMethodError | NoClassDefFoundError e) {
* pluginMetrics = null;
* }
* </pre>
*
* @return the PluginMetrics instance
* @since 4.1
*/
PluginMetrics pluginMetrics();
}
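
For illustration (not part of this diff): a minimal sketch of a connector-side helper built on the guard shown in the javadoc above. The metricName()/addMetric() signatures are assumed from KIP-877, and the helper class and metric name are hypothetical.

import java.util.LinkedHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.connect.connector.ConnectorContext;

// Hypothetical helper: registers a gauge through the new API, tolerating pre-4.1 runtimes.
public class ConnectorMetricsHelper {

    // Returns null on Connect runtimes older than Kafka 4.1.
    public static PluginMetrics tryPluginMetrics(ConnectorContext context) {
        try {
            return context.pluginMetrics();
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            return null;
        }
    }

    // Registers a simple gauge; signatures assumed from KIP-877.
    public static void registerSkippedGauge(ConnectorContext context, AtomicLong recordsSkipped) {
        PluginMetrics metrics = tryPluginMetrics(context);
        if (metrics == null)
            return; // older runtime: silently skip metric registration
        MetricName name = metrics.metricName(
                "records-skipped", "Records skipped by this connector", new LinkedHashMap<>());
        metrics.addMetric(name, (Measurable) (config, now) -> recordsSkipped.get());
    }
}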
ConnectorClientConfigOverridePolicy.java (org.apache.kafka.connect.connector.policy)
@@ -30,6 +30,11 @@
* <p>Kafka Connect discovers implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
* To support this, implementations of this interface should also contain a service provider configuration file in
* {@code META-INF/services/org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy}.
* <p>
* Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the policy to register metrics.
* The following tags are automatically added to all registered metrics: <code>config</code> set to
* <code>connector.client.config.override.policy</code>, and <code>class</code> set to the
* ConnectorClientConfigOverridePolicy class name.
*/
public interface ConnectorClientConfigOverridePolicy extends Configurable, AutoCloseable {

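
To make the Monitorable note above concrete, a hedged sketch of an allow-all policy that counts validations. Monitorable.withPluginMetrics(PluginMetrics) is the single hook KIP-877 defines, and the class and metric names here are made up.

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
import org.apache.kafka.connect.connector.policy.ConnectorClientConfigRequest;

// Hypothetical allow-all policy that exposes a validation counter.
public class CountingOverridePolicy implements ConnectorClientConfigOverridePolicy, Monitorable {
    private final AtomicLong validations = new AtomicLong();

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        // Called once by the runtime; the config/class tags described above are added automatically.
        metrics.addMetric(
                metrics.metricName("validations", "Number of validate() calls", new LinkedHashMap<>()),
                (Measurable) (config, now) -> validations.get());
    }

    @Override
    public List<ConfigValue> validate(ConnectorClientConfigRequest request) {
        validations.incrementAndGet();
        // Allow every requested override by returning it as a valid ConfigValue.
        return request.clientProps().keySet().stream().map(ConfigValue::new).collect(Collectors.toList());
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}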
ConnectRestExtension.java (org.apache.kafka.connect.rest)
@@ -43,6 +43,10 @@
*
* <p>When the Connect worker shuts down, it will call the extension's {@link #close} method to allow the implementation to release all of
* its resources.
*
* <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the extension to register metrics.
* The following tags are automatically added to all registered metrics: <code>config</code> set to
* <code>rest.extension.classes</code>, and <code>class</code> set to the ConnectRestExtension class name.
*/
public interface ConnectRestExtension extends Configurable, Versioned, Closeable {

SinkTaskContext.java (org.apache.kafka.connect.sink)
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.sink;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.PluginMetrics;

import java.util.Map;
import java.util.Set;
@@ -123,4 +124,26 @@ default ErrantRecordReporter errantRecordReporter() {
return null;
}

/**
* Get a {@link PluginMetrics} that can be used to define metrics.
*
* <p>This method was added in Apache Kafka 4.1. Tasks that use it but also need to run on
* older Connect runtimes should guard the call with a try-catch block, since calling it on a
* Connect runtime older than Kafka 4.1 results in a {@link NoSuchMethodError} or
* {@link NoClassDefFoundError}. For example:
* <pre>
* PluginMetrics pluginMetrics;
* try {
* pluginMetrics = context.pluginMetrics();
* } catch (NoSuchMethodError | NoClassDefFoundError e) {
* pluginMetrics = null;
* }
* </pre>
*
* @return the PluginMetrics instance
* @since 4.1
*/
PluginMetrics pluginMetrics();

}
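
A sketch of what a task might do with the returned instance, assuming the KIP-877 addSensor(String) method alongside the standard Sensor and Rate classes; the task and metric names are illustrative.

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

// Hypothetical sink task that measures its own put() throughput.
public class MeteredSinkTask extends SinkTask {
    private Sensor putRate;

    @Override
    public void start(Map<String, String> props) {
        PluginMetrics metrics;
        try {
            metrics = context.pluginMetrics(); // Kafka 4.1+ runtimes only
        } catch (NoSuchMethodError | NoClassDefFoundError e) {
            metrics = null;
        }
        if (metrics != null) {
            // addSensor(...) assumed from KIP-877; connector/task tags are added by the runtime
            putRate = metrics.addSensor("put");
            putRate.add(metrics.metricName("put-rate", "Records passed to put() per second",
                    new LinkedHashMap<>()), new Rate());
        }
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        if (putRate != null)
            putRate.record(records.size());
        // deliver records to the external system here
    }

    @Override
    public void stop() { }

    @Override
    public String version() {
        return "1.0";
    }
}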
SourceTaskContext.java (org.apache.kafka.connect.source)
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.source;

import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.connect.storage.OffsetStorageReader;

import java.util.Map;
@@ -63,4 +64,26 @@ public interface SourceTaskContext {
default TransactionContext transactionContext() {
return null;
}

/**
* Get a {@link PluginMetrics} that can be used to define metrics.
*
* <p>This method was added in Apache Kafka 4.1. Tasks that use it but also need to run on
* older Connect runtimes should guard the call with a try-catch block, since calling it on a
* Connect runtime older than Kafka 4.1 results in a {@link NoSuchMethodError} or
* {@link NoClassDefFoundError}. For example:
* <pre>
* PluginMetrics pluginMetrics;
* try {
* pluginMetrics = context.pluginMetrics();
* } catch (NoSuchMethodError | NoClassDefFoundError e) {
* pluginMetrics = null;
* }
* </pre>
*
* @return the PluginMetrics instance
* @since 4.1
*/
PluginMetrics pluginMetrics();
}
Converter.java (org.apache.kafka.connect.storage)
@@ -21,6 +21,8 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;

import java.io.Closeable;
import java.io.IOException;
import java.util.Map;

/**
@@ -30,8 +32,12 @@
* <p>Kafka Connect may discover implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
* To support this, implementations of this interface should also contain a service provider configuration file in
* {@code META-INF/services/org.apache.kafka.connect.storage.Converter}.
*
* <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the converter to register metrics.
* The following tags are automatically added to all registered metrics: <code>connector</code> set to the connector name,
* <code>task</code> set to the task id, and <code>converter</code> set to either <code>key</code> or <code>value</code>.
*/
public interface Converter {
public interface Converter extends Closeable {

/**
* Configure this class.
@@ -98,4 +104,9 @@ default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value
default ConfigDef config() {
return new ConfigDef();
}

@Override
default void close() throws IOException {
// no op
}
}
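
Two things land in this file: the Monitorable tag contract above and a no-op Closeable default. A hedged sketch combining them, using a trivial passthrough converter whose class and metric names are invented for illustration:

import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

// Hypothetical passthrough converter that counts conversions and overrides the new close() default.
public class CountingByteConverter implements Converter, Monitorable {
    private final AtomicLong conversions = new AtomicLong();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        // connector/task/converter tags are attached automatically by the runtime
        metrics.addMetric(
                metrics.metricName("conversions", "Total conversions performed", new LinkedHashMap<>()),
                (Measurable) (config, now) -> conversions.get());
    }

    @Override
    public byte[] fromConnectData(String topic, Schema schema, Object value) {
        conversions.incrementAndGet();
        return (byte[]) value; // passthrough: assumes the value is already serialized
    }

    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        conversions.incrementAndGet();
        return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
    }

    @Override
    public void close() throws IOException {
        // overriding the no-op default: release buffers, registry clients, etc.
    }
}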
HeaderConverter.java (org.apache.kafka.connect.storage)
@@ -31,6 +31,10 @@
* <p>Kafka Connect may discover implementations of this interface using the Java {@link java.util.ServiceLoader} mechanism.
* To support this, implementations of this interface should also contain a service provider configuration file in
* {@code META-INF/services/org.apache.kafka.connect.storage.HeaderConverter}.
*
* <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the converter to register metrics.
* The following tags are automatically added to all registered metrics: <code>connector</code> set to the connector name,
* <code>task</code> set to the task id, and <code>converter</code> set to <code>header</code>.
*/
public interface HeaderConverter extends Configurable, Closeable {

Transformation.java (org.apache.kafka.connect.transforms)
@@ -30,6 +30,10 @@
* To support this, implementations of this interface should also contain a service provider configuration file in
* {@code META-INF/services/org.apache.kafka.connect.transforms.Transformation}.
*
* <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the transformation to register metrics.
* The following tags are automatically added to all registered metrics: <code>connector</code> set to the connector name,
* <code>task</code> set to the task id, and <code>transformation</code> set to the transformation alias.
*
* @param <R> The type of record (must be an implementation of {@link ConnectRecord})
*/
public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
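
As with converters, a transformation opts in by implementing Monitorable. A hedged sketch of an SMT that drops tombstone records and counts them; the class and metric names are invented:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical SMT that drops tombstones (null-valued records) and counts the drops.
public class DropTombstones<R extends ConnectRecord<R>> implements Transformation<R>, Monitorable {
    private final AtomicLong dropped = new AtomicLong();

    @Override
    public void withPluginMetrics(PluginMetrics metrics) {
        // the transformation tag is set to this SMT's configured alias by the runtime
        metrics.addMetric(
                metrics.metricName("records-dropped", "Tombstones dropped", new LinkedHashMap<>()),
                (Measurable) (config, now) -> dropped.get());
    }

    @Override
    public R apply(R record) {
        if (record.value() == null) {
            dropped.incrementAndGet();
            return null; // returning null drops the record
        }
        return record;
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public void close() { }
}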
Predicate.java (org.apache.kafka.connect.transforms.predicates)
@@ -31,6 +31,10 @@
* To support this, implementations of this interface should also contain a service provider configuration file in
* {@code META-INF/services/org.apache.kafka.connect.transforms.predicates.Predicate}.
*
* <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the predicate to register metrics.
* The following tags are automatically added to all registered metrics: <code>connector</code> set to the connector name,
* <code>task</code> set to the task id, and <code>predicate</code> set to the predicate alias.
*
* @param <R> The type of record.
*/
public interface Predicate<R extends ConnectRecord<R>> extends Configurable, AutoCloseable {
SinkConnectorTest.java (org.apache.kafka.connect.sink)
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.sink;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.ConnectorTest;
import org.apache.kafka.connect.connector.Task;
@@ -53,6 +54,12 @@ public void raiseError(Exception e) {
// Unexpected in these tests
throw new UnsupportedOperationException();
}

@Override
public PluginMetrics pluginMetrics() {
// Unexpected in these tests
throw new UnsupportedOperationException();
}
}

protected static class TestSinkConnector extends SinkConnector implements ConnectorTest.AssertableConnector {
SourceConnectorTest.java (org.apache.kafka.connect.source)
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.source;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.connector.ConnectorTest;
import org.apache.kafka.connect.connector.Task;
@@ -55,6 +56,12 @@ public void raiseError(Exception e) {
throw new UnsupportedOperationException();
}

@Override
public PluginMetrics pluginMetrics() {
// Unexpected in these tests
throw new UnsupportedOperationException();
}

@Override
public OffsetStorageReader offsetStorageReader() {
return null;
AbstractHerder.java (org.apache.kafka.connect.runtime)
@@ -26,6 +26,7 @@
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigTransformer;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.common.internals.Plugin;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Connector;
@@ -140,7 +141,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
protected final StatusBackingStore statusBackingStore;
protected final ConfigBackingStore configBackingStore;
private volatile boolean ready = false;
private final ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy;
private final Plugin<ConnectorClientConfigOverridePolicy> connectorClientConfigOverridePolicyPlugin;
private final ExecutorService connectorExecutor;
private final Time time;
protected final Loggers loggers;
@@ -160,7 +161,10 @@ public AbstractHerder(Worker worker,
this.kafkaClusterId = kafkaClusterId;
this.statusBackingStore = statusBackingStore;
this.configBackingStore = configBackingStore;
this.connectorClientConfigOverridePolicy = connectorClientConfigOverridePolicy;
this.connectorClientConfigOverridePolicyPlugin = Plugin.wrapInstance(
connectorClientConfigOverridePolicy,
worker.metrics().metrics(),
WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG);
this.connectorExecutor = Executors.newCachedThreadPool();
this.time = time;
this.loggers = Loggers.newInstance(time);
@@ -185,7 +189,7 @@ protected void stopServices() {
this.configBackingStore.stop();
this.worker.stop();
this.connectorExecutor.shutdown();
Utils.closeQuietly(this.connectorClientConfigOverridePolicy, "connector client config override policy");
Utils.closeQuietly(this.connectorClientConfigOverridePolicyPlugin, "connector client config override policy");
}

protected void ready() {
@@ -388,6 +392,11 @@ public ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id) {
status.workerId(), status.trace());
}

@Override
public ConnectMetrics connectMetrics() {
return worker.metrics();
}

protected Map<String, ConfigValue> validateSinkConnectorConfig(SinkConnector connector, ConfigDef configDef, Map<String, String> config) {
Map<String, ConfigValue> result = configDef.validateAll(config);
SinkConnectorConfig.validate(config, result);
@@ -691,7 +700,7 @@ private ConfigInfos validateClientOverrides(
connectorClass,
connectorType,
ConnectorClientConfigRequest.ClientType.PRODUCER,
connectorClientConfigOverridePolicy);
connectorClientConfigOverridePolicyPlugin);
}
}
if (connectorUsesAdmin(connectorType, connectorProps)) {
Expand All @@ -705,7 +714,7 @@ private ConfigInfos validateClientOverrides(
connectorClass,
connectorType,
ConnectorClientConfigRequest.ClientType.ADMIN,
connectorClientConfigOverridePolicy);
connectorClientConfigOverridePolicyPlugin);
}
}
if (connectorUsesConsumer(connectorType, connectorProps)) {
Expand All @@ -719,7 +728,7 @@ private ConfigInfos validateClientOverrides(
connectorClass,
connectorType,
ConnectorClientConfigRequest.ClientType.CONSUMER,
connectorClientConfigOverridePolicy);
connectorClientConfigOverridePolicyPlugin);
}
}
return mergeConfigInfos(connType,
Expand Down Expand Up @@ -893,7 +902,7 @@ private static ConfigInfos validateClientOverrides(String connName,
Class<? extends Connector> connectorClass,
org.apache.kafka.connect.health.ConnectorType connectorType,
ConnectorClientConfigRequest.ClientType clientType,
ConnectorClientConfigOverridePolicy connectorClientConfigOverridePolicy) {
Plugin<ConnectorClientConfigOverridePolicy> connectorClientConfigOverridePolicyPlugin) {
Map<String, Object> clientConfigs = new HashMap<>();
for (Map.Entry<String, Object> rawClientConfig : connectorConfig.originalsWithPrefix(prefix).entrySet()) {
String configName = rawClientConfig.getKey();
@@ -906,7 +915,7 @@ private static ConfigInfos validateClientOverrides(String connName,
}
ConnectorClientConfigRequest connectorClientConfigRequest = new ConnectorClientConfigRequest(
connName, connectorType, connectorClass, clientConfigs, clientType);
List<ConfigValue> configValues = connectorClientConfigOverridePolicy.validate(connectorClientConfigRequest);
List<ConfigValue> configValues = connectorClientConfigOverridePolicyPlugin.get().validate(connectorClientConfigRequest);

return prefixedConfigInfos(configDef.configKeys(), configValues, prefix);
}
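
The Plugin wrapper used above is internal (org.apache.kafka.common.internals.Plugin), but the lifecycle this diff relies on can be sketched: wrapInstance() hands a PluginMetrics to the instance if it implements Monitorable, get() exposes the wrapped instance, and close() releases both the instance and its metrics. A toy re-implementation under those assumptions:

import org.apache.kafka.common.metrics.Monitorable;
import org.apache.kafka.common.metrics.PluginMetrics;
import org.apache.kafka.common.utils.Utils;

// Toy sketch of the internal Plugin wrapper; the real class also builds the PluginMetrics
// from the worker's Metrics plus the config/class tags described in the javadocs above.
public class PluginSketch<T> implements AutoCloseable {
    private final T instance;
    private final PluginMetrics pluginMetrics;

    public PluginSketch(T instance, PluginMetrics pluginMetrics) {
        this.instance = instance;
        this.pluginMetrics = pluginMetrics;
        if (instance instanceof Monitorable)
            ((Monitorable) instance).withPluginMetrics(pluginMetrics); // one-time registration hook
    }

    public T get() {
        return instance; // callers delegate through get(), as validateClientOverrides() does above
    }

    @Override
    public void close() {
        if (instance instanceof AutoCloseable)
            Utils.closeQuietly((AutoCloseable) instance, "plugin instance");
        Utils.closeQuietly(pluginMetrics, "plugin metrics"); // removes the metrics it registered
    }
}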