From 568177f745042584825318be9d4842b7ce2ac10f Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Mon, 8 Jun 2026 18:21:27 +0200 Subject: [PATCH] MINOR: Remove jqwik dependency Update test to use plain JUnit logic --- NOTICE | 4 - NOTICE-binary | 4 - build.gradle | 5 +- .../import-control-group-coordinator.xml | 1 - checkstyle/import-control-metadata.xml | 1 - checkstyle/import-control-server-common.xml | 1 - checkstyle/import-control-server.xml | 1 - checkstyle/import-control-storage.xml | 1 - checkstyle/import-control.xml | 2 - gradle/dependencies.gradle | 2 - .../metadata/PartitionRegistrationTest.java | 36 +- .../StandardAuthorizerPropertyTest.java | 267 ++++---- .../kafka/raft/KafkaRaftClientFetchTest.java | 15 +- .../org/apache/kafka/raft/MockLogTest.java | 28 +- .../kafka/raft/RaftEventSimulationTest.java | 643 ++++++++---------- .../raft/internals/KafkaRaftLogTest.java | 39 +- .../raft/internals/RecordsIteratorTest.java | 120 ++-- .../internal/ArbitraryMemoryRecords.java | 33 +- 18 files changed, 565 insertions(+), 638 deletions(-) diff --git a/NOTICE b/NOTICE index 301c99863fdf7..eaa0b8690d226 100644 --- a/NOTICE +++ b/NOTICE @@ -7,10 +7,6 @@ The Apache Software Foundation (https://www.apache.org/). This distribution has a binary dependency on jersey, which is available under the CDDL License. The source code of jersey can be found at https://github.com/jersey/jersey/. -This distribution has a binary test dependency on jqwik, which is available under -the Eclipse Public License 2.0. The source code can be found at -https://github.com/jlink/jqwik. - The streams-scala (streams/streams-scala) module was donated by Lightbend and the original code was copyrighted by them: Copyright (C) 2018 Lightbend Inc. Copyright (C) 2017-2018 Alexis Seigneurin. diff --git a/NOTICE-binary b/NOTICE-binary index 54a5ffe7856c1..3a8292e35f6de 100644 --- a/NOTICE-binary +++ b/NOTICE-binary @@ -7,10 +7,6 @@ The Apache Software Foundation (https://www.apache.org/). This distribution has a binary dependency on jersey, which is available under the CDDL License. The source code of jersey can be found at https://github.com/jersey/jersey/. -This distribution has a binary test dependency on jqwik, which is available under -the Eclipse Public License 2.0. The source code can be found at -https://github.com/jlink/jqwik. - The streams-scala (streams/streams-scala) module was donated by Lightbend and the original code was copyrighted by them: Copyright (C) 2018 Lightbend Inc. Copyright (C) 2017-2018 Alexis Seigneurin. diff --git a/build.gradle b/build.gradle index d49828626cae9..b54e7e31a581f 100644 --- a/build.gradle +++ b/build.gradle @@ -1437,7 +1437,6 @@ project(':metadata') { testFixturesImplementation libs.junitJupiter testImplementation libs.junitJupiter - testImplementation libs.jqwik testImplementation libs.mockitoCore testImplementation testFixtures(project(':clients')) testImplementation testFixtures(project(':raft')) @@ -2211,7 +2210,6 @@ project(':raft') { testFixturesImplementation project(':clients') testFixturesImplementation testFixtures(project(':clients')) testFixturesImplementation project(':server-common') - testFixturesImplementation libs.jqwik testFixturesImplementation libs.junitJupiter testFixturesImplementation libs.mockitoCore testFixturesImplementation libs.slf4jApi @@ -2223,7 +2221,6 @@ project(':raft') { testImplementation libs.jacksonDataformatYaml testImplementation libs.junitJupiter testImplementation libs.mockitoCore - testImplementation libs.jqwik testImplementation testLog4j2Libs testRuntimeOnly runtimeTestLibs @@ -2289,7 +2286,7 @@ project(':raft') { test { useJUnitPlatform { - includeEngines 'jqwik', 'junit-jupiter' + includeEngines 'junit-jupiter' } } diff --git a/checkstyle/import-control-group-coordinator.xml b/checkstyle/import-control-group-coordinator.xml index 0e9aabad4a27f..befe4b002b391 100644 --- a/checkstyle/import-control-group-coordinator.xml +++ b/checkstyle/import-control-group-coordinator.xml @@ -33,7 +33,6 @@ - diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml index 5c8e377f628a2..cdc73dcf65d19 100644 --- a/checkstyle/import-control-metadata.xml +++ b/checkstyle/import-control-metadata.xml @@ -32,7 +32,6 @@ - diff --git a/checkstyle/import-control-server-common.xml b/checkstyle/import-control-server-common.xml index 7d075dd71d7bf..05cf139520426 100644 --- a/checkstyle/import-control-server-common.xml +++ b/checkstyle/import-control-server-common.xml @@ -32,7 +32,6 @@ - diff --git a/checkstyle/import-control-server.xml b/checkstyle/import-control-server.xml index db528b8c7179d..30924f6132447 100644 --- a/checkstyle/import-control-server.xml +++ b/checkstyle/import-control-server.xml @@ -31,7 +31,6 @@ - diff --git a/checkstyle/import-control-storage.xml b/checkstyle/import-control-storage.xml index 3599ca5fffee8..eef568bda9059 100644 --- a/checkstyle/import-control-storage.xml +++ b/checkstyle/import-control-storage.xml @@ -33,7 +33,6 @@ - diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 49c57b8dffa97..022edfb86ac68 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -34,7 +34,6 @@ - @@ -474,7 +473,6 @@ - diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index a44cc95bcdcfb..a25eeb2be1cc5 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -83,7 +83,6 @@ versions += [ jopt: "5.0.4", jose4j: "0.9.6", junit: "5.14.3", - jqwik: "1.9.2", kafka_0110: "0.11.0.3", kafka_10: "1.0.2", kafka_11: "1.1.1", @@ -194,7 +193,6 @@ libs += [ junitJupiterApi: "org.junit.jupiter:junit-jupiter-api:$versions.junit", junitPlatformSuiteEngine: "org.junit.platform:junit-platform-suite-engine:$versions.junitPlatform", junitPlatformLanucher: "org.junit.platform:junit-platform-launcher:$versions.junitPlatform", - jqwik: "net.jqwik:jqwik:$versions.jqwik", hamcrest: "org.hamcrest:hamcrest:$versions.hamcrest", kafkaStreams_0110: "org.apache.kafka:kafka-streams:$versions.kafka_0110", kafkaStreams_10: "org.apache.kafka:kafka-streams:$versions.kafka_10", diff --git a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java index 0fe7025c1787b..ad33b6b030de7 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/PartitionRegistrationTest.java @@ -26,12 +26,6 @@ import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.MetadataVersion; -import net.jqwik.api.Arbitraries; -import net.jqwik.api.Arbitrary; -import net.jqwik.api.ForAll; -import net.jqwik.api.Property; -import net.jqwik.api.Provide; - import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; @@ -328,23 +322,9 @@ public void testPartitionRegistrationToRecord_ElrShouldBeNullIfEmpty() { assertTrue(exceptions.isEmpty()); } - @Property - public void testConsistentEqualsAndHashCode( - @ForAll("uniqueSamples") PartitionRegistration a, - @ForAll("uniqueSamples") PartitionRegistration b - ) { - if (a.equals(b)) { - assertEquals(a.hashCode(), b.hashCode(), "a=" + a + "\nb=" + b); - } - - if (a.hashCode() != b.hashCode()) { - assertNotEquals(a, b, "a=" + a + "\nb=" + b); - } - } - - @Provide - Arbitrary uniqueSamples() { - return Arbitraries.of( + @Test + public void testConsistentEqualsAndHashCode() { + List samples = List.of( new PartitionRegistration.Builder().setReplicas(new int[] {1, 2, 3}).setIsr(new int[] {1, 2, 3}). setDirectories(new Uuid[]{Uuid.fromString("HyTsxr8hT6Gq5heZMA2Bug"), Uuid.fromString("ePwTiSgFRvaKRBaUX3EcZQ"), Uuid.fromString("F3zwSDR1QWGKNNLMowVoYg")}). setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {1, 2, 3}).build(), @@ -374,6 +354,16 @@ Arbitrary uniqueSamples() { setLeader(1).setLeaderRecoveryState(LeaderRecoveryState.RECOVERED).setLeaderEpoch(100).setPartitionEpoch(200).setElr(new int[] {2, 3}).setLastKnownElr(new int[] {1, 2}).build() ); + for (PartitionRegistration a : samples) { + for (PartitionRegistration b : samples) { + if (a.equals(b)) { + assertEquals(a.hashCode(), b.hashCode(), "a=" + a + "\nb=" + b); + } + if (a.hashCode() != b.hashCode()) { + assertNotEquals(a, b, "a=" + a + "\nb=" + b); + } + } + } } @Test diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java index 2e0eaa80ab629..eb032b69280e5 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerPropertyTest.java @@ -23,6 +23,8 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.internals.Topic; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.internals.PluginMetricsImpl; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePatternFilter; @@ -32,20 +34,11 @@ import org.apache.kafka.server.authorizer.AuthorizableRequestContext; import org.apache.kafka.server.authorizer.AuthorizationResult; -import net.jqwik.api.Assume; -import net.jqwik.api.ForAll; -import net.jqwik.api.Property; -import net.jqwik.api.constraints.AlphaChars; -import net.jqwik.api.constraints.Chars; -import net.jqwik.api.constraints.NumericChars; -import net.jqwik.api.constraints.Size; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; +import org.junit.jupiter.api.Test; + import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.function.Predicate; @@ -53,120 +46,147 @@ import static org.apache.kafka.common.security.auth.KafkaPrincipal.USER_TYPE; import static org.apache.kafka.metadata.authorizer.StandardAuthorizerTest.PLAINTEXT; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.fail; public class StandardAuthorizerPropertyTest { - @Target({ ElementType.ANNOTATION_TYPE, ElementType.PARAMETER, ElementType.TYPE_USE }) - @Retention(RetentionPolicy.RUNTIME) - @AlphaChars @NumericChars @Chars({ '_', '-', '.' }) - public @interface ValidTopicChars { } - - @Property(tries = 5000) - public void matchingPrefixDenyOverridesAllAllowRules( - @ForAll Random random, - @ForAll @ValidTopicChars String topic, - @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes - ) throws Exception { - Assume.that(Topic.isValid(topic)); - StandardAuthorizer authorizer = buildAuthorizer(); - - // Create one DENY rule which matches and zero or more ALLOW rules which may or - // may not match. Regardless of the ALLOW rules, the final result should be DENIED. + private static final String VALID_TOPIC_CHARS = + "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-."; - String topicPrefix = topic.substring(0, random.nextInt(topic.length())); - StandardAcl denyRule = buildTopicWriteAcl(topicPrefix, PatternType.PREFIXED, AclPermissionType.DENY); - authorizer.addAcl(Uuid.randomUuid(), denyRule); - addRandomPrefixAllowAcls(authorizer, topic, randomSuffixes); - - assertAuthorizationResult( - authorizer, - AuthorizationResult.DENIED, - AclOperation.WRITE, - new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) - ); + private interface TopicSuffixesRandom { + void accept(String topic, Set suffixes, Random random) throws Exception; } - @Property(tries = 5000) - public void matchingLiteralDenyOverridesAllAllowRules( - @ForAll @ValidTopicChars String topic, - @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes - ) throws Exception { - Assume.that(Topic.isValid(topic)); - StandardAuthorizer authorizer = buildAuthorizer(); - - // Create one DENY rule which matches and zero or more ALLOW rules which may or - // may not match. Regardless of the ALLOW rules, the final result should be DENIED. - - StandardAcl denyRule = buildTopicWriteAcl(topic, PatternType.LITERAL, AclPermissionType.DENY); - authorizer.addAcl(Uuid.randomUuid(), denyRule); - addRandomPrefixAllowAcls(authorizer, topic, randomSuffixes); - - assertAuthorizationResult( - authorizer, - AuthorizationResult.DENIED, - AclOperation.WRITE, - new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) - ); + private interface TopicSuffixes { + void accept(String topic, Set suffixes) throws Exception; } - @Property(tries = 5000) - public void matchingPrefixAllowWithNoMatchingDenyRules( - @ForAll Random random, - @ForAll @ValidTopicChars String topic, - @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes - ) throws Exception { - Assume.that(Topic.isValid(topic)); - StandardAuthorizer authorizer = buildAuthorizer(); - - // Create one ALLOW rule which matches and zero or more DENY rules which do not - // match. The final result should be ALLOWED. - - String topicPrefix = topic.substring(0, random.nextInt(topic.length())); - StandardAcl denyRule = buildTopicWriteAcl(topicPrefix, PatternType.PREFIXED, AclPermissionType.ALLOW); - authorizer.addAcl(Uuid.randomUuid(), denyRule); - - addRandomNonMatchingPrefixDenyAcls(authorizer, topic, randomSuffixes); - - assertAuthorizationResult( - authorizer, - AuthorizationResult.ALLOWED, - AclOperation.WRITE, - new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) - ); + private static void forTopicSuffixesRandom(TopicSuffixesRandom topicSuffixesRandom) { + for (int run = 0; run < 5000; run++) { + long seed = System.nanoTime() + run; + Random random = new Random(seed); + String topic; + do { + topic = randomTopicString(random, 249); + } while (!Topic.isValid(topic)); + Set suffixes = randomSuffixes(random); + try { + topicSuffixesRandom.accept(topic, suffixes, random); + } catch (Throwable e) { + fail("Failed with seed=" + seed + ", topic=" + topic + ", suffixes=" + suffixes, e); + } + } } - @Property(tries = 5000) - public void matchingLiteralAllowWithNoMatchingDenyRules( - @ForAll @ValidTopicChars String topic, - @ForAll @Size(max = 10) Set<@ValidTopicChars String> randomSuffixes - ) throws Exception { - Assume.that(Topic.isValid(topic)); - StandardAuthorizer authorizer = buildAuthorizer(); + private static void forTopicSuffixes(TopicSuffixes topicSuffixes) { + forTopicSuffixesRandom((topic, suffixes, random) -> topicSuffixes.accept(topic, suffixes)); + } - // Create one ALLOW rule which matches and zero or more DENY rules which do not - // match. The final result should be ALLOWED. + @Test + public void matchingPrefixDenyOverridesAllAllowRules() { + forTopicSuffixesRandom((topic, suffixes, random) -> { + StandardAuthorizer authorizer = buildAuthorizer(); + + // Create one DENY rule which matches and zero or more ALLOW rules which may or + // may not match. Regardless of the ALLOW rules, the final result should be DENIED. + String topicPrefix = topic.substring(0, random.nextInt(topic.length())); + StandardAcl denyRule = buildTopicWriteAcl(topicPrefix, PatternType.PREFIXED, AclPermissionType.DENY); + authorizer.addAcl(Uuid.randomUuid(), denyRule); + addRandomPrefixAllowAcls(authorizer, topic, suffixes); + + assertAuthorizationResult( + authorizer, + AuthorizationResult.DENIED, + AclOperation.WRITE, + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)); + }); + } - StandardAcl denyRule = buildTopicWriteAcl(topic, PatternType.LITERAL, AclPermissionType.ALLOW); - authorizer.addAcl(Uuid.randomUuid(), denyRule); + @Test + public void matchingLiteralDenyOverridesAllAllowRules() { + forTopicSuffixes((topic, suffixes) -> { + StandardAuthorizer authorizer = buildAuthorizer(); + + // Create one DENY rule which matches and zero or more ALLOW rules which may or + // may not match. Regardless of the ALLOW rules, the final result should be DENIED. + StandardAcl denyRule = buildTopicWriteAcl(topic, PatternType.LITERAL, AclPermissionType.DENY); + authorizer.addAcl(Uuid.randomUuid(), denyRule); + addRandomPrefixAllowAcls(authorizer, topic, suffixes); + + assertAuthorizationResult( + authorizer, + AuthorizationResult.DENIED, + AclOperation.WRITE, + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)); + }); + } - addRandomNonMatchingPrefixDenyAcls(authorizer, topic, randomSuffixes); + @Test + public void matchingPrefixAllowWithNoMatchingDenyRules() { + forTopicSuffixesRandom((topic, suffixes, random) -> { + StandardAuthorizer authorizer = buildAuthorizer(); + + // Create one ALLOW rule which matches and zero or more DENY rules which do not + // match. The final result should be ALLOWED. + String topicPrefix = topic.substring(0, random.nextInt(topic.length())); + StandardAcl allowRule = buildTopicWriteAcl(topicPrefix, PatternType.PREFIXED, AclPermissionType.ALLOW); + authorizer.addAcl(Uuid.randomUuid(), allowRule); + addRandomNonMatchingPrefixDenyAcls(authorizer, topic, suffixes); + + assertAuthorizationResult( + authorizer, + AuthorizationResult.ALLOWED, + AclOperation.WRITE, + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)); + }); + } - assertAuthorizationResult( - authorizer, - AuthorizationResult.ALLOWED, - AclOperation.WRITE, - new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL) - ); + @Test + public void matchingLiteralAllowWithNoMatchingDenyRules() { + forTopicSuffixes((topic, suffixes) -> { + StandardAuthorizer authorizer = buildAuthorizer(); + + // Create one ALLOW rule which matches and zero or more DENY rules which do not + // match. The final result should be ALLOWED. + StandardAcl allowRule = buildTopicWriteAcl(topic, PatternType.LITERAL, AclPermissionType.ALLOW); + authorizer.addAcl(Uuid.randomUuid(), allowRule); + addRandomNonMatchingPrefixDenyAcls(authorizer, topic, suffixes); + + assertAuthorizationResult( + authorizer, + AuthorizationResult.ALLOWED, + AclOperation.WRITE, + new ResourcePattern(ResourceType.TOPIC, topic, PatternType.LITERAL)); + }); } - private StandardAuthorizer buildAuthorizer() { + private static StandardAuthorizer buildAuthorizer() { StandardAuthorizer authorizer = new StandardAuthorizer(); authorizer.start(new AuthorizerTestServerInfo(List.of(PLAINTEXT))); + authorizer.withPluginMetrics(new PluginMetricsImpl(new Metrics(), Map.of())); authorizer.completeInitialLoad(); return authorizer; } - private void assertAuthorizationResult( + private static String randomTopicString(Random random, int maxLength) { + int length = random.nextInt(maxLength) + 1; + StringBuilder sb = new StringBuilder(length); + for (int i = 0; i < length; i++) { + sb.append(VALID_TOPIC_CHARS.charAt(random.nextInt(VALID_TOPIC_CHARS.length()))); + } + return sb.toString(); + } + + private static Set randomSuffixes(Random random) { + int size = random.nextInt(11); + Set suffixes = new HashSet<>(); + for (int i = 0; i < size; i++) { + suffixes.add(randomTopicString(random, 10)); + } + return suffixes; + } + + private static void assertAuthorizationResult( StandardAuthorizer authorizer, AuthorizationResult expectedResult, AclOperation operation, @@ -183,29 +203,18 @@ private void assertAuthorizationResult( try { assertEquals(expectedResult, actualResult); - } catch (Throwable e) { - printCounterExample(authorizer, operation, pattern, actualResult); + } catch (AssertionError e) { + System.out.println("Assertion FAILED: Operation " + operation + " on " + + pattern + " is " + actualResult + ". Current ACLS:"); + Iterable allAcls = authorizer.acls(new AclBindingFilter( + new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY), + new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY) + )); + allAcls.forEach(System.out::println); throw e; } } - private void printCounterExample( - StandardAuthorizer authorizer, - AclOperation operation, - ResourcePattern resourcePattern, - AuthorizationResult result - ) { - System.out.println("Assertion FAILED: Operation " + operation + " on " + - resourcePattern + " is " + result + ". Current ACLS:"); - - Iterable allAcls = authorizer.acls(new AclBindingFilter( - new ResourcePatternFilter(ResourceType.ANY, null, PatternType.ANY), - new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY) - )); - - allAcls.forEach(System.out::println); - } - private static AuthorizableRequestContext newRequestContext() throws Exception { return new MockAuthorizableRequestContext.Builder() .setPrincipal(new KafkaPrincipal(USER_TYPE, "user")) @@ -228,7 +237,7 @@ private static StandardAcl buildTopicWriteAcl( ); } - private boolean isPrefix( + private static boolean isPrefix( String value, String prefix ) { @@ -240,7 +249,7 @@ private boolean isPrefix( } } - private void addRandomNonMatchingPrefixDenyAcls( + private static void addRandomNonMatchingPrefixDenyAcls( StandardAuthorizer authorizer, String topic, Set randomSuffixes @@ -254,7 +263,7 @@ private void addRandomNonMatchingPrefixDenyAcls( ); } - private void addRandomPrefixAllowAcls( + private static void addRandomPrefixAllowAcls( StandardAuthorizer authorizer, String topic, Set randomSuffixes @@ -267,8 +276,8 @@ private void addRandomPrefixAllowAcls( pattern -> !pattern.isEmpty() ); } - - private void addRandomPrefixRules( + + private static void addRandomPrefixRules( StandardAuthorizer authorizer, String topic, Set randomSuffixes, @@ -293,7 +302,7 @@ private void addRandomPrefixRules( PatternType.PREFIXED, permissionType )); - } + } } } diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java index 74e4afd362ac2..df3e9ef4c4dcf 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientFetchTest.java @@ -28,10 +28,6 @@ import org.apache.kafka.common.requests.FetchResponse; import org.apache.kafka.server.common.KRaftVersion; -import net.jqwik.api.AfterFailureMode; -import net.jqwik.api.ForAll; -import net.jqwik.api.Property; - import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; @@ -49,11 +45,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public final class KafkaRaftClientFetchTest { - @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) - void testRandomRecords( - @ForAll(supplier = ArbitraryMemoryRecords.class) MemoryRecords memoryRecords - ) throws Exception { - testFetchResponseWithInvalidRecord(memoryRecords, Integer.MAX_VALUE); + + @Test + void testRandomRecords() { + ArbitraryMemoryRecords.forRandomRecords(100, memoryRecords -> + testFetchResponseWithInvalidRecord(memoryRecords, Integer.MAX_VALUE) + ); } @ParameterizedTest diff --git a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java index a004e3a09aff8..68ffc33792f2c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/MockLogTest.java @@ -36,10 +36,6 @@ import org.apache.kafka.snapshot.RawSnapshotReader; import org.apache.kafka.snapshot.RawSnapshotWriter; -import net.jqwik.api.AfterFailureMode; -import net.jqwik.api.ForAll; -import net.jqwik.api.Property; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -453,20 +449,20 @@ void testInvalidMemoryRecords(MemoryRecords records, Optional> assertEquals(previousEndOffset, log.endOffset().offset()); } - @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) - void testRandomRecords( - @ForAll(supplier = ArbitraryMemoryRecords.class) MemoryRecords records - ) { - try (MockLog log = new MockLog(topicPartition, topicId, new LogContext())) { - long previousEndOffset = log.endOffset().offset(); + @Test + void testRandomRecords() { + ArbitraryMemoryRecords.forRandomRecords(100, records -> { + try (MockLog log = new MockLog(topicPartition, topicId, new LogContext())) { + long previousEndOffset = log.endOffset().offset(); - assertThrows( - CorruptRecordException.class, - () -> log.appendAsFollower(records, Integer.MAX_VALUE) - ); + assertThrows( + CorruptRecordException.class, + () -> log.appendAsFollower(records, Integer.MAX_VALUE) + ); - assertEquals(previousEndOffset, log.endOffset().offset()); - } + assertEquals(previousEndOffset, log.endOffset().offset()); + } + }); } @Test diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 44a98bc54611f..9bb84149877e6 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -40,12 +40,8 @@ import org.apache.kafka.snapshot.RecordsSnapshotReader; import org.apache.kafka.snapshot.SnapshotReader; -import net.jqwik.api.AfterFailureMode; -import net.jqwik.api.ForAll; -import net.jqwik.api.Property; -import net.jqwik.api.Tag; -import net.jqwik.api.constraints.IntRange; - +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; import org.mockito.Mockito; import java.net.InetSocketAddress; @@ -78,26 +74,26 @@ * The simulation testing framework provides a way to verify quorum behavior under * different conditions. It is similar to system testing in that the test involves * independently executing nodes, but there are several important differences: - * - * 1. Simulation behavior is deterministic provided an initial random seed. This + *
    + *
  1. Simulation behavior is deterministic provided an initial random seed. This * makes it easy to reproduce and debug test failures. - * 2. The simulation uses an in-memory message router instead of a real network. + *
  2. The simulation uses an in-memory message router instead of a real network. * Not only is this much cheaper and faster, it provides an easy way to create * flaky network conditions or even network partitions without losing the * simulation determinism. - * 3. Similarly, persistent state is stored in memory. We can nevertheless simulate + *
  3. Similarly, persistent state is stored in memory. We can nevertheless simulate * different kinds of failures, such as the loss of unflushed data after a hard * node restart using {@link MockLog}. * - * The framework uses a single event scheduler in order to provide deterministic + *

    The framework uses a single event scheduler in order to provide deterministic * executions. Each test is setup as a specific scenario with a variable number of * voters and observers. Much like system tests, there is typically a warmup * period, followed by some cluster event (such as a node failure), and then some * logic to validate behavior after recovery. * - * If any of the tests fail, the output will indicate the arguments that failed. + *

    If any of the tests fail, the output will indicate the arguments that failed. * The easiest way to reproduce the failure for debugging is to create a separate - * `@Test` case which invokes the `@Property` method with those arguments directly. + * test case which invokes the test method with those arguments directly. * This ensures that logging output will only include output from a single * simulation execution. */ @@ -112,366 +108,319 @@ public class RaftEventSimulationTest { private static final int FETCH_MAX_WAIT_MS = 100; private static final int LINGER_MS = 0; - @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) - void canElectInitialLeader( - @ForAll int seed, - @ForAll @IntRange(min = 1, max = 5) int numVoters, - @ForAll @IntRange(min = 0, max = 5) int numObservers - ) { - Random random = new Random(seed); - Cluster cluster = new Cluster(numVoters, numObservers, random); - MessageRouter router = new MessageRouter(cluster); - EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - - cluster.startAll(); - schedulePolling(scheduler, cluster, 3, 5); - scheduler.schedule(router::deliverAll, 0, 2, 1); - scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); - scheduler.runUntil(cluster::hasConsistentLeader); - scheduler.runUntil(() -> cluster.allReachedHighWatermark(10)); + private interface SeedObservers { + void accept(long seed, int observer); } - @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) - void canElectNewLeaderAfterOldLeaderFailure( - @ForAll int seed, - @ForAll @IntRange(min = 3, max = 5) int numVoters, - @ForAll @IntRange(min = 0, max = 5) int numObservers, - @ForAll boolean isGracefulShutdown - ) { - Random random = new Random(seed); - Cluster cluster = new Cluster(numVoters, numObservers, random); - MessageRouter router = new MessageRouter(cluster); - EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - - // Seed the cluster with some data - cluster.startAll(); - schedulePolling(scheduler, cluster, 3, 5); - scheduler.schedule(router::deliverAll, 0, 2, 1); - scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); - scheduler.runUntil(cluster::hasConsistentLeader); - scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); - - // Shutdown the leader and write some more data. We can verify the new leader has been elected - // by verifying that the high watermark can still advance. - int leaderId = cluster.latestLeader().orElseThrow(() -> - new AssertionError("Failed to find current leader") - ); - - if (isGracefulShutdown) { - cluster.shutdown(leaderId); - } else { - cluster.kill(leaderId); - } - - scheduler.runUntil(() -> cluster.allReachedHighWatermark(20)); - long highWatermark = cluster.maxHighWatermarkReached(); + private interface SeedVotersObservers { + void accept(long seed, int voter, int observer); + } - // Restart the node and verify it catches up - cluster.start(leaderId); - scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10)); + private interface SeedVotersObserversIsGracefulShutdown { + void accept(long seed, int voter, int observer, boolean isGracefulShutdown); } - @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) - void canRecoverAfterAllNodesKilled( - @ForAll int seed, - @ForAll @IntRange(min = 1, max = 5) int numVoters, - @ForAll @IntRange(min = 0, max = 5) int numObservers - ) { - Random random = new Random(seed); - Cluster cluster = new Cluster(numVoters, numObservers, random); - MessageRouter router = new MessageRouter(cluster); - EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - - // Seed the cluster with some data - cluster.startAll(); - schedulePolling(scheduler, cluster, 3, 5); - scheduler.schedule(router::deliverAll, 0, 2, 1); - scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); - scheduler.runUntil(cluster::hasConsistentLeader); - scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); - long highWatermark = cluster.maxHighWatermarkReached(); - - // We kill all of the nodes. Then we bring back a majority and verify that - // they are able to elect a leader and continue making progress - cluster.killAll(); - - Iterator nodeIdsIterator = cluster.nodeIds().iterator(); - for (int i = 0; i < cluster.majoritySize(); i++) { - Integer nodeId = nodeIdsIterator.next(); - cluster.start(nodeId); - } - - scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10)); + private static void withObservers(int minObservers, int maxObservers, SeedObservers so) { + withVotersObserversIsGracefulShutdown(0, 0, minObservers, maxObservers, (seed, numVoters, numObservers, isGracefulShutdown) -> + so.accept(seed, numObservers) + ); } - @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) - void canElectNewLeaderAfterOldLeaderPartitionedAway( - @ForAll int seed, - @ForAll @IntRange(min = 3, max = 5) int numVoters, - @ForAll @IntRange(min = 0, max = 5) int numObservers - ) { - Random random = new Random(seed); - Cluster cluster = new Cluster(numVoters, numObservers, random); - MessageRouter router = new MessageRouter(cluster); - EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - - // Seed the cluster with some data - cluster.startAll(); - schedulePolling(scheduler, cluster, 3, 5); - scheduler.schedule(router::deliverAll, 0, 2, 2); - scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); - scheduler.runUntil(cluster::hasConsistentLeader); - scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); - - // The leader gets partitioned off. We can verify the new leader has been elected - // by writing some data and ensuring that it gets replicated - int leaderId = cluster.latestLeader().orElseThrow(() -> - new AssertionError("Failed to find current leader") + private static void withVotersObservers(int minVoters, int maxVoters, int minObservers, int maxObservers, SeedVotersObservers svo) { + withVotersObserversIsGracefulShutdown(minVoters, maxVoters, minObservers, maxObservers, (seed, numVoters, numObservers, isGracefulShutdown) -> + svo.accept(seed, numVoters, numObservers) ); - router.filter(leaderId, new DropAllTraffic()); + } - Set nonPartitionedNodes = new HashSet<>(cluster.nodeIds()); - nonPartitionedNodes.remove(leaderId); + private static void withVotersObserversIsGracefulShutdown(int minVoters, int maxVoters, int minObservers, int maxObservers, SeedVotersObserversIsGracefulShutdown svos) { + for (int run = 0; run < 100; run++) { + long seed = System.nanoTime() + run; + Random random = new Random(seed); + int numVoters = random.nextInt(minVoters, maxVoters + 1); + int numObservers = random.nextInt(minObservers, maxObservers + 1); + boolean isGracefulShutdown = random.nextBoolean(); + try { + svos.accept(seed, numVoters, numObservers, isGracefulShutdown); + } catch (Throwable e) { + fail("Failed with seed=" + seed + + ", numVoters=" + numVoters + ", numObservers=" + numObservers + + ", isGracefulShutdown=" + isGracefulShutdown, e); + } + } + } - scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, nonPartitionedNodes)); + @Test + void canElectInitialLeader() { + withVotersObservers(1, 5, 0, 5, (seed, numVoters, numObservers) -> { + Cluster cluster = new Cluster(numVoters, numObservers, seed); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + cluster.startAll(); + schedulePolling(scheduler, cluster, 3, 5); + scheduler.schedule(router::deliverAll, 0, 2, 1); + scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); + scheduler.runUntil(cluster::hasConsistentLeader); + scheduler.runUntil(() -> cluster.allReachedHighWatermark(10)); + }); } - @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) - void canMakeProgressIfMajorityIsReachable( - @ForAll int seed, - @ForAll @IntRange(min = 0, max = 3) int numObservers - ) { - int numVoters = 5; - Random random = new Random(seed); - Cluster cluster = new Cluster(numVoters, numObservers, random); - MessageRouter router = new MessageRouter(cluster); - EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - - // Seed the cluster with some data - cluster.startAll(); - schedulePolling(scheduler, cluster, 3, 5); - scheduler.schedule(router::deliverAll, 0, 2, 2); - scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); - scheduler.runUntil(cluster::hasConsistentLeader); - scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); - - // Partition the nodes into two sets. Nodes are reachable within each set, - // but the two sets cannot communicate with each other. We should be able - // to make progress even if an election is needed in the larger set. - router.filter( - 0, - new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4))) - ); - router.filter( - 1, - new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4))) - ); - router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); - router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); - router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); - - long partitionLogEndOffset = cluster.maxLogEndOffset(); - scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * partitionLogEndOffset)); - - long minorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(0, 1)); - long majorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(2, 3, 4)); - - assertTrue( - majorityHighWatermark > minorityHighWatermark, - String.format( - "majorityHighWatermark = %s, minorityHighWatermark = %s", - majorityHighWatermark, - minorityHighWatermark - ) - ); + @Test + void canElectNewLeaderAfterOldLeaderFailure() { + withVotersObserversIsGracefulShutdown(3, 5, 0, 5, (seed, numVoters, numObservers, isGracefulShutdown) -> { + Cluster cluster = new Cluster(numVoters, numObservers, seed); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + + cluster.startAll(); + schedulePolling(scheduler, cluster, 3, 5); + scheduler.schedule(router::deliverAll, 0, 2, 1); + scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); + scheduler.runUntil(cluster::hasConsistentLeader); + scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); + + int leaderId = cluster.latestLeader().orElseThrow(() -> + new AssertionError("Failed to find current leader") + ); + + if (isGracefulShutdown) { + cluster.shutdown(leaderId); + } else { + cluster.kill(leaderId); + } - // Now restore the partition and verify everyone catches up - router.filter(0, new PermitAllTraffic()); - router.filter(1, new PermitAllTraffic()); - router.filter(2, new PermitAllTraffic()); - router.filter(3, new PermitAllTraffic()); - router.filter(4, new PermitAllTraffic()); + scheduler.runUntil(() -> cluster.allReachedHighWatermark(20)); + long highWatermark = cluster.maxHighWatermarkReached(); - long restoredLogEndOffset = cluster.maxLogEndOffset(); - scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset)); + cluster.start(leaderId); + scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10)); + }); } - @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) - void leadershipAssignedOnlyOnceWithNetworkPartitionIfThereExistsMajority( - @ForAll int seed, - @ForAll @IntRange(min = 0, max = 3) int numObservers - ) { - int numVoters = 5; - Random random = new Random(seed); - Cluster cluster = new Cluster(numVoters, numObservers, random); - MessageRouter router = new MessageRouter(cluster); - EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - scheduler.addInvariant(new StableLeadership(cluster)); - - // Create network partition which would result in ping-pong of leadership between nodes 2 and 3 without PreVote - // Scenario explained in detail in KIP-996 - // 0 1 - // | | - // 2 - 3 - // \ / - // 4 - router.filter( - 0, - new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(1, 3, 4))) - ); - router.filter( - 1, - new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 2, 4))) - ); - router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(1)))); - router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0)))); - router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); - - // Start cluster - cluster.startAll(); - schedulePolling(scheduler, cluster, 3, 5); - scheduler.schedule(router::deliverAll, 0, 2, 1); - scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 1); - scheduler.runUntil(cluster::hasConsistentLeader); - - // Check that leadership remains stable after majority processes some data - int leaderId = cluster.latestLeader().getAsInt(); - // Determine the voters in the majority based on the leader - Set majority = new HashSet<>(Set.of(0, 1, 2, 3, 4)); - switch (leaderId) { - case 2 -> majority.remove(1); - case 3 -> majority.remove(0); - case 4 -> { - majority.remove(0); - majority.remove(1); + @Test + void canRecoverAfterAllNodesKilled() { + withVotersObservers(1, 5, 0, 5, (seed, numVoters, numObservers) -> { + Cluster cluster = new Cluster(numVoters, numObservers, seed); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + + cluster.startAll(); + schedulePolling(scheduler, cluster, 3, 5); + scheduler.schedule(router::deliverAll, 0, 2, 1); + scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); + scheduler.runUntil(cluster::hasConsistentLeader); + scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); + long highWatermark = cluster.maxHighWatermarkReached(); + + cluster.killAll(); + + Iterator nodeIdsIterator = cluster.nodeIds().iterator(); + for (int i = 0; i < cluster.majoritySize(); i++) { + Integer nodeId = nodeIdsIterator.next(); + cluster.start(nodeId); } - default -> throw new IllegalStateException("Unexpected leader: " + leaderId); - } - scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, majority)); + + scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermark + 10)); + }); } - @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) - void leadershipWillNotChangeDuringNetworkPartitionIfMajorityStillReachable( - @ForAll int seed, - @ForAll @IntRange(min = 0, max = 3) int numObservers - ) { - int numVoters = 5; - Random random = new Random(seed); - Cluster cluster = new Cluster(numVoters, numObservers, random); - MessageRouter router = new MessageRouter(cluster); - EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - scheduler.addInvariant(new StableLeadership(cluster)); - - // Seed the cluster with some data - cluster.startAll(); - schedulePolling(scheduler, cluster, 3, 5); - scheduler.schedule(router::deliverAll, 0, 2, 1); - scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 1); - scheduler.runUntil(cluster::hasConsistentLeader); - scheduler.runUntil(() -> cluster.allReachedHighWatermark(5)); - - int leaderId = cluster.latestLeader().orElseThrow(() -> - new AssertionError("Failed to find current leader during setup") - ); + @Test + void canElectNewLeaderAfterOldLeaderPartitionedAway() { + withVotersObservers(3, 5, 0, 5, (seed, numVoters, numObservers) -> { + Cluster cluster = new Cluster(numVoters, numObservers, seed); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + + cluster.startAll(); + schedulePolling(scheduler, cluster, 3, 5); + scheduler.schedule(router::deliverAll, 0, 2, 2); + scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); + scheduler.runUntil(cluster::hasConsistentLeader); + scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); + + int leaderId = cluster.latestLeader().orElseThrow(() -> + new AssertionError("Failed to find current leader") + ); + router.filter(leaderId, new DropAllTraffic()); - // Create network partition which would result in ping-pong of leadership between nodes C and D without PreVote - // Scenario explained in detail in KIP-996 - // A B - // | | - // C - D (have leader start in position C) - // \ / - // E - int nodeA = (leaderId + 1) % numVoters; - int nodeB = (leaderId + 2) % numVoters; - int nodeD = (leaderId + 3) % numVoters; - int nodeE = (leaderId + 4) % numVoters; - router.filter( - nodeA, - new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(nodeB, nodeD, nodeE))) - ); - router.filter( - nodeB, - new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(nodeA, leaderId, nodeE))) - ); - router.filter(leaderId, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(nodeB)))); - router.filter(nodeD, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(nodeA)))); - router.filter(nodeE, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(nodeA, nodeB)))); + Set nonPartitionedNodes = new HashSet<>(cluster.nodeIds()); + nonPartitionedNodes.remove(leaderId); - // Check that leadership remains stable - scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, Set.of(nodeA, leaderId, nodeD, nodeE))); + scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, nonPartitionedNodes)); + }); } - @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) - void canMakeProgressAfterBackToBackLeaderFailures( - @ForAll int seed, - @ForAll @IntRange(min = 3, max = 5) int numVoters, - @ForAll @IntRange(min = 0, max = 5) int numObservers - ) { - Random random = new Random(seed); - Cluster cluster = new Cluster(numVoters, numObservers, random); - MessageRouter router = new MessageRouter(cluster); - EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); - - // Seed the cluster with some data - cluster.startAll(); - schedulePolling(scheduler, cluster, 3, 5); - scheduler.schedule(router::deliverAll, 0, 2, 5); - scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); - scheduler.runUntil(cluster::hasConsistentLeader); - scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); - - int leaderId = cluster.latestLeader().getAsInt(); - router.filter(leaderId, new DropAllTraffic()); - scheduler.runUntil(() -> cluster.latestLeader().isPresent() && cluster.latestLeader().getAsInt() != leaderId); - - // As soon as we have a new leader, restore traffic to the old leader and partition the new leader - int newLeaderId = cluster.latestLeader().getAsInt(); - router.filter(leaderId, new PermitAllTraffic()); - router.filter(newLeaderId, new DropAllTraffic()); - - // Verify now that we can make progress - long targetHighWatermark = cluster.maxHighWatermarkReached() + 10; - scheduler.runUntil(() -> cluster.anyReachedHighWatermark(targetHighWatermark)); + @Test + void canMakeProgressIfMajorityIsReachable() { + withObservers(0, 3, (seed, numObservers) -> { + int numVoters = 5; + Cluster cluster = new Cluster(numVoters, numObservers, seed); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + + cluster.startAll(); + schedulePolling(scheduler, cluster, 3, 5); + scheduler.schedule(router::deliverAll, 0, 2, 2); + scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); + scheduler.runUntil(cluster::hasConsistentLeader); + scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); + + router.filter(0, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4)))); + router.filter(1, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(2, 3, 4)))); + router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + + long partitionLogEndOffset = cluster.maxLogEndOffset(); + scheduler.runUntil(() -> cluster.anyReachedHighWatermark(2 * partitionLogEndOffset)); + + long minorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(0, 1)); + long majorityHighWatermark = cluster.maxHighWatermarkReached(Set.of(2, 3, 4)); + + assertTrue( + majorityHighWatermark > minorityHighWatermark, + String.format( + "majorityHighWatermark = %s, minorityHighWatermark = %s", + majorityHighWatermark, + minorityHighWatermark + ) + ); + + router.filter(0, new PermitAllTraffic()); + router.filter(1, new PermitAllTraffic()); + router.filter(2, new PermitAllTraffic()); + router.filter(3, new PermitAllTraffic()); + router.filter(4, new PermitAllTraffic()); + + long restoredLogEndOffset = cluster.maxLogEndOffset(); + scheduler.runUntil(() -> cluster.allReachedHighWatermark(2 * restoredLogEndOffset)); + }); } - @Property(tries = 100, afterFailure = AfterFailureMode.SAMPLE_ONLY) - void canRecoverFromSingleNodeCommittedDataLoss( - @ForAll int seed, - @ForAll @IntRange(min = 3, max = 5) int numVoters, - @ForAll @IntRange(min = 0, max = 2) int numObservers - ) { - // We run this test without the `MonotonicEpoch` and `MajorityReachedHighWatermark` - // invariants since the loss of committed data on one node can violate them. - - Random random = new Random(seed); - Cluster cluster = new Cluster(numVoters, numObservers, random); - EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time); - scheduler.addInvariant(new MonotonicHighWatermark(cluster)); - scheduler.addInvariant(new SingleLeader(cluster)); - scheduler.addValidation(new ConsistentCommittedData(cluster)); + @Test + void leadershipAssignedOnlyOnceWithNetworkPartitionIfThereExistsMajority() { + withObservers(0, 3, (seed, numObservers) -> { + int numVoters = 5; + Cluster cluster = new Cluster(numVoters, numObservers, seed); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + scheduler.addInvariant(new StableLeadership(cluster)); + + router.filter(0, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(1, 3, 4)))); + router.filter(1, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 2, 4)))); + router.filter(2, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(1)))); + router.filter(3, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0)))); + router.filter(4, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(0, 1)))); + + cluster.startAll(); + schedulePolling(scheduler, cluster, 3, 5); + scheduler.schedule(router::deliverAll, 0, 2, 1); + scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 1); + scheduler.runUntil(cluster::hasConsistentLeader); + + int leaderId = cluster.latestLeader().getAsInt(); + Set majority = new HashSet<>(Set.of(0, 1, 2, 3, 4)); + switch (leaderId) { + case 2 -> majority.remove(1); + case 3 -> majority.remove(0); + case 4 -> { + majority.remove(0); + majority.remove(1); + } + default -> throw new IllegalStateException("Unexpected leader: " + leaderId); + } + scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, majority)); + }); + } - MessageRouter router = new MessageRouter(cluster); + @Test + void leadershipWillNotChangeDuringNetworkPartitionIfMajorityStillReachable() { + withObservers(0, 3, (seed, numObservers) -> { + int numVoters = 5; + Cluster cluster = new Cluster(numVoters, numObservers, seed); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + scheduler.addInvariant(new StableLeadership(cluster)); + + cluster.startAll(); + schedulePolling(scheduler, cluster, 3, 5); + scheduler.schedule(router::deliverAll, 0, 2, 1); + scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 1); + scheduler.runUntil(cluster::hasConsistentLeader); + scheduler.runUntil(() -> cluster.allReachedHighWatermark(5)); + + int leaderId = cluster.latestLeader().orElseThrow(() -> + new AssertionError("Failed to find current leader during setup") + ); - cluster.startAll(); - schedulePolling(scheduler, cluster, 3, 5); - scheduler.schedule(router::deliverAll, 0, 2, 5); - scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); - scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); + int nodeA = (leaderId + 1) % numVoters; + int nodeB = (leaderId + 2) % numVoters; + int nodeD = (leaderId + 3) % numVoters; + int nodeE = (leaderId + 4) % numVoters; + router.filter(nodeA, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(nodeB, nodeD, nodeE)))); + router.filter(nodeB, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(nodeA, leaderId, nodeE)))); + router.filter(leaderId, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(nodeB)))); + router.filter(nodeD, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(nodeA)))); + router.filter(nodeE, new DropOutboundRequestsTo(cluster.endpointsFromIds(Set.of(nodeA, nodeB)))); + + scheduler.runUntil(() -> cluster.allReachedHighWatermark(20, Set.of(nodeA, leaderId, nodeD, nodeE))); + }); + } - RaftNode node = cluster.randomRunning().orElseThrow(() -> - new AssertionError("Failed to find running node") - ); + @Test + void canMakeProgressAfterBackToBackLeaderFailures() { + withVotersObservers(3, 5, 0, 5, (seed, numVoters, numObservers) -> { + Cluster cluster = new Cluster(numVoters, numObservers, seed); + MessageRouter router = new MessageRouter(cluster); + EventScheduler scheduler = schedulerWithDefaultInvariants(cluster); + + cluster.startAll(); + schedulePolling(scheduler, cluster, 3, 5); + scheduler.schedule(router::deliverAll, 0, 2, 5); + scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); + scheduler.runUntil(cluster::hasConsistentLeader); + scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); + + int leaderId = cluster.latestLeader().getAsInt(); + router.filter(leaderId, new DropAllTraffic()); + scheduler.runUntil(() -> cluster.latestLeader().isPresent() && cluster.latestLeader().getAsInt() != leaderId); + + int newLeaderId = cluster.latestLeader().getAsInt(); + router.filter(leaderId, new PermitAllTraffic()); + router.filter(newLeaderId, new DropAllTraffic()); + + long targetHighWatermark = cluster.maxHighWatermarkReached() + 10; + scheduler.runUntil(() -> cluster.anyReachedHighWatermark(targetHighWatermark)); + }); + } + + @Test + void canRecoverFromSingleNodeCommittedDataLoss() { + withVotersObservers(3, 5, 0, 2, (seed, numVoters, numObservers) -> { + Cluster cluster = new Cluster(numVoters, numObservers, seed); + EventScheduler scheduler = new EventScheduler(cluster.random, cluster.time); + scheduler.addInvariant(new MonotonicHighWatermark(cluster)); + scheduler.addInvariant(new SingleLeader(cluster)); + scheduler.addValidation(new ConsistentCommittedData(cluster)); + + MessageRouter router = new MessageRouter(cluster); + + cluster.startAll(); + schedulePolling(scheduler, cluster, 3, 5); + scheduler.schedule(router::deliverAll, 0, 2, 5); + scheduler.schedule(new SequentialAppendAction(cluster), 0, 2, 3); + scheduler.runUntil(() -> cluster.anyReachedHighWatermark(10)); + + RaftNode node = cluster.randomRunning().orElseThrow(() -> + new AssertionError("Failed to find running node") + ); - // Kill a random node and drop all of its persistent state. The Raft - // protocol guarantees should still ensure we lose no committed data - // as long as a new leader is elected before the failed node is restarted. - cluster.killAndDeletePersistentState(node.nodeId); - scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader()); + cluster.killAndDeletePersistentState(node.nodeId); + scheduler.runUntil(() -> !cluster.hasLeader(node.nodeId) && cluster.hasConsistentLeader()); - // Now restart the failed node and ensure that it recovers. - long highWatermarkBeforeRestart = cluster.maxHighWatermarkReached(); - cluster.start(node.nodeId); - scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermarkBeforeRestart + 10)); + long highWatermarkBeforeRestart = cluster.maxHighWatermarkReached(); + cluster.start(node.nodeId); + scheduler.runUntil(() -> cluster.allReachedHighWatermark(highWatermarkBeforeRestart + 10)); + }); } private EventScheduler schedulerWithDefaultInvariants(Cluster cluster) { @@ -643,8 +592,8 @@ private static class Cluster { final Map nodes = new HashMap<>(); final Map running = new HashMap<>(); - private Cluster(int numVoters, int numObservers, Random random) { - this.random = random; + private Cluster(int numVoters, int numObservers, long seed) { + this.random = new Random(seed); for (int nodeId = 0; nodeId < numVoters; nodeId++) { voters.put(nodeId, nodeFromId(nodeId)); @@ -911,8 +860,6 @@ void start(int nodeId) { messageQueue, persistentState.store, logContext, - time, - random, serde ); node.initialize(voterAddressMap, metrics); @@ -940,8 +887,6 @@ private RaftNode( MockMessageQueue messageQueue, MockQuorumStateStore store, LogContext logContext, - Time time, - Random random, RecordSerde intSerde ) { this.logContext = logContext; diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java index ba3f39cdeb0fa..e905e77a35746 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/KafkaRaftLogTest.java @@ -55,10 +55,6 @@ import org.apache.kafka.storage.internals.log.UnifiedLog; import org.apache.kafka.test.TestUtils; -import net.jqwik.api.AfterFailureMode; -import net.jqwik.api.ForAll; -import net.jqwik.api.Property; - import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -80,6 +76,7 @@ import java.util.Properties; import static org.apache.kafka.test.TestUtils.assertOptional; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -179,22 +176,24 @@ public void testInvalidMemoryRecords(MemoryRecords records, Optional log.appendAsFollower(records, Integer.MAX_VALUE) - ); - - assertEquals(previousEndOffset, log.endOffset().offset()); - } finally { - Utils.delete(tempDir); - } + @Test + public void testRandomRecords() { + ArbitraryMemoryRecords.forRandomRecords(100, records -> { + File tempDir = TestUtils.tempDirectory(); + try { + KafkaRaftLog log = buildMetadataLog(tempDir, mockTime); + long previousEndOffset = log.endOffset().offset(); + + assertThrows( + CorruptRecordException.class, + () -> log.appendAsFollower(records, Integer.MAX_VALUE) + ); + + assertEquals(previousEndOffset, log.endOffset().offset()); + } finally { + assertDoesNotThrow(() -> Utils.delete(tempDir)); + } + }); } @Test diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java index c64a5fb953ff5..0fa74547ad30c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java @@ -50,9 +50,6 @@ import org.apache.kafka.snapshot.RecordsSnapshotWriter; import org.apache.kafka.test.TestUtils; -import net.jqwik.api.ForAll; -import net.jqwik.api.Property; - import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -81,6 +78,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; public final class RecordsIteratorTest { private static final RecordSerde STRING_SERDE = new StringSerde(); @@ -98,67 +96,73 @@ void testEmptyRecords(Records records) { testIterator(List.of(), records, true); } - @Property(tries = 50) - public void testMemoryRecords( - @ForAll CompressionType compressionType, - @ForAll long seed - ) { - List> batches = createBatches(seed); - - MemoryRecords memRecords = buildRecords(compressionType, batches); - testIterator(batches, memRecords, true); + private interface CompressionTypeSeed { + void accept(CompressionType compressionType, long seed) throws IOException; } - @Property(tries = 50) - public void testFileRecords( - @ForAll CompressionType compressionType, - @ForAll long seed - ) throws IOException { - List> batches = createBatches(seed); + private void forCompressionTypeSeed(CompressionTypeSeed compressionTypeSeed) { + for (CompressionType ct : CompressionType.values()) { + for (int i = 0; i < 50; i++) { + long seed = new Random(System.nanoTime() + i).nextLong(); + try { + compressionTypeSeed.accept(ct, seed); + } catch (Throwable e) { + fail("Failed with compressionType=" + ct + ", seed=" + seed, e); + } + } + } + } - MemoryRecords memRecords = buildRecords(compressionType, batches); - FileRecords fileRecords = FileRecords.open(TestUtils.tempFile()); - fileRecords.append(memRecords); + @Test + public void testMemoryRecords() { + forCompressionTypeSeed((compressionType, seed) -> { + List> batches = createBatches(seed); + MemoryRecords memRecords = buildRecords(compressionType, batches); + testIterator(batches, memRecords, true); + }); + } - testIterator(batches, fileRecords, true); - fileRecords.close(); + @Test + public void testFileRecords() { + forCompressionTypeSeed((compressionType, seed) -> { + List> batches = createBatches(seed); + MemoryRecords memRecords = buildRecords(compressionType, batches); + FileRecords fileRecords = FileRecords.open(TestUtils.tempFile()); + fileRecords.append(memRecords); + testIterator(batches, fileRecords, true); + fileRecords.close(); + }); } - @Property(tries = 50) - public void testCrcValidation( - @ForAll CompressionType compressionType, - @ForAll long seed - ) throws IOException { - List> batches = createBatches(seed); - MemoryRecords memRecords = buildRecords(compressionType, batches); - // Read the Batch CRC for the first batch from the buffer - ByteBuffer readBuf = memRecords.buffer(); - readBuf.position(DefaultRecordBatch.CRC_OFFSET); - int actualCrc = readBuf.getInt(); - // Corrupt the CRC on the first batch - memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc + 1); - - assertThrows(CorruptRecordException.class, () -> testIterator(batches, memRecords, true)); - - FileRecords fileRecords = FileRecords.open(TestUtils.tempFile()); - fileRecords.append(memRecords); - assertThrows(CorruptRecordException.class, () -> testIterator(batches, fileRecords, true)); - - // Verify check does not trigger when doCrcValidation is false - assertDoesNotThrow(() -> testIterator(batches, memRecords, false)); - assertDoesNotThrow(() -> testIterator(batches, fileRecords, false)); - - // Fix the corruption - memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc); - - // Verify check does not trigger when the corruption is fixed - assertDoesNotThrow(() -> testIterator(batches, memRecords, true)); - FileRecords moreFileRecords = FileRecords.open(TestUtils.tempFile()); - moreFileRecords.append(memRecords); - assertDoesNotThrow(() -> testIterator(batches, moreFileRecords, true)); - - fileRecords.close(); - moreFileRecords.close(); + @Test + public void testCrcValidation() { + forCompressionTypeSeed((compressionType, seed) -> { + List> batches = createBatches(seed); + MemoryRecords memRecords = buildRecords(compressionType, batches); + ByteBuffer readBuf = memRecords.buffer(); + readBuf.position(DefaultRecordBatch.CRC_OFFSET); + int actualCrc = readBuf.getInt(); + memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc + 1); + + assertThrows(CorruptRecordException.class, () -> testIterator(batches, memRecords, true)); + + FileRecords fileRecords = FileRecords.open(TestUtils.tempFile()); + fileRecords.append(memRecords); + assertThrows(CorruptRecordException.class, () -> testIterator(batches, fileRecords, true)); + + assertDoesNotThrow(() -> testIterator(batches, memRecords, false)); + assertDoesNotThrow(() -> testIterator(batches, fileRecords, false)); + + memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc); + + assertDoesNotThrow(() -> testIterator(batches, memRecords, true)); + FileRecords moreFileRecords = FileRecords.open(TestUtils.tempFile()); + moreFileRecords.append(memRecords); + assertDoesNotThrow(() -> testIterator(batches, moreFileRecords, true)); + + fileRecords.close(); + moreFileRecords.close(); + }); } @Test diff --git a/raft/src/testFixtures/java/org/apache/kafka/common/record/internal/ArbitraryMemoryRecords.java b/raft/src/testFixtures/java/org/apache/kafka/common/record/internal/ArbitraryMemoryRecords.java index 573bbbe1d40ed..cc6b4f27ac92a 100644 --- a/raft/src/testFixtures/java/org/apache/kafka/common/record/internal/ArbitraryMemoryRecords.java +++ b/raft/src/testFixtures/java/org/apache/kafka/common/record/internal/ArbitraryMemoryRecords.java @@ -16,24 +16,31 @@ */ package org.apache.kafka.common.record.internal; -import net.jqwik.api.Arbitraries; -import net.jqwik.api.Arbitrary; -import net.jqwik.api.ArbitrarySupplier; +import org.junit.jupiter.api.function.ThrowingConsumer; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Random; -public final class ArbitraryMemoryRecords implements ArbitrarySupplier { - @Override - public Arbitrary get() { - return Arbitraries.randomValue(ArbitraryMemoryRecords::buildRandomRecords); - } +import static org.junit.jupiter.api.Assertions.fail; + +public final class ArbitraryMemoryRecords { - private static MemoryRecords buildRandomRecords(Random random) { - int size = random.nextInt(128) + 1; - byte[] bytes = new byte[size]; - random.nextBytes(bytes); + private ArbitraryMemoryRecords() {} - return MemoryRecords.readableRecords(ByteBuffer.wrap(bytes)); + public static void forRandomRecords(int tries, ThrowingConsumer test) { + for (int i = 0; i < tries; i++) { + long seed = System.nanoTime() + i; + Random random = new Random(seed); + int size = random.nextInt(128) + 1; + byte[] bytes = new byte[size]; + random.nextBytes(bytes); + MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.wrap(bytes)); + try { + test.accept(records); + } catch (Throwable e) { + fail("Failed with seed=" + seed + ", size=" + size + ", bytes=" + Arrays.toString(bytes), e); + } + } } }