Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# Scala Steward: Reformat with sbt-java-formatter 0.8.0
4ccee08177d1795422c2831ac89ae877627efb45

# Scala Steward: Reformat with sbt-java-formatter 0.10.0
b91dcd573fd89ff1df6d5d3dd40a1809de341637
2 changes: 1 addition & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ addSbtPlugin("net.aichler" % "sbt-jupiter-interface" % "0.11.1")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.10.0")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.5")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.1.4")
addSbtPlugin("com.lightbend.sbt" % "sbt-java-formatter" % "0.8.0")
addSbtPlugin("com.github.sbt" % "sbt-java-formatter" % "0.10.0")
// docs
addSbtPlugin("io.akka" % "sbt-paradox-akka" % "25.10.0")
addSbtPlugin("com.lightbend.paradox" % "sbt-paradox-dependencies" % "0.2.4")
Expand Down
20 changes: 10 additions & 10 deletions tests/src/test/java/docs/javadsl/AssignmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package docs.javadsl;

import static org.junit.Assert.assertEquals;

import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.AutoSubscription;
Expand All @@ -13,31 +15,29 @@
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
// #testkit
import akka.kafka.javadsl.Producer;
import akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test;
// #testkit
import akka.kafka.javadsl.Producer;
import akka.kafka.tests.javadsl.LogCapturingJunit4;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
// #testkit
import akka.testkit.javadsl.TestKit;
// #testkit
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
// #testkit
import org.junit.AfterClass;
import org.junit.Test;
// #testkit
import org.junit.Rule;
import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;
// #testkit

// #testkit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ static final class User {
this.mame = mame;
}
}

// #user-entity

public static Behavior<User> userBehaviour() {
Expand Down
45 changes: 24 additions & 21 deletions tests/src/test/java/docs/javadsl/ConsumerExampleTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@

package docs.javadsl;

import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.*;
import static org.junit.Assert.assertEquals;

import akka.Done;
import akka.NotUsed;
import akka.actor.AbstractLoggingActor;
Expand All @@ -13,25 +17,36 @@
import akka.actor.Props;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Adapter;
// #consumerActorTyped
// #withTypedRebalanceListenerActor
import akka.actor.typed.javadsl.Behaviors;
// #withTypedRebalanceListenerActor
// #consumerActorTyped
// adds support for actors to a classic actor system and context
import akka.actor.typed.javadsl.Adapter;
// #consumerActorTyped
// #withTypedRebalanceListenerActor
import akka.japi.Pair;
import akka.kafka.*;
import akka.kafka.javadsl.Committer;
import akka.kafka.javadsl.Consumer;
import akka.kafka.javadsl.Producer;
import akka.kafka.javadsl.PartitionAssignmentHandler;
import akka.kafka.javadsl.Producer;
import akka.kafka.testkit.javadsl.TestcontainersKafkaTest;
import akka.kafka.tests.javadsl.LogCapturingExtension;
import akka.stream.RestartSettings;
import akka.stream.javadsl.*;
import akka.testkit.javadsl.TestKit;
import com.typesafe.config.Config;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -45,22 +60,6 @@
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.*;
import static org.junit.Assert.assertEquals;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(LogCapturingExtension.class)
class ConsumerExampleTest extends TestcontainersKafkaTest {
Expand Down Expand Up @@ -99,6 +98,7 @@ private <T> Flow<T, T, NotUsed> business() {
consumerSettings
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
.withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");

// #settings-autocommit

@Test
Expand Down Expand Up @@ -156,6 +156,7 @@ public CompletionStage<Done> storeProcessedOffset(long offset) { // ... }

// #plainSource
}

// #plainSource

@Test
Expand Down Expand Up @@ -640,7 +641,9 @@ void consumerMetrics() throws Exception {
// #consumerMetrics
sleepMillis(
100,
"to let the control establish itself (fails with `java.lang.IllegalStateException: not yet initialized: only setHandler is allowed in GraphStageLogic constructor)` otherwise");
"to let the control establish itself (fails with `java.lang.IllegalStateException: not yet"
+ " initialized: only setHandler is allowed in GraphStageLogic constructor)`"
+ " otherwise");
// #consumerMetrics
CompletionStage<Map<MetricName, Metric>> metrics = control.getMetrics();
metrics.thenAccept(map -> System.out.println("Metrics: " + map));
Expand Down
20 changes: 11 additions & 9 deletions tests/src/test/java/docs/javadsl/ConsumerSettingsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package docs.javadsl;

import static org.junit.jupiter.api.Assertions.*;

import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.ConsumerSettingsSpec$;
Expand All @@ -18,8 +20,6 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

public class ConsumerSettingsTest {

@Test
Expand All @@ -44,13 +44,15 @@ public void discoverySetup() throws Exception {
@Test
public void setAssignor() throws Exception {
ActorSystem system = ActorSystem.create("ConsumerSettingsTest");
ConsumerSettings<String, String> settings = ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
.withPartitionAssignmentStrategies(new String[] {
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.class.getName(),
org.apache.kafka.clients.consumer.StickyAssignor.class.getName()
});
ConsumerSettings<String, String> settings =
ConsumerSettings.create(system, new StringDeserializer(), new StringDeserializer())
.withPartitionAssignmentStrategies(
new String[] {
org.apache.kafka.clients.consumer.CooperativeStickyAssignor.class.getName(),
org.apache.kafka.clients.consumer.StickyAssignor.class.getName()
});
assertEquals(
settings.getProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.StickyAssignor");
settings.getProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG),
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor,org.apache.kafka.clients.consumer.StickyAssignor");
}
}
27 changes: 13 additions & 14 deletions tests/src/test/java/docs/javadsl/MetadataClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,19 @@
package docs.javadsl;

// #metadataClient
import static java.util.stream.Collectors.toSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.Is.is;

import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
import akka.kafka.javadsl.MetadataClient;
import akka.kafka.testkit.javadsl.TestcontainersKafkaJunit4Test;
import akka.kafka.tests.javadsl.LogCapturingJunit4;
import akka.testkit.javadsl.TestKit;
import akka.util.Timeout;
// #metadataClient
import akka.kafka.tests.javadsl.LogCapturingJunit4;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.hamcrest.core.IsInstanceOf;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand All @@ -31,11 +28,13 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static java.util.stream.Collectors.toSet;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.core.Is.is;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.hamcrest.core.IsInstanceOf;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class MetadataClientTest extends TestcontainersKafkaJunit4Test {

Expand Down
16 changes: 8 additions & 8 deletions tests/src/test/java/docs/javadsl/ProducerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package docs.javadsl;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.*;
Expand All @@ -20,6 +23,10 @@
import akka.testkit.javadsl.TestKit;
// #testkit
import com.typesafe.config.Config;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand All @@ -28,15 +35,8 @@
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.junit.jupiter.api.extension.ExtendWith;
// #testkit

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
// #testkit

// #testkit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package docs.javadsl;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
Expand All @@ -23,6 +26,14 @@
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
// #imports
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -35,18 +46,6 @@
import org.junit.AfterClass;
import org.junit.Test;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

// #schema-registry-settings
public class SchemaRegistrySerializationTest extends TestcontainersKafkaJunit4Test {

Expand Down
25 changes: 12 additions & 13 deletions tests/src/test/java/docs/javadsl/SerializationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package docs.javadsl;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

import akka.Done;
import akka.actor.ActorSystem;
import akka.kafka.ConsumerSettings;
Expand All @@ -19,34 +22,30 @@
import akka.stream.javadsl.Source;
import akka.testkit.javadsl.TestKit;
// #jackson-imports
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.core.JsonParseException;
// #jackson-imports
// #protobuf-imports
// the Protobuf generated class
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import docs.javadsl.proto.OrderMessages;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
// #protobuf-imports
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(LogCapturingExtension.class)
public class SerializationTest extends TestcontainersKafkaTest {
Expand Down
Loading
Loading