diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java index c0c0a9a7172aa..f8c5b1df4c44f 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java @@ -26,9 +26,12 @@ import java.nio.ByteBuffer; import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; +import java.util.function.Function; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.security.auth.SecurityProtocol; @@ -65,7 +68,7 @@ private static void expectThrows(Class expectedType, St } catch (Throwable e) { if (expectedType.isInstance(e)) { T ex = expectedType.cast(e); - assertEquals(expectedMessage, ex.getMessage()); + assertEquals(ex.getMessage(), expectedMessage); return; } throw new AssertionError("Unexpected exception type, expected " + expectedType.getSimpleName() @@ -74,10 +77,24 @@ private static void expectThrows(Class expectedType, St throw new AssertionError("Expected exception"); } + /** + * Creates a valid Kafka Sink configuration that is used by multiple test cases. + * + * @return a map containing all required Kafka sink configuration fields + */ + private static Map validConfig() { + Map map = new HashMap<>(); + map.put("bootstrapServers", "localhost:6667"); + map.put("acks", "1"); + map.put("topic", "topic_2"); + map.put("batchSize", "16384"); + map.put("maxRequestSize", "1048576"); + return map; + } + @Test public void testInvalidConfigWillThrownException() throws Exception { KafkaAbstractSink sink = new DummySink(); - Map config = new HashMap<>(); SinkContext sc = new SinkContext() { @Override public int getInstanceId() { @@ -189,7 +206,7 @@ public void fatal(Throwable t) { } }; - ThrowingRunnable openAndClose = ()->{ + Function, ThrowingRunnable> runWith = config -> () -> { try { sink.open(config, sc); fail(); @@ -197,23 +214,54 @@ public void fatal(Throwable t) { sink.close(); } }; - expectThrows(IllegalArgumentException.class, "bootstrapServers cannot be null", openAndClose); - config.put("bootstrapServers", "localhost:6667"); - expectThrows(IllegalArgumentException.class, "acks cannot be null", openAndClose); - config.put("acks", "1"); - expectThrows(IllegalArgumentException.class, "topic cannot be null", openAndClose); - config.put("topic", "topic_2"); - config.put("batchSize", "-1"); - expectThrows(IllegalArgumentException.class, "Invalid Kafka Producer batchSize : -1", openAndClose); - config.put("batchSize", "16384"); - config.put("maxRequestSize", "-1"); - expectThrows(IllegalArgumentException.class, "Invalid Kafka Producer maxRequestSize : -1", openAndClose); - config.put("maxRequestSize", "1048576"); - config.put("acks", "none"); - expectThrows(ConfigException.class, - "Invalid value none for configuration acks: String must be one of: all, -1, 0, 1", - openAndClose); - config.put("acks", "1"); + + // Table of test cases for key removal and modification tests + record Case( + Consumer> mutate, + Class expectedType, + String expectedMessage + ) {} + + List cases = List.of( + // Missing bootstrapServers + new Case(config -> config.remove("bootstrapServers"), + IllegalArgumentException.class, + "bootstrapServers cannot be null"), + + // Missing acks + new Case(config -> config.remove("acks"), + IllegalArgumentException.class, + "acks cannot be null"), + + // Missing topic + new Case(config -> config.remove("topic"), + IllegalArgumentException.class, + "topic cannot be null"), + + // Bad batchSize + new Case(config -> config.put("batchSize", "-1"), + IllegalArgumentException.class, + "Invalid Kafka Producer batchSize : -1"), + + // Bad maxRequestSize + new Case(config -> config.put("maxRequestSize", "-1"), + IllegalArgumentException.class, + "Invalid Kafka Producer maxRequestSize : -1"), + + // Invalid acks value + new Case(config -> config.put("acks", "none"), + ConfigException.class, + "Invalid value none for configuration acks: String must be one of: all, -1, 0, 1") + ); + + for (Case currCase : cases) { + var config = validConfig(); // set fresh, valid, baseline each time + currCase.mutate.accept(config); // remove or change one field + expectThrows(currCase.expectedType, currCase.expectedMessage, runWith.apply(config)); + } + + // Finally verify a valid config passes cleanly + var config = validConfig(); sink.open(config, sc); sink.close(); }