Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .scala-steward.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ updates.ignore = [
{ groupId = "org.scalameta", artifactId = "scalafmt-core" }
]
updates.pin = [
{ groupId = "org.apache.kafka", artifactId="kafka-clients", version="3.9." }
# To be updated in tandem with upstream Akka
{ groupId = "org.scalatest", artifactId = "scalatest", version = "3.2." }
]
Expand Down
8 changes: 5 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ val AkkaBinaryVersionForDocs = VersionNumber(akkaVersion).numbers match {
case Seq(major, minor, _*) => s"$major.$minor"
}

// Keep .scala-steward.conf pin in sync
val kafkaVersion = "3.9.1"
val KafkaVersionForDocs = "39"
val kafkaVersion = "4.1.0"
val KafkaVersionForDocs = VersionNumber(kafkaVersion).numbers match {
case Seq(major, minor, _*) => s"$major$minor" // e.g. https://kafka.apache.org/41/documentation/
}
// This should align with the ScalaTest version used in the Akka testkit
// https://github.com/akka/akka/blob/main/project/Dependencies.scala#L44
val scalatestVersion = "3.2.16"
Expand Down Expand Up @@ -106,6 +107,7 @@ val commonSettings = Def.settings(
"11"
),
scalacOptions ++= Seq(
"--deprecation",
"-encoding",
"UTF-8", // yes, this is 2 args
"-release",
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/mima-filters/7.0.4.backwards.excludes
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# kafka-clients 4.0.0
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.kafka.RestrictedConsumer.committed")
6 changes: 0 additions & 6 deletions core/src/main/scala/akka/kafka/RestrictedConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,6 @@ final class RestrictedConsumer(consumer: Consumer[_, _], duration: java.time.Dur
def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit =
consumer.commitSync(offsets, duration)

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#committed(TopicPartition,java.time.Duration)]]
*/
@deprecated("use `committed(java.util.Set[TopicPartition])`", "2.0.5")
def committed(tp: TopicPartition): OffsetAndMetadata = consumer.committed(tp, duration)

/**
* See [[org.apache.kafka.clients.consumer.KafkaConsumer#committed(java.util.Set[TopicPartition],java.time.Duration)]]
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ object ConsumerResetProtection {
records
.partitions()
.asScala
.flatMap(maybeProtectRecords(consumer, _, records).toList)
.flatMap(maybeProtectRecords(consumer, _, records).toMap)
.toMap
.asJava

new ConsumerRecords[K, V](safe)
new ConsumerRecords[K, V](safe, java.util.Map.of())
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import org.apache.kafka.common.errors.{
TimeoutException
}
import org.apache.kafka.common.{Metric, MetricName, TopicPartition}

import java.util
import scala.annotation.nowarn
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._
Expand Down Expand Up @@ -467,7 +469,7 @@ import scala.util.control.NonFatal
ref ! Messages(req.requestId, Iterator.empty)
}
partitionAssignmentHandler.postStop()
consumer.close(settings.getCloseTimeout)
consumer.close(CloseOptions.timeout(settings.getCloseTimeout))
super.postStop()
}

Expand Down Expand Up @@ -737,8 +739,9 @@ import scala.util.control.NonFatal
case req: Metadata.GetCommittedOffset @nowarn("cat=deprecation") =>
@nowarn("cat=deprecation") val resp = Metadata.CommittedOffset(
Try {
@nowarn("cat=deprecation") val offset = consumer.committed(req.partition, settings.getMetadataRequestTimeout)
offset
@nowarn("cat=deprecation") val offset =
consumer.committed(Set(req.partition).asJava, settings.getMetadataRequestTimeout)
offset.asScala.headOption.map(_._2).get
},
req.partition
)
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/paradox/home.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ See all releases in [GitHub releases](https://github.com/akka/alpakka-kafka/rele

| Kafka client | Scala Versions | Akka version | Alpakka Kafka Connector
|----------------------------------------------------------------------------|------------------|-----------------|-------------------------
| 4.1.0 | 3.3, 2.13 | 2.10.5+ | [release 8.0.0](https://github.com/akka/alpakka-kafka/releases/tag/v8.0.0)
| 3.9.1 | 3.3, 2.13 | 2.10.5+ | [release 7.0.4](https://github.com/akka/alpakka-kafka/releases/tag/v7.0.4)
| 3.9.0 | 3.3, 2.13 | 2.10.5+ | [release 7.0.3](https://github.com/akka/alpakka-kafka/releases/tag/v7.0.3)
| 3.7.1 | 3.3, 2.13 | 2.10.5+ | [release 7.0.2](https://github.com/akka/alpakka-kafka/releases/tag/v7.0.2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -154,7 +154,7 @@ public void waitUntilConsumerSummary(
groupId,
group -> {
try {
return group.state() == ConsumerGroupState.STABLE && predicate.test(group.members());
return group.groupState() == GroupState.STABLE && predicate.test(group.members());
} catch (Exception ex) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package akka.kafka.testkit.scaladsl
import java.time.Duration
import java.util
import java.util.concurrent.TimeUnit

import akka.Done
import akka.actor.ActorSystem
import akka.event.LoggingAdapter
Expand All @@ -23,7 +22,7 @@ import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.TestKit
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.{ProducerRecord, Producer => KProducer}
import org.apache.kafka.common.ConsumerGroupState
import org.apache.kafka.common.{ConsumerGroupState, GroupState}
import org.slf4j.{Logger, LoggerFactory}

import scala.collection.immutable
Expand Down Expand Up @@ -112,7 +111,7 @@ abstract class KafkaSpec(_kafkaPort: Int, val zooKeeperPort: Int, actorSystem: A
*/
def waitUntilConsumerSummary(groupId: String)(predicate: PartialFunction[List[MemberDescription], Boolean]): Unit =
waitUntilConsumerGroup(groupId) { group =>
group.state() == ConsumerGroupState.STABLE &&
group.groupState() == GroupState.STABLE &&
Try(predicate(group.members().asScala.toList)).getOrElse(false)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val commitInterval = 200.millis
Expand Down Expand Up @@ -111,7 +112,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "send")
)

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val commitInterval = 200.millis
Expand Down Expand Up @@ -153,7 +155,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds)
Expand Down Expand Up @@ -189,7 +192,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(2L).withMaxInterval(10.seconds)
Expand Down Expand Up @@ -225,7 +229,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
val producerRecordsPerInput = 2
val totalProducerRecords = elements.size * producerRecordsPerInput

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(elements.size.longValue())
Expand Down Expand Up @@ -260,7 +265,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
val consumer = FakeConsumer(groupId, topic, startOffset = 1616L)
val message = consumer.message(partition, "increment the offset")

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(1)
Expand Down Expand Up @@ -294,7 +300,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system)
Expand Down Expand Up @@ -332,7 +339,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
// choose a large commit interval so that completion happens before
Expand Down Expand Up @@ -370,7 +378,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val commitInterval = 5.seconds
Expand Down Expand Up @@ -411,7 +420,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val commitInterval = 5.seconds
Expand Down Expand Up @@ -450,7 +460,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
)

// this producer does not auto complete messages
val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](false, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(1L)
Expand Down Expand Up @@ -491,7 +502,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](false, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(1L)
Expand Down Expand Up @@ -537,7 +549,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](false, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](false, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
// choose a large commit interval so that completion happens before
Expand Down Expand Up @@ -586,7 +599,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(2L)
Expand Down Expand Up @@ -621,7 +635,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxBatch(2L)
Expand Down Expand Up @@ -662,7 +677,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
consumer.message(partition, "value 2")
)

val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system)
Expand Down Expand Up @@ -698,7 +714,8 @@ class CommittingProducerSinkSpec(_system: ActorSystem)
}

it should "shut down without elements" in assertAllStagesStopped {
val producer = new MockProducer[String, String](true, new StringSerializer, new StringSerializer)
val producer =
new MockProducer[String, String](true, new RoundRobinPartitioner(), new StringSerializer, new StringSerializer)
val producerSettings = ProducerSettings(system, new StringSerializer, new StringSerializer)
.withProducer(producer)
val committerSettings = CommitterSettings(system).withMaxInterval(1.second)
Expand Down
4 changes: 1 addition & 3 deletions tests/src/test/scala/akka/kafka/internal/ConsumerDummy.scala
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def subscribe(pattern: java.util.regex.Pattern, callback: ConsumerRebalanceListener): Unit = ???
override def subscribe(pattern: java.util.regex.Pattern): Unit = ???
override def unsubscribe(): Unit = ???
override def poll(timeout: Long): ConsumerRecords[K, V] = ???
override def commitSync(): Unit = ???
override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata]): Unit = ???
override def commitAsync(): Unit = ???
Expand All @@ -55,7 +54,6 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
override def seekToEnd(partitions: java.util.Collection[TopicPartition]): Unit = ???
override def position(partition: TopicPartition): Long = ???
override def position(partition: TopicPartition, timeout: java.time.Duration): Long = ???
override def committed(partition: TopicPartition): OffsetAndMetadata = ???
override def metrics(): java.util.Map[MetricName, _ <: Metric] = ???
override def partitionsFor(topic: String): java.util.List[PartitionInfo] = ???
override def listTopics(): java.util.Map[String, java.util.List[PartitionInfo]] = ???
Expand All @@ -76,12 +74,12 @@ abstract class ConsumerDummy[K, V] extends Consumer[K, V] {
): java.util.Map[TopicPartition, java.lang.Long] = ???
override def close(): Unit = {}
override def close(timeout: java.time.Duration): Unit = {}
override def close(options: org.apache.kafka.clients.consumer.CloseOptions): Unit = {}
override def wakeup(): Unit = ???

override def commitSync(timeout: java.time.Duration): Unit = ???
override def commitSync(offsets: java.util.Map[TopicPartition, OffsetAndMetadata],
timeout: java.time.Duration): Unit = ???
override def committed(partition: TopicPartition, timeout: java.time.Duration): OffsetAndMetadata = ???
override def committed(partitions: util.Set[TopicPartition]): util.Map[TopicPartition, OffsetAndMetadata] = ???
override def committed(partitions: util.Set[TopicPartition],
timeout: Duration): util.Map[TopicPartition, OffsetAndMetadata] = ???
Expand Down
8 changes: 5 additions & 3 deletions tests/src/test/scala/akka/kafka/internal/ConsumerMock.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class ConsumerMock[K, V](handler: ConsumerMock.CommitHandler = new ConsumerMock.
if (releaseCommitCallbacks.get()) {
handler.onComplete()
}
new ConsumerRecords[K, V](records.asJava)
new ConsumerRecords[K, V](records.asJava, java.util.Map.of())
}
})
Mockito
Expand Down Expand Up @@ -161,7 +161,7 @@ class ConsumerMock[K, V](handler: ConsumerMock.CommitHandler = new ConsumerMock.
}

def verifyClosed(mode: VerificationMode = Mockito.times(1)): Unit =
verify(mock, mode).close(ConsumerMock.closeTimeout.toJava)
verify(mock, mode).close(ArgumentMatchers.isA(classOf[CloseOptions]))

def verifyPoll(mode: VerificationMode = Mockito.atLeastOnce()): ConsumerRecords[K, V] =
verify(mock, mode).poll(ArgumentMatchers.any[java.time.Duration])
Expand Down Expand Up @@ -213,7 +213,9 @@ class FailingConsumerMock[K, V](throwable: Throwable, failOnCallNumber: Int*) ex
callNumber = callNumber + 1
if (failOnCallNumber.contains(callNumber))
throw throwable
else new ConsumerRecords[K, V](Map.empty[TopicPartition, java.util.List[ConsumerRecord[K, V]]].asJava)
else
new ConsumerRecords[K, V](Map.empty[TopicPartition, java.util.List[ConsumerRecord[K, V]]].asJava,
java.util.Map.of())
}
})
}
Loading
Loading