diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index d5d6edf703e40..f0f209c631806 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -1074,6 +1074,9 @@ protected final boolean maybeAddConfigErrors( messages.append("Connector configuration is invalid and contains the following ") .append(errors).append(" error(s):"); for (ConfigInfo configInfo : configInfos.configs()) { + if (configInfo == null || configInfo.configValue() == null) { + continue; + } for (String msg : configInfo.configValue().errors()) { messages.append('\n').append(msg); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java index 02caa21812f6b..2001b145c2714 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java @@ -1061,6 +1061,41 @@ public void testGenerateResultWithConfigValuesWithNoConfigKeysAndWithSomeErrors( assertInfoValue(infos, "config.extra2", "value.extra2", "error extra2"); } + @Test + public void testMaybeAddConfigErrorsWithNullConfigInfoAndNullConfigValue() throws Exception { + AbstractHerder herder = testHerder(); + Map keys = new HashMap<>(); + addConfigKey(keys, "config.a1", null); + addConfigKey(keys, "config.b1", "group B"); + + List values = new ArrayList<>(); + addValue(values, "config.b1", "value.b1", "error b1"); + + ConfigInfos infos = AbstractHerder.generateResult( + "com.acme.connector.MyConnector", keys, values, List.of("group B")); + infos.configs().add(null); + + FutureCallback> callback = new FutureCallback<>(); + assertTrue(herder.maybeAddConfigErrors(infos, callback)); + ExecutionException e = assertThrows(ExecutionException.class, callback::get); + assertTrue(e.getCause() instanceof BadRequestException); + } + + @Test + public void testMaybeAddConfigErrorsWithNullConfigValueAndNoErrors() { + AbstractHerder herder = testHerder(); + Map keys = new HashMap<>(); + addConfigKey(keys, "config.a1", null); + addConfigKey(keys, "config.b1", "group B"); + + List values = new ArrayList<>(); + addValue(values, "config.b1", "value.b1"); + + ConfigInfos infos = AbstractHerder.generateResult( + "com.acme.connector.MyConnector", keys, values, List.of("group B")); + assertFalse(herder.maybeAddConfigErrors(infos, new FutureCallback<>())); + } + @Test public void testSinkConnectorPluginConfig() throws ClassNotFoundException { testConnectorPluginConfig(