Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
Expand Down Expand Up @@ -65,7 +68,7 @@ private static <T extends Exception> void expectThrows(Class<T> expectedType, St
} catch (Throwable e) {
if (expectedType.isInstance(e)) {
T ex = expectedType.cast(e);
assertEquals(expectedMessage, ex.getMessage());
assertEquals(ex.getMessage(), expectedMessage);
return;
}
throw new AssertionError("Unexpected exception type, expected " + expectedType.getSimpleName()
Expand All @@ -74,10 +77,24 @@ private static <T extends Exception> void expectThrows(Class<T> expectedType, St
throw new AssertionError("Expected exception");
}

/**
* Creates a valid Kafka Sink configuration that is used by multiple test cases.
*
* @return a map containing all required Kafka sink configuration fields
*/
private static Map<String, Object> validConfig() {
Map<String, Object> map = new HashMap<>();
map.put("bootstrapServers", "localhost:6667");
map.put("acks", "1");
map.put("topic", "topic_2");
map.put("batchSize", "16384");
map.put("maxRequestSize", "1048576");
return map;
}

@Test
public void testInvalidConfigWillThrownException() throws Exception {
KafkaAbstractSink<String, byte[]> sink = new DummySink();
Map<String, Object> config = new HashMap<>();
SinkContext sc = new SinkContext() {
@Override
public int getInstanceId() {
Expand Down Expand Up @@ -189,31 +206,62 @@ public void fatal(Throwable t) {

}
};
ThrowingRunnable openAndClose = ()->{
Function<Map<String, Object>, ThrowingRunnable> runWith = config -> () -> {
try {
sink.open(config, sc);
fail();
} finally {
sink.close();
}
};
expectThrows(IllegalArgumentException.class, "bootstrapServers cannot be null", openAndClose);
config.put("bootstrapServers", "localhost:6667");
expectThrows(IllegalArgumentException.class, "acks cannot be null", openAndClose);
config.put("acks", "1");
expectThrows(IllegalArgumentException.class, "topic cannot be null", openAndClose);
config.put("topic", "topic_2");
config.put("batchSize", "-1");
expectThrows(IllegalArgumentException.class, "Invalid Kafka Producer batchSize : -1", openAndClose);
config.put("batchSize", "16384");
config.put("maxRequestSize", "-1");
expectThrows(IllegalArgumentException.class, "Invalid Kafka Producer maxRequestSize : -1", openAndClose);
config.put("maxRequestSize", "1048576");
config.put("acks", "none");
expectThrows(ConfigException.class,
"Invalid value none for configuration acks: String must be one of: all, -1, 0, 1",
openAndClose);
config.put("acks", "1");

// Table of test cases for key removal and modification tests
record Case(
Consumer<Map<String, Object>> mutate,
Class<? extends Exception> expectedType,
String expectedMessage
) {}

List<Case> cases = List.of(
// Missing bootstrapServers
new Case(config -> config.remove("bootstrapServers"),
IllegalArgumentException.class,
"bootstrapServers cannot be null"),

// Missing acks
new Case(config -> config.remove("acks"),
IllegalArgumentException.class,
"acks cannot be null"),

// Missing topic
new Case(config -> config.remove("topic"),
IllegalArgumentException.class,
"topic cannot be null"),

// Bad batchSize
new Case(config -> config.put("batchSize", "-1"),
IllegalArgumentException.class,
"Invalid Kafka Producer batchSize : -1"),

// Bad maxRequestSize
new Case(config -> config.put("maxRequestSize", "-1"),
IllegalArgumentException.class,
"Invalid Kafka Producer maxRequestSize : -1"),

// Invalid acks value
new Case(config -> config.put("acks", "none"),
ConfigException.class,
"Invalid value none for configuration acks: String must be one of: all, -1, 0, 1")
);

for (Case currCase : cases) {
var config = validConfig(); // set fresh, valid, baseline each time
currCase.mutate.accept(config); // remove or change one field
expectThrows(currCase.expectedType, currCase.expectedMessage, runWith.apply(config));
}

// Finally verify a valid config passes cleanly
var config = validConfig();
sink.open(config, sc);
sink.close();
}
Expand Down
Loading