From 9b734c840090484ba958e361df590c041aef94c0 Mon Sep 17 00:00:00 2001 From: IgorFedchenko Date: Fri, 18 Oct 2019 22:33:49 +0300 Subject: [PATCH 1/7] Major update of producers core to match alpakka codebase and add features --- examples/SimpleProducer/Program.cs | 4 +- .../CommittableSourceIntegrationTests.cs | 6 +- .../Integration/PlainSinkIntegrationTests.cs | 9 +- .../KafkaIntegrationTests.cs | 6 +- src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs | 106 +++++-- .../Extensions/CollectionExtensions.cs | 8 + .../Extensions/ProducerExtensions.cs | 27 ++ .../Helpers/ProducerStage.cs | 45 +++ src/Akka.Streams.Kafka/Messages/Envelope.cs | 269 +++++++++++++++++- .../Messages/MessageAndMeta.cs | 31 -- .../Messages/ProducerRecord.cs | 156 ++++++++++ .../Settings/ProducerSettings.cs | 3 +- .../Stages/Producers/DefaultProducerStage.cs | 237 +++++++++++++++ .../Stages/Producers/ProducerStage.cs | 179 ------------ 14 files changed, 842 insertions(+), 244 deletions(-) create mode 100644 src/Akka.Streams.Kafka/Extensions/ProducerExtensions.cs create mode 100644 src/Akka.Streams.Kafka/Helpers/ProducerStage.cs delete mode 100644 src/Akka.Streams.Kafka/Messages/MessageAndMeta.cs create mode 100644 src/Akka.Streams.Kafka/Messages/ProducerRecord.cs create mode 100644 src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs delete mode 100644 src/Akka.Streams.Kafka/Stages/Producers/ProducerStage.cs diff --git a/examples/SimpleProducer/Program.cs b/examples/SimpleProducer/Program.cs index 7618fd86..ec3770d6 100644 --- a/examples/SimpleProducer/Program.cs +++ b/examples/SimpleProducer/Program.cs @@ -31,8 +31,8 @@ public static void Main(string[] args) Source .Cycle(() => Enumerable.Range(1, 100).GetEnumerator()) .Select(c => c.ToString()) - .Select(elem => new MessageAndMeta { Topic = "akka100", Message = new Message { Value = elem }}) - .Via(KafkaProducer.PlainFlow(producerSettings)) + .Select(elem => new ProducerRecord { Topic = "akka100", Message = new Message { Value = elem }}) + .Via(KafkaProducer.FlexiFlow(producerSettings)) .Select(record => { Console.WriteLine($"Producer: {record.Topic}/{record.Partition} {record.Offset}: {record.Value}"); diff --git a/src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs index 0ab47e91..1d31c8e2 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/CommittableSourceIntegrationTests.cs @@ -35,7 +35,7 @@ public async Task CommitableSource_consumes_messages_from_Producer_without_commi await Source .From(Enumerable.Range(1, elementsCount)) - .Select(elem => new MessageAndMeta { TopicPartition = topicPartition1, Message = new Message { Value = elem.ToString() } }) + .Select(elem => new ProducerRecord(topicPartition1, elem.ToString())) .RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer); var consumerSettings = CreateConsumerSettings(group1); @@ -65,7 +65,7 @@ public async Task CommitableSource_resume_from_commited_offset() await Source .From(Enumerable.Range(1, 100)) - .Select(elem => new MessageAndMeta { TopicPartition = topicPartition1, Message = new Message { Value = elem.ToString() } }) + .Select(elem => new ProducerRecord(topicPartition1, elem.ToString())) .RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer); var consumerSettings = CreateConsumerSettings(group1); @@ -103,7 +103,7 @@ await Source // some concurrent publish await Source 
.From(Enumerable.Range(101, 100)) - .Select(elem => new MessageAndMeta { TopicPartition = topicPartition1, Message = new Message { Value = elem.ToString() } }) + .Select(elem => new ProducerRecord(topicPartition1, elem.ToString())) .RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer); probe2.Request(100); diff --git a/src/Akka.Streams.Kafka.Tests/Integration/PlainSinkIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/PlainSinkIntegrationTests.cs index f1d9df02..6b4f5984 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/PlainSinkIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/PlainSinkIntegrationTests.cs @@ -42,7 +42,7 @@ public async Task PlainSink_should_publish_100_elements_to_Kafka_producer() await Source .From(Enumerable.Range(1, 100)) .Select(c => c.ToString()) - .Select(elem => new MessageAndMeta { TopicPartition = topicPartition1, Message = new Message { Value = elem } }) + .Select(elem => new ProducerRecord(topicPartition1, elem.ToString())) .RunWith(KafkaProducer.PlainSink(ProducerSettings), Materializer); var dateTimeStart = DateTime.UtcNow; @@ -80,9 +80,10 @@ public async Task PlainSink_should_fail_stage_if_broker_unavailable() var probe = Source .From(Enumerable.Range(1, 100)) .Select(c => c.ToString()) - .Select(elem => new MessageAndMeta { Topic = topic1, Message = new Message { Value = elem } }) - .Via(KafkaProducer.PlainFlow(config)) - .RunWith(this.SinkProbe>(), Materializer); + .Select(elem => new ProducerRecord(topic1, elem.ToString())) + .Select(record => new Message(record, NotUsed.Instance) as IEnvelope) + .Via(KafkaProducer.FlexiFlow(config)) + .RunWith(this.SinkProbe>(), Materializer); probe.ExpectSubscription(); probe.OnError(new KafkaException(ErrorCode.Local_Transport)); diff --git a/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs index b15c71ab..88188b33 100644 --- a/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs @@ -62,7 +62,7 @@ protected async Task ProduceStrings(string topic, IEnumerable range, Produc { await Source .From(range) - .Select(elem => new MessageAndMeta { Topic = topic, Message = new Message { Value = elem.ToString() } }) + .Select(elem => new ProducerRecord(topic, elem.ToString())) .RunWith(KafkaProducer.PlainSink(producerSettings), Materializer); } @@ -70,7 +70,7 @@ protected async Task ProduceStrings(Func partitionSelector, { await Source .From(range) - .Select(elem => new MessageAndMeta { TopicPartition = partitionSelector(elem), Message = new Message { Value = elem.ToString() } }) + .Select(elem => new ProducerRecord(partitionSelector(elem), elem.ToString())) .RunWith(KafkaProducer.PlainSink(producerSettings), Materializer); } @@ -78,7 +78,7 @@ protected async Task ProduceStrings(TopicPartition topicPartition, IEnumerable new MessageAndMeta { TopicPartition = topicPartition, Message = new Message { Value = elem.ToString() } }) + .Select(elem => new ProducerRecord(topicPartition, elem.ToString())) .RunWith(KafkaProducer.PlainSink(producerSettings), Materializer); } diff --git a/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs b/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs index b708a7e4..9426c168 100644 --- a/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs +++ b/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs @@ -14,34 +14,76 @@ namespace Akka.Streams.Kafka.Dsl public static class KafkaProducer { /// - /// The `PlainSink` can be used for publishing records to Kafka topics. 
+ /// + /// Create a sink for publishing records to Kafka topics. + /// + /// + /// + /// The contains the topic name to which the record is being sent, an optional + /// partition number, and an optional key and value. + /// /// - public static Sink, Task> PlainSink(ProducerSettings settings) + public static Sink, Task> PlainSink(ProducerSettings settings) { return Flow - .Create>() - .Via(PlainFlow(settings)) - .ToMaterialized(Sink.Ignore>(), Keep.Right); + .Create>() + .Select(record => new Message(record, NotUsed.Instance) as IEnvelope) + .Via(FlexiFlow(settings)) + .ToMaterialized(Sink.Ignore>(), Keep.Right); } /// - /// The `PlainSink` can be used for publishing records to Kafka topics. + /// + /// Create a sink for publishing records to Kafka topics. + /// + /// + /// + /// The contains the topic name to which the record is being sent, an optional + /// partition number, and an optional key and value. + /// + /// + /// + /// Supports sharing a Kafka Producer instance. + /// /// - public static Sink, Task> PlainSink(ProducerSettings settings, IProducer producer) + public static Sink, Task> PlainSink(ProducerSettings settings, IProducer producer) { return Flow - .Create>() - .Via(PlainFlow(settings, producer)) - .ToMaterialized(Sink.Ignore>(), Keep.Right); + .Create>() + .Select(record => new Message(record, NotUsed.Instance) as IEnvelope) + .Via(FlexiFlow(settings, producer)) + .ToMaterialized(Sink.Ignore>(), Keep.Right); } /// - /// Publish records to Kafka topics and then continue the flow. Possibility to pass through a message, which - /// can for example be a that can be committed later in the flow. + /// + /// Create a flow to conditionally publish records to Kafka topics and then pass it on. + /// + /// + /// + /// It publishes records to Kafka topics conditionally: + /// + /// + /// publishes a single message to its topic, and continues in the stream as + /// + /// + /// publishes all messages in its `records` field, and continues in the stream as + /// + /// + /// does not publish anything, and continues in the stream as + /// + /// + /// + /// + /// + /// The messages support the possibility to pass through arbitrary data, which can for example be a + /// or that can be committed later in the flow. + /// /// - public static Flow, DeliveryReport, NotUsed> PlainFlow(ProducerSettings settings) + public static Flow, IResults, NotUsed> FlexiFlow( + ProducerSettings settings) { - var flow = Flow.FromGraph(new ProducerStage( + var flow = Flow.FromGraph(new DefaultProducerStage, IResults>( settings, closeProducerOnStop: true)) .SelectAsync(settings.Parallelism, x => x); @@ -52,18 +94,44 @@ public static Flow, DeliveryReport, N } /// - /// Publish records to Kafka topics and then continue the flow. Possibility to pass through a message, which - /// can for example be a that can be committed later in the flow. + /// + /// Create a flow to conditionally publish records to Kafka topics and then pass it on. + /// + /// + /// + /// It publishes records to Kafka topics conditionally: + /// + /// + /// publishes a single message to its topic, and continues in the stream as + /// + /// + /// publishes all messages in its `records` field, and continues in the stream as + /// + /// + /// does not publish anything, and continues in the stream as + /// + /// + /// + /// + /// + /// The messages support the possibility to pass through arbitrary data, which can for example be a + /// or that can be committed later in the flow. + /// + /// + /// + /// Supports sharing a Kafka Producer instance. 
+ /// /// - public static Flow, DeliveryReport, NotUsed> PlainFlow(ProducerSettings settings, IProducer producer) + public static Flow, IResults, NotUsed> FlexiFlow( + ProducerSettings settings, IProducer producer) { - var flow = Flow.FromGraph(new ProducerStage( + var flow = Flow.FromGraph(new DefaultProducerStage, IResults>( settings, closeProducerOnStop: false, customProducerProvider: () => producer)) .SelectAsync(settings.Parallelism, x => x); - return string.IsNullOrEmpty(settings.DispatcherId) + return string.IsNullOrEmpty(settings.DispatcherId) ? flow : flow.WithAttributes(ActorAttributes.CreateDispatcher(settings.DispatcherId)); } diff --git a/src/Akka.Streams.Kafka/Extensions/CollectionExtensions.cs b/src/Akka.Streams.Kafka/Extensions/CollectionExtensions.cs index 6ebd6d78..b97772c1 100644 --- a/src/Akka.Streams.Kafka/Extensions/CollectionExtensions.cs +++ b/src/Akka.Streams.Kafka/Extensions/CollectionExtensions.cs @@ -30,5 +30,13 @@ public static bool IsEmpty(this IEnumerable collection) { return !collection.Any(); } + + /// + /// Converts collection to + /// + public static IImmutableSet ToImmutableSet(this IEnumerable collection) + { + return collection.ToImmutableHashSet(); + } } } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Extensions/ProducerExtensions.cs b/src/Akka.Streams.Kafka/Extensions/ProducerExtensions.cs new file mode 100644 index 00000000..b63a3165 --- /dev/null +++ b/src/Akka.Streams.Kafka/Extensions/ProducerExtensions.cs @@ -0,0 +1,27 @@ +using System; +using Akka.Streams.Kafka.Messages; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Extensions +{ + /// + /// ProducerExtensions + /// + public static class ProducerExtensions + { + /// + /// Produce + /// + public static void Produce(this IProducer producer, ProducerRecord message, Action> reportHandler) + { + if (message.Partition.HasValue) + { + producer.Produce(new TopicPartition(message.Topic, message.Partition.Value), message.Message, reportHandler); + } + else + { + producer.Produce(message.Topic, message.Message, reportHandler); + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Helpers/ProducerStage.cs b/src/Akka.Streams.Kafka/Helpers/ProducerStage.cs new file mode 100644 index 00000000..947992da --- /dev/null +++ b/src/Akka.Streams.Kafka/Helpers/ProducerStage.cs @@ -0,0 +1,45 @@ +using System; +using System.Threading.Tasks; +using Akka.Annotations; +using Akka.Streams.Kafka.Messages; +using Akka.Streams.Kafka.Stages; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Helpers +{ + /// + /// INTERNAL API + /// + /// Implemented by as + /// + [InternalApi] + public interface IProducerStage where TIn : IEnvelope where TOut : IResults + { + TimeSpan FlushTimeout { get; } + bool CloseProducerOnStop { get; } + Func, Error>, IProducer> ProducerProvider { get; } + + Inlet In { get; } + Outlet> Out { get; } + FlowShape> Shape { get; } + } + + /// + /// INTERNAL API + /// + /// Used to specify custom producer completion events handling + /// + [InternalApi] + public interface IProducerCompletionState + { + /// + /// Called when completion succeed + /// + void OnCompletionSuccess(); + /// + /// Called on completion failure + /// + void OnCompletionFailure(Exception ex); + } + +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/Envelope.cs b/src/Akka.Streams.Kafka/Messages/Envelope.cs index cb311b57..2f561bdc 100644 --- a/src/Akka.Streams.Kafka/Messages/Envelope.cs +++ b/src/Akka.Streams.Kafka/Messages/Envelope.cs @@ -1,7 +1,272 @@ 
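For orientation while reading the new envelope types below, a minimal sketch of how the reworked producer API is meant to be driven, mirroring the SimpleProducer example updated above; the topic name, the materializer, and the generic arguments (which this patch text shows with angle-bracket contents stripped) are illustrative assumptions:

    var producerSettings = ProducerSettings<Null, string>.Create(system, null, null)
        .WithBootstrapServers("localhost:9092");

    Source.From(Enumerable.Range(1, 100))
        // Wrap each record in a single-message envelope with no pass-through value
        .Select(n => ProducerMessage.Single(new ProducerRecord<Null, string>("akka100", n.ToString())))
        .Via(KafkaProducer.FlexiFlow<Null, string, NotUsed>(producerSettings))
        .Select(result => result.PassThrough)
        .RunWith(Sink.Ignore<NotUsed>(), materializer);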
+using System; +using System.Collections.Immutable; +using Akka.Streams.Kafka.Dsl; +using Confluent.Kafka; + namespace Akka.Streams.Kafka.Messages { - public interface IEnvelope + /// + /// Type accepted by "KafkaProducer.FlexiFlow" with implementations + /// + /// + /// + /// + public interface IEnvelope { - // TODO + TPassThrough PassThrough { get; } + IEnvelope WithPassThrough(TPassThrough2 value); + } + + /// + /// Contains methods to generate producer messages + /// + public static class ProducerMessage + { + /// + /// Create a message containing the `record` and a `passThrough`. + /// + public static IEnvelope Single(ProducerRecord record, TPassThrough passThrough) + => new Message(record, passThrough); + + /// + /// Create a message containing the `record`. + /// + public static IEnvelope Single(ProducerRecord record) + => new Message(record, NotUsed.Instance); + + /// + /// Create a multi-message containing several `records` and one `passThrough`. + /// + public static IEnvelope Multi(IImmutableSet> records, TPassThrough passThrough) + => new MultiMessage(records, passThrough); + + /// + /// Create a multi-message containing several `records`. + /// + public static IEnvelope Multi(IImmutableSet> records) + => new MultiMessage(records, NotUsed.Instance); + + /// + /// Create a pass-through message not containing any records. + /// + /// + /// In some cases the type parameters need to be specified explicitly. + /// + public static IEnvelope PassThrough(TPassThrough passThrough) + => new PassThroughMessage(passThrough); + + /// + /// Create a pass-through message not containing any records for use with `withContext` flows and sinks. + /// + /// + /// In some cases the type parameters need to be specified explicitly. + /// + public static IEnvelope PassThrough() + => new PassThroughMessage(NotUsed.Instance); + } + + /// + /// + /// implementation that produces a single message to a Kafka topic, flows emit + /// a for every element processed. + /// + /// + /// The `record` contains a topic name to which the record is being sent, an optional + /// partition number, and an optional key and value. + /// + /// + /// The `passThrough` field may hold any element that is passed through the `Producer.flow` + /// and included in the . That is useful when some context is needed to be passed + /// on downstream operations. That could be done with unzip/zip, but this is more convenient. + /// It can for example be a or that can be committed later in the flow. + /// + /// + /// The type of keys + /// The type of values + /// The type of data passed through + public sealed class Message : IEnvelope + { + /// + /// Message + /// + public Message(ProducerRecord record, TPassThrough passThrough) + { + Record = record; + PassThrough = passThrough; + } + + /// + /// Published record + /// + public ProducerRecord Record { get; } + + /// + public TPassThrough PassThrough { get; } + + /// + public IEnvelope WithPassThrough(TPassThrough2 value) + { + return new Message(Record, value); + } + } + + /// + /// + /// implementation that produces multiple messages to Kafka topics, flows emit + /// a for every element processed. + /// + /// + /// Every element in `records` contains a topic name to which the record is being sent, an optional + /// partition number, and an optional key and value. + /// + /// + /// The `passThrough` field may hold any element that is passed through the `Producer.flow` + /// and included in the .
That is useful when some context is needed to be passed + /// on downstream operations. That could be done with unzip/zip, but this is more convenient. + /// It can for example be a or that can be committed later in the flow. + /// + /// + public sealed class MultiMessage : IEnvelope + { + /// + /// MultiMessage + /// + public MultiMessage(IImmutableSet> records, TPassThrough passThrough) + { + Records = records; + PassThrough = passThrough; + } + + /// + /// Records + /// + public IImmutableSet> Records { get; } + /// + /// PassThrough + /// + public TPassThrough PassThrough { get; } + + /// + public IEnvelope WithPassThrough(TPassThrough2 value) + { + return new MultiMessage(Records, value); + } + } + + /// + /// + /// implementation that does not produce anything to Kafka, flows emit + /// a for every element processed. + /// + /// + /// + /// The `passThrough` field may hold any element that is passed through the `Producer.flow` + /// and included in the . That is useful when some context is needed to be passed + /// on downstream operations. That could be done with unzip/zip, but this is more convenient. + /// It can for example be a or that can be committed later in the flow. + /// + /// + public sealed class PassThroughMessage : IEnvelope + { + /// + /// PassThroughMessage + /// + public PassThroughMessage(TPassThrough passThrough) + { + PassThrough = passThrough; + } + + /// + public TPassThrough PassThrough { get; } + + /// + public IEnvelope WithPassThrough(TPassThrough2 value) + { + return new PassThroughMessage(value); + } + } + + /// + /// Output type produced by + /// + public interface IResults + { + /// + /// PassThrough + /// + TPassThrough PassThrough { get; } + } + + /// + /// implementation emitted when a has been successfully published. + /// + /// Includes the original message, metadata returned from and the `offset` of the produced message. 
+ /// + internal sealed class Result : IResults + { + /// + /// Message metadata + /// + public DeliveryReport Metadata { get; } + /// + /// Message + /// + public Message Message { get; } + /// + /// Offset + /// + public Offset Offset => Metadata.Offset; + /// + public TPassThrough PassThrough => Message.PassThrough; + + /// + /// Result + /// + public Result(DeliveryReport metadata, Message message) + { + Metadata = metadata; + Message = message; + } + } + + internal sealed class MultiResultPart + { + public DeliveryReport Metadata { get; } + public ProducerRecord Record { get; } + + public MultiResultPart(DeliveryReport metadata, ProducerRecord record) + { + Metadata = metadata; + Record = record; + } + } + + internal sealed class MultiResult : IResults + { + public MultiResult(IImmutableSet> parts, TPassThrough passThrough) + { + Parts = parts; + PassThrough = passThrough; + } + + public IImmutableSet> Parts { get; } + public TPassThrough PassThrough { get; } + } + + /// + /// implementation enitted when + /// + /// + /// + /// + public sealed class PassThroughResult : IResults + { + public PassThroughResult(TPassThrough passThrough) + { + PassThrough = passThrough; + } + + public TPassThrough PassThrough { get; } } } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/MessageAndMeta.cs b/src/Akka.Streams.Kafka/Messages/MessageAndMeta.cs deleted file mode 100644 index 1cd498bf..00000000 --- a/src/Akka.Streams.Kafka/Messages/MessageAndMeta.cs +++ /dev/null @@ -1,31 +0,0 @@ -using Confluent.Kafka; - -namespace Akka.Streams.Kafka.Messages -{ - /// - /// Container for storing message and topic/parition info - /// - /// Type of key - /// Type of value - public class MessageAndMeta - { - /// - /// The message to send - /// - public Message Message { get; set; } - /// - /// Topic to send to. - /// - /// - /// If TopicPartition property is specified, the Topic property value is ignored. - /// - public string Topic { get; set; } - /// - /// Topic partition to sent to. - /// - /// - /// If TopicPartition property is specified, the Topic property value is ignored. - /// - public TopicPartition TopicPartition { get; set; } - } -} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/ProducerRecord.cs b/src/Akka.Streams.Kafka/Messages/ProducerRecord.cs new file mode 100644 index 00000000..e97531e8 --- /dev/null +++ b/src/Akka.Streams.Kafka/Messages/ProducerRecord.cs @@ -0,0 +1,156 @@ +using System; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Messages +{ + /// + /// + /// A key/value pair to be sent to Kafka. This consists of a topic name to which the record is being sent, an optional + /// partition number, and an optional key and value. + /// + /// + /// + /// If a valid partition number is specified that partition will be used when sending the record. If no partition is + /// specified but a key is present a partition will be chosen using a hash of the key. If neither key nor partition is + /// present a partition will be assigned in a round-robin fashion. + /// + /// + /// + /// The record also has an associated timestamp. If the user did not provide a timestamp, the producer will stamp the + /// record with its current time. The timestamp eventually used by Kafka depends on the timestamp type configured for + /// the topic: + /// + /// + /// If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}, + /// the timestamp in the producer record will be used by the broker. 
+ /// + /// If the topic is configured to use {@link org.apache.kafka.common.record.TimestampType#LOG_APPEND_TIME LogAppendTime}, + /// the timestamp in the producer record will be overwritten by the broker with the broker local time when it appends the + /// message to its log. + /// + /// + /// + /// In either of the cases above, the timestamp that has actually been used will be returned to the user in + /// + /// + /// Type of key + /// Type of value + public class ProducerRecord : IEquatable> + { + /// + /// ProducerRecord + /// + public ProducerRecord(string topic, int? partition, long? timestamp, Message message) + { + if (topic == null) + throw new ArgumentNullException(nameof(topic), "Topic cannot be null"); + if (timestamp != null && timestamp < 0) + throw new ArgumentException($"Invalid timestamp: {timestamp}. Timestamp should always be non-negative or null."); + if (partition != null && partition < 0) + throw new ArgumentException($"Invalid partition: {partition}. Partition number should always be non-negative or null."); + + Topic = topic; + Partition = partition; + Timestamp = timestamp; + Message = message; + } + + /// + /// ProducerRecord + /// + public ProducerRecord(string topic, int? partition, Message message) + : this(topic, partition, null, message) + { + } + + /// + /// ProducerRecord + /// + public ProducerRecord(TopicPartition topicPartition, Message message) + : this(topicPartition.Topic, topicPartition.Partition, null, message) + { + } + + /// + /// ProducerRecord + /// + public ProducerRecord(string topic, Message message) + : this(topic, null, null, message) + { + } + + /// + /// ProducerRecord + /// + public ProducerRecord(string topic, V value) + : this(topic, null, null, new Message(){ Value = value }) + { + } + + /// + /// ProducerRecord + /// + public ProducerRecord(TopicPartition topicPartition, V value) + : this(topicPartition.Topic, topicPartition.Partition, null, new Message(){ Value = value }) + { + } + + /// + /// Topic to send to. + /// + public string Topic { get; set; } + /// + /// Partition to send to. + /// + public int? Partition { get; set; } + /// + /// Timestamp + /// + public long? Timestamp { get; set; } + /// + /// The message to send + /// + public Message Message { get; set; } + + /// + public override string ToString() + { + return $"ProducerRecord(" + + $"topic={Topic}, " + + $"partition={Partition?.ToString() ?? "null"}, " + + $"key={Message?.Key?.ToString() ?? "null"}, " + + $"value={Message?.Value?.ToString() ?? "null"}, " + + $"timestamp={Timestamp?.ToString() ?? "null"}" + + $")"; + } + + public bool Equals(ProducerRecord other) + { + if (ReferenceEquals(null, other)) return false; + if (ReferenceEquals(this, other)) return true; + return Topic == other.Topic && Partition == other.Partition && Timestamp == other.Timestamp && Equals(Message, other.Message); + } + + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + if (obj.GetType() != this.GetType()) return false; + return Equals((ProducerRecord) obj); + } + + public override int GetHashCode() + { + unchecked + { + var hashCode = (Topic != null ? Topic.GetHashCode() : 0); + hashCode = (hashCode * 397) ^ Partition.GetHashCode(); + hashCode = (hashCode * 397) ^ Timestamp.GetHashCode(); + hashCode = (hashCode * 397) ^ (Message != null ?
Message.GetHashCode() : 0); + return hashCode; + } + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs b/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs index 380d5255..2b914da8 100644 --- a/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs +++ b/src/Akka.Streams.Kafka/Settings/ProducerSettings.cs @@ -10,7 +10,8 @@ namespace Akka.Streams.Kafka.Settings public sealed class ProducerSettings { public ProducerSettings(ISerializer keySerializer, ISerializer valueSerializer, int parallelism, - string dispatcherId, TimeSpan flushTimeout, IImmutableDictionary properties) + string dispatcherId, TimeSpan flushTimeout, + IImmutableDictionary properties) { KeySerializer = keySerializer; ValueSerializer = valueSerializer; diff --git a/src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs b/src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs new file mode 100644 index 00000000..9ab8259c --- /dev/null +++ b/src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs @@ -0,0 +1,237 @@ +using System; +using System.Collections.Immutable; +using System.Linq; +using System.Threading.Tasks; +using Akka.Pattern; +using Akka.Streams.Kafka.Extensions; +using Akka.Streams.Kafka.Helpers; +using Akka.Streams.Kafka.Messages; +using Akka.Streams.Kafka.Settings; +using Akka.Streams.Stage; +using Akka.Streams.Supervision; +using Akka.Util.Internal; +using Confluent.Kafka; + +namespace Akka.Streams.Kafka.Stages +{ + internal sealed class DefaultProducerStage : GraphStage>>, IProducerStage + where TIn: IEnvelope + where TOut: IResults + { + public Func, Error>, IProducer> ProducerProvider { get; } + public ProducerSettings Settings { get; } + public TimeSpan FlushTimeout => Settings.FlushTimeout; + public bool CloseProducerOnStop { get; } + public Inlet In { get; } = new Inlet("kafka.producer.in"); + public Outlet> Out { get; } = new Outlet>("kafka.producer.out"); + public override FlowShape> Shape { get; } + + public DefaultProducerStage( + ProducerSettings settings, + bool closeProducerOnStop, + Func> customProducerProvider = null) + { + ProducerProvider = errorHandler => customProducerProvider?.Invoke() ?? Settings.CreateKafkaProducer(errorHandler); + Settings = settings; + CloseProducerOnStop = closeProducerOnStop; + + Shape = new FlowShape>(In, Out); + } + + protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) + { + return new DefaultProducerStageLogic(this, inheritedAttributes); + } + } + + internal class DefaultProducerStageLogic : GraphStageLogic, IProducerCompletionState + where TIn: IEnvelope + where TOut: IResults + { + private readonly IProducerStage _stage; + private IProducer _producer; + private readonly TaskCompletionSource _completionState = new TaskCompletionSource(); + private volatile bool _inIsClosed; + private readonly AtomicCounter _awaitingConfirmation = new AtomicCounter(0); + private Action _checkForCompletionCallback; + private Action _failStageCallback; + private readonly Decider _decider; + + public DefaultProducerStageLogic(IProducerStage stage, Attributes attributes) : base(stage.Shape) + { + _stage = stage; + + var supervisionStrategy = attributes.GetAttribute(null); + _decider = supervisionStrategy != null ? 
supervisionStrategy.Decider : Deciders.StoppingDecider; + + SetHandler(_stage.In, + onPush: () => + { + var msg = Grab(_stage.In) as IEnvelope; + + switch (msg) + { + case Message message: + { + var result = new TaskCompletionSource>(); + _awaitingConfirmation.IncrementAndGet(); + _producer.Produce(message.Record, BuildSendCallback(result, onSuccess: report => + { + result.SetResult(new Result(report, message)); + })); + PostSend(msg); + Push(stage.Out, result.Task as Task); + break; + } + + case MultiMessage multiMessage: + { + var tasks = multiMessage.Records.Select(record => + { + var result = new TaskCompletionSource>(); + _awaitingConfirmation.IncrementAndGet(); + _producer.Produce(record, report => + { + result.SetResult(new MultiResultPart(report, record)); + }); + return result.Task; + }); + PostSend(msg); + var resultTask = Task.WhenAll(tasks).ContinueWith(t => new MultiResult(t.Result.ToImmutableHashSet(), multiMessage.PassThrough)); + Push(stage.Out, resultTask as Task); + break; + } + + case PassThroughMessage passThroughMessage: + { + PostSend(msg); + var resultTask = Task.FromResult(new PassThroughResult(msg.PassThrough)) as Task; + Push(stage.Out, resultTask); + break; + } + } + }, + onUpstreamFinish: () => + { + _inIsClosed = true; + _completionState.SetResult(NotUsed.Instance); + _checkForCompletionCallback(); + }, + onUpstreamFailure: exception => + { + _inIsClosed = true; + _completionState.SetException(exception); + _checkForCompletionCallback(); + }); + + SetHandler(_stage.Out, onPull: () => + { + TryPull(_stage.In); + }); + } + + public override void PreStart() + { + base.PreStart(); + + _producer = _stage.ProducerProvider(HandleProduceError); + Log.Debug($"Producer started: {_producer.Name}"); + + _checkForCompletionCallback = GetAsyncCallback(CheckForCompletion); + _failStageCallback = GetAsyncCallback(FailStage); + } + + public override void PostStop() + { + Log.Debug("Stage completed"); + + if (_stage.CloseProducerOnStop) + { + try + { + // we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case + _producer.Flush(_stage.FlushTimeout); + // TODO: fix missing deferred close support: `producer.close(stage.closeTimeout.toMillis, TimeUnit.MILLISECONDS)` + _producer.Dispose(); + Log.Debug($"Producer closed: {_producer.Name}"); + } + catch (Exception ex) + { + Log.Error(ex, "Problem occurred during producer close"); + } + } + + base.PostStop(); + } + + public void OnCompletionSuccess() => CompleteStage(); + + public void OnCompletionFailure(Exception ex) => FailStage(ex); + + protected virtual void PostSend(IEnvelope msg) { } + + private Action> BuildSendCallback(TaskCompletionSource completion, Action> onSuccess) + { + return report => + { + if (!report.Error.IsError) + { + onSuccess(report); + } + else + { + Log.Error(report.Error.Reason); + var exception = new KafkaException(report.Error); + switch (_decider(exception)) + { + case Directive.Stop: + if (_stage.CloseProducerOnStop) + { + _producer.Dispose(); + } + _failStageCallback(exception); + break; + default: + completion.SetException(exception); + break; + } + } + + if (_awaitingConfirmation.DecrementAndGet() == 0 && _inIsClosed) + _checkForCompletionCallback(); + }; + } + + private void CheckForCompletion() + { + if (IsClosed(_stage.In) && _awaitingConfirmation.Current == 0) + { + var completionTask = _completionState.Task; + + if (completionTask.IsFaulted || completionTask.IsCanceled) + { + 
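+                    // Upstream finished with an exception (or was cancelled): propagate the captured failure via the completion hook.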
OnCompletionFailure(completionTask.Exception); + } + else if (completionTask.IsCompleted) + { + OnCompletionSuccess(); + } + else + { + FailStage(new IllegalStateException("Stage completed, but there is no info about status")); + } + } + } + + private void HandleProduceError(IProducer producer, Error error) + { + Log.Error(error.Reason); + + if (!KafkaExtensions.IsBrokerErrorRetriable(error) && !KafkaExtensions.IsLocalErrorRetriable(error)) + { + var exception = new KafkaException(error); + FailStage(exception); + } + } + } +} diff --git a/src/Akka.Streams.Kafka/Stages/Producers/ProducerStage.cs b/src/Akka.Streams.Kafka/Stages/Producers/ProducerStage.cs deleted file mode 100644 index 31ce3c04..00000000 --- a/src/Akka.Streams.Kafka/Stages/Producers/ProducerStage.cs +++ /dev/null @@ -1,179 +0,0 @@ -using System; -using System.Threading.Tasks; -using Akka.Streams.Kafka.Messages; -using Akka.Streams.Kafka.Settings; -using Akka.Streams.Stage; -using Akka.Streams.Supervision; -using Akka.Util.Internal; -using Confluent.Kafka; - -namespace Akka.Streams.Kafka.Stages -{ - internal sealed class ProducerStage : GraphStage, Task>>> - { - private readonly Func> _customProducerProvider; - public ProducerSettings Settings { get; } - public bool CloseProducerOnStop { get; } - public Inlet> In { get; } = new Inlet>("kafka.producer.in"); - public Outlet>> Out { get; } = new Outlet>>("kafka.producer.out"); - - public ProducerStage( - ProducerSettings settings, - bool closeProducerOnStop, - Func> customProducerProvider = null) - { - _customProducerProvider = customProducerProvider; - Settings = settings; - CloseProducerOnStop = closeProducerOnStop; - Shape = new FlowShape, Task>>(In, Out); - } - - public IProducer GetProducerProvider(Action, Error> errorHandler) - { - return _customProducerProvider?.Invoke() ?? Settings.CreateKafkaProducer(errorHandler); - } - - public override FlowShape, Task>> Shape { get; } - - protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) - { - return new ProducerStageLogic(this, inheritedAttributes); - } - } - - internal sealed class ProducerStageLogic : GraphStageLogic - { - private readonly ProducerStage _stage; - private IProducer _producer; - private readonly TaskCompletionSource _completionState = new TaskCompletionSource(); - private volatile bool _inIsClosed; - private readonly AtomicCounter _awaitingConfirmation = new AtomicCounter(0); - private Action _checkForCompletion; - - public ProducerStageLogic(ProducerStage stage, Attributes attributes) : base(stage.Shape) - { - _stage = stage; - - var supervisionStrategy = attributes.GetAttribute(null); - var decider = supervisionStrategy != null ? 
supervisionStrategy.Decider : Deciders.ResumingDecider; - - SetHandler(_stage.In, - onPush: () => - { - var msg = Grab(_stage.In); - var result = new TaskCompletionSource>(); - - void PublishAction(Action> reportHandler) - { - if (msg.TopicPartition != null) - _producer.Produce(msg.TopicPartition, msg.Message, reportHandler); - else - _producer.Produce(msg.Topic, msg.Message, reportHandler); - } - - PublishAction(report => - { - if (!report.Error.IsError) - { - result.SetResult(report); - } - else - { - Log.Error(report.Error.Reason); - var exception = new KafkaException(report.Error); - switch (decider(exception)) - { - case Directive.Stop: - if (_stage.CloseProducerOnStop) - { - _producer.Dispose(); - } - FailStage(exception); - break; - default: - result.SetException(exception); - break; - } - } - - if (_awaitingConfirmation.DecrementAndGet() == 0 && _inIsClosed) - { - _checkForCompletion(); - } - }); - - _awaitingConfirmation.IncrementAndGet(); - Push(_stage.Out, result.Task); - }, - onUpstreamFinish: () => - { - _inIsClosed = true; - _completionState.SetResult(NotUsed.Instance); - _checkForCompletion(); - }, - onUpstreamFailure: exception => - { - _inIsClosed = true; - _completionState.SetException(exception); - _checkForCompletion(); - }); - - SetHandler(_stage.Out, onPull: () => - { - TryPull(_stage.In); - }); - } - - public override void PreStart() - { - base.PreStart(); - - _producer = _stage.GetProducerProvider(HandleProduceError); - Log.Debug($"Producer started: {_producer.Name}"); - - _checkForCompletion = GetAsyncCallback(CheckForCompletion); - } - - public override void PostStop() - { - Log.Debug("Stage completed"); - - if (_stage.CloseProducerOnStop) - { - _producer.Flush(_stage.Settings.FlushTimeout); - _producer.Dispose(); - Log.Debug($"Producer closed: {_producer.Name}"); - } - - base.PostStop(); - } - - private void CheckForCompletion() - { - if (IsClosed(_stage.In) && _awaitingConfirmation.Current == 0) - { - var completionTask = _completionState.Task; - - if (completionTask.IsFaulted || completionTask.IsCanceled) - { - FailStage(completionTask.Exception); - } - else if (completionTask.IsCompleted) - { - CompleteStage(); - } - } - } - - private void HandleProduceError(IProducer producer, Error error) - { - Log.Error(error.Reason); - - if (!KafkaExtensions.IsBrokerErrorRetriable(error) && !KafkaExtensions.IsLocalErrorRetriable(error)) - { - var exception = new KafkaException(error); - FailStage(exception); - } - } - } -} From 6cfe4fea69096377772698ef83f7cd4ce75415c4 Mon Sep 17 00:00:00 2001 From: IgorFedchenko Date: Sat, 19 Oct 2019 15:25:33 +0300 Subject: [PATCH 2/7] Added FlowWithContext implementation --- src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs | 78 ++++++++++++++++++- .../Extensions/FlowExtensions.cs | 38 +++++++++ .../Messages/ProducerRecord.cs | 8 ++ 3 files changed, 121 insertions(+), 3 deletions(-) create mode 100644 src/Akka.Streams.Kafka/Extensions/FlowExtensions.cs diff --git a/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs b/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs index 9426c168..b440d939 100644 --- a/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs +++ b/src/Akka.Streams.Kafka/Dsl/KafkaProducer.cs @@ -1,6 +1,8 @@ using System; using System.Threading.Tasks; +using Akka.Annotations; using Akka.Streams.Dsl; +using Akka.Streams.Kafka.Extensions; using Akka.Streams.Kafka.Messages; using Akka.Streams.Kafka.Settings; using Akka.Streams.Kafka.Stages; @@ -136,10 +138,80 @@ public static Flow, IResults, C, DeliveryReport, C, NotUsed> FlowWithContext(ProducerSettings 
settings) + + /// + /// + /// API MAY CHANGE + /// + /// + /// Create a flow to conditionally publish records to Kafka topics and then pass it on. + /// + /// + /// It publishes records to Kafka topics conditionally: + /// + /// + /// publishes a single message to its topic, and continues in the stream as + /// + /// + /// publishes all messages in its `records` field, and continues in the stream as + /// + /// + /// does not publish anything, and continues in the stream as + /// + /// + /// + /// + /// This flow is intended to be used with Akka's + /// + /// + /// + /// Keys type + /// Values type + /// Flow context type + [ApiMayChange] + public static FlowWithContext, C, IResults, NotUsed> FlowWithContext(ProducerSettings settings) + { + return FlexiFlow(settings).AsFlowWithContext, C, IResults, NotUsed, IEnvelope>( + collapseContext: (env, c) => env.WithPassThrough(c), + extractContext: res => res.PassThrough); + } + + /// + /// + /// API MAY CHANGE + /// + /// + /// Create a flow to conditionally publish records to Kafka topics and then pass it on. + /// + /// + /// It publishes records to Kafka topics conditionally: + /// + /// + /// publishes a single message to its topic, and continues in the stream as + /// + /// + /// publishes all messages in its `records` field, and continues in the stream as + /// + /// + /// does not publish anything, and continues in the stream as + /// + /// + /// + /// + /// This flow is intended to be used with Akka's + /// + /// + /// Producer settings + /// Producer instance to reuse + /// Keys type + /// Values type + /// Flow context type + [ApiMayChange] + public static FlowWithContext, C, IResults, NotUsed> FlowWithContext(ProducerSettings settings, IProducer producer) { - throw new NotImplementedException(); + return FlexiFlow(settings, producer).AsFlowWithContext, C, IResults, NotUsed, IEnvelope>( + collapseContext: (env, c) => env.WithPassThrough(c), + extractContext: res => res.PassThrough); } } } diff --git a/src/Akka.Streams.Kafka/Extensions/FlowExtensions.cs b/src/Akka.Streams.Kafka/Extensions/FlowExtensions.cs new file mode 100644 index 00000000..94de5bcc --- /dev/null +++ b/src/Akka.Streams.Kafka/Extensions/FlowExtensions.cs @@ -0,0 +1,38 @@ +using System; +using Akka.Annotations; +using Akka.Streams.Dsl; + +namespace Akka.Streams.Kafka.Extensions +{ + public static class FlowExtensions + { + // TODO: Move this to Akka.Streams core library + /// + /// API MAY CHANGE + /// + /// Turns a Flow into a FlowWithContext which manages a context per element along a stream. 
+ /// + /// Flow to convert + /// Turn each incoming pair of element and context value into an element of passed Flow + /// Turn each outgoing element of passed Flow into an outgoing context value + /// New flow incoming elements type + /// New flow incoming elements context + /// Out elements type + /// Resulting context type + /// Materialized value type + /// Type of passed flow elements + [ApiMayChange] + public static FlowWithContext AsFlowWithContext( + this Flow flow, + Func collapseContext, + Func extractContext) + { + var flowWithTuples = Flow.Create>() + .Select(pair => collapseContext(pair.Item1, pair.Item2)) + .ViaMaterialized(flow, Keep.Right) + .Select(e => Tuple.Create(e, extractContext(e))); + + return FlowWithContext.From(flowWithTuples); + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka/Messages/ProducerRecord.cs b/src/Akka.Streams.Kafka/Messages/ProducerRecord.cs index e97531e8..37440342 100644 --- a/src/Akka.Streams.Kafka/Messages/ProducerRecord.cs +++ b/src/Akka.Streams.Kafka/Messages/ProducerRecord.cs @@ -81,6 +81,14 @@ public ProducerRecord(string topic, Message message) { } + /// + /// ProducerRecord + /// + public ProducerRecord(string topic, K key, V value) + : this(topic, null, null, new Message(){ Key = key, Value = value}) + { + } + /// /// ProducerRecord /// From 38faa496a79d401eb9e9762a798f044973982ae0 Mon Sep 17 00:00:00 2001 From: IgorFedchenko Date: Sat, 19 Oct 2019 17:50:41 +0300 Subject: [PATCH 3/7] Implemented source/flow with context integration test --- ...ttablePartitionedSourceIntegrationTests.cs | 3 +- .../FlowWithContextIntegrationTests.cs | 90 +++++++++++++++++++ .../KafkaIntegrationTests.cs | 32 ++++--- src/Akka.Streams.Kafka/Helpers/Committer.cs | 1 + src/Akka.Streams.Kafka/Helpers/Control.cs | 8 ++ src/Akka.Streams.Kafka/Messages/Envelope.cs | 8 +- .../Stages/Producers/DefaultProducerStage.cs | 8 +- 7 files changed, 130 insertions(+), 20 deletions(-) create mode 100644 src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs diff --git a/src/Akka.Streams.Kafka.Tests/Integration/CommittablePartitionedSourceIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/CommittablePartitionedSourceIntegrationTests.cs index a1665e24..c025f048 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/CommittablePartitionedSourceIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/CommittablePartitionedSourceIntegrationTests.cs @@ -83,7 +83,8 @@ public async Task CommittablePartitionedSource_Should_handle_exceptions_in_strea return t.Result; }); }) - .MergeSubstreams().As>() + .MergeSubstreams() + .As>() .Scan(0, (c, n) => c + n) .ToMaterialized(Sink.Last(), Keep.Both) .MapMaterializedValue(tuple => DrainingControl.Create(tuple.Item1, tuple.Item2)) diff --git a/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs new file mode 100644 index 00000000..b9e09518 --- /dev/null +++ b/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs @@ -0,0 +1,90 @@ +using System; +using System.Collections.Immutable; +using System.Linq; +using System.Threading.Tasks; +using Akka.Streams.Dsl; +using Akka.Streams.Implementation.Fusing; +using Akka.Streams.Kafka.Dsl; +using Akka.Streams.Kafka.Extensions; +using Akka.Streams.Kafka.Helpers; +using Akka.Streams.Kafka.Messages; +using Akka.Streams.Kafka.Settings; +using Confluent.Kafka; +using FluentAssertions; +using Xunit; +using Xunit.Abstractions; 
+ +namespace Akka.Streams.Kafka.Tests.Integration +{ + public class FlowWithContextIntegrationTests : KafkaIntegrationTests + { + public FlowWithContextIntegrationTests(ITestOutputHelper output, KafkaFixture fixture) + : base(nameof(FlowWithContextIntegrationTests), output, fixture) + { + } + + [Fact] + public async Task ProducerFlowWithContext_should_work_with_source_with_context() + { + bool Duplicate(string value) => value == "1"; + bool Ignore(string value) => value == "2"; + + var consumerSettings = CreateConsumerSettings(CreateGroup(1)); + var topic1 = CreateTopic(1); + var topic2 = CreateTopic(2); + var topic3 = CreateTopic(3); + var topic4 = CreateTopic(4); + var producerSettings = BuildProducerSettings(); + var committerSettings = CommitterSettings; + var totalMessages = 10; + + await ProduceStrings(topic1, Enumerable.Range(1, totalMessages), producerSettings); + + var control = KafkaConsumer.SourceWithOffsetContext(consumerSettings, Subscriptions.Topics(topic1)) + .Select(record => + { + IEnvelope output; + if (Duplicate(record.Value)) + { + output = ProducerMessage.Multi(new[] + { + new ProducerRecord(topic2, record.Key, record.Value), + new ProducerRecord(topic3, record.Key, record.Value) + }.ToImmutableSet()); + } + else if (Ignore(record.Value)) + { + output = ProducerMessage.PassThrough(); + } + else + { + output = ProducerMessage.Single(new ProducerRecord(topic4, record.Key, record.Value)); + } + + Log.Debug($"Giving message of type {output.GetType().Name}"); + return output; + }) + .Via(KafkaProducer.FlowWithContext(producerSettings)) + .AsSource() + .Log("Produced messages", r => $"Committing {r.Item2.Offset.Topic}:{r.Item2.Offset.Partition}[{r.Item2.Offset.Offset}]") + .ToMaterialized(Committer.SinkWithOffsetContext>(committerSettings), Keep.Both) + .MapMaterializedValue(tuple => DrainingControl.Create(tuple.Item1, tuple.Item2.ContinueWith(t => NotUsed.Instance))) + .Run(Materializer); + + var (control2, result) = KafkaConsumer.PlainSource(consumerSettings, Subscriptions.Topics(topic2, topic3, topic4)) + .ToMaterialized(Sink.Seq>(), Keep.Both) + .Run(Materializer); + + // Give it some time to handle message flow + await Task.Delay(TimeSpan.FromSeconds(10)); + + var shutdown1 = control.DrainAndShutdown(); + AwaitCondition(() => shutdown1.IsCompletedSuccessfully, TimeSpan.FromSeconds(20)); + var shutdown2 = control2.Shutdown(); + AwaitCondition(() => shutdown2.IsCompletedSuccessfully, TimeSpan.FromSeconds(10)); + + AwaitCondition(() => result.IsCompletedSuccessfully, TimeSpan.FromSeconds(10)); + result.Result.Should().HaveCount(totalMessages); + } + } +} \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs index 88188b33..18288bfe 100644 --- a/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs @@ -38,16 +38,27 @@ public KafkaIntegrationTests(string actorSystemName, ITestOutputHelper output, K protected string CreateTopic(int number) => $"topic-{number}-{Uuid}"; protected string CreateGroup(int number) => $"group-{number}-{Uuid}"; + + protected ProducerSettings ProducerSettings => BuildProducerSettings(); - protected ProducerSettings ProducerSettings + protected ProducerSettings BuildProducerSettings() { - get => ProducerSettings.Create(Sys, null, null).WithBootstrapServers(_fixture.KafkaServer); + return ProducerSettings.Create(Sys, null, null).WithBootstrapServers(_fixture.KafkaServer); } protected 
CommitterSettings CommitterSettings { get => CommitterSettings.Create(Sys); } + + protected ConsumerSettings CreateConsumerSettings(string group) + { + return ConsumerSettings.Create(Sys, null, null) + .WithBootstrapServers(_fixture.KafkaServer) + .WithStopTimeout(TimeSpan.FromSeconds(1)) + .WithProperty("auto.offset.reset", "earliest") + .WithGroupId(group); + } protected ConsumerSettings CreateConsumerSettings(string group) { @@ -58,27 +69,27 @@ protected ConsumerSettings CreateConsumerSettings(string g .WithGroupId(group); } - protected async Task ProduceStrings(string topic, IEnumerable range, ProducerSettings producerSettings) + protected async Task ProduceStrings(string topic, IEnumerable range, ProducerSettings producerSettings) { await Source .From(range) - .Select(elem => new ProducerRecord(topic, elem.ToString())) + .Select(elem => new ProducerRecord(topic, elem.ToString())) .RunWith(KafkaProducer.PlainSink(producerSettings), Materializer); } - protected async Task ProduceStrings(Func partitionSelector, IEnumerable range, ProducerSettings producerSettings) + protected async Task ProduceStrings(Func partitionSelector, IEnumerable range, ProducerSettings producerSettings) { await Source .From(range) - .Select(elem => new ProducerRecord(partitionSelector(elem), elem.ToString())) + .Select(elem => new ProducerRecord(partitionSelector(elem), elem.ToString())) .RunWith(KafkaProducer.PlainSink(producerSettings), Materializer); } - protected async Task ProduceStrings(TopicPartition topicPartition, IEnumerable range, ProducerSettings producerSettings) + protected async Task ProduceStrings(TopicPartition topicPartition, IEnumerable range, ProducerSettings producerSettings) { await Source .From(range) - .Select(elem => new ProducerRecord(topicPartition, elem.ToString())) + .Select(elem => new ProducerRecord(topicPartition, elem.ToString())) .RunWith(KafkaProducer.PlainSink(producerSettings), Materializer); } @@ -91,8 +102,9 @@ protected async Task AssertCompletesSuccessfullyWithin(TimeSpan timeout, Task ta var timeoutTask = Task.Delay(timeout); await Task.WhenAny(timeoutTask, task); - - task.IsCompletedSuccessfully.Should().Be(true, $"Timeout {timeout} while waitilng task finish successfully"); + + task.IsCompleted.Should().Be(true, $"task should complete within {timeout} timeout"); + task.IsCompletedSuccessfully.Should().Be(true, "task should compete successfully"); } protected async Task GivenInitializedTopic(string topic) diff --git a/src/Akka.Streams.Kafka/Helpers/Committer.cs b/src/Akka.Streams.Kafka/Helpers/Committer.cs index cb88286a..f13c01be 100644 --- a/src/Akka.Streams.Kafka/Helpers/Committer.cs +++ b/src/Akka.Streams.Kafka/Helpers/Committer.cs @@ -64,6 +64,7 @@ public static Sink Sink(CommitterSettings settings) /// /// Batches offsets from context and commits them to Kafka. /// + /// Incoming flow elements type [ApiMayChange] public static Sink, Task> SinkWithOffsetContext(CommitterSettings settings) { diff --git a/src/Akka.Streams.Kafka/Helpers/Control.cs b/src/Akka.Streams.Kafka/Helpers/Control.cs index d2d235d9..c5a2f3bc 100644 --- a/src/Akka.Streams.Kafka/Helpers/Control.cs +++ b/src/Akka.Streams.Kafka/Helpers/Control.cs @@ -45,6 +45,7 @@ public interface IControl /// one, so that the stream can be stopped in a controlled way without losing /// commits. /// + /// Stream completion result type public class DrainingControl : IControl { public IControl Control { get; } @@ -86,6 +87,13 @@ private DrainingControl(IControl control, Task streamCompletion) /// commits. 
/// public static DrainingControl Create(IControl control, Task streamCompletion) => new DrainingControl(control, streamCompletion); + + /// + /// Combine control and a stream completion signal materialized values into + /// one, so that the stream can be stopped in a controlled way without losing + /// commits. + /// + public static DrainingControl Create(Tuple> tuple) => new DrainingControl(tuple.Item1, tuple.Item2); } /// diff --git a/src/Akka.Streams.Kafka/Messages/Envelope.cs b/src/Akka.Streams.Kafka/Messages/Envelope.cs index 2f561bdc..97bada44 100644 --- a/src/Akka.Streams.Kafka/Messages/Envelope.cs +++ b/src/Akka.Streams.Kafka/Messages/Envelope.cs @@ -255,12 +255,10 @@ public MultiResult(IImmutableSet> parts, TPassThrough pass } /// - /// implementation enitted when + /// implementation emitted when + /// has passed through the flow. /// - /// - /// - /// - public sealed class PassThroughResult : IResults + internal sealed class PassThroughResult : IResults { public PassThroughResult(TPassThrough passThrough) { diff --git a/src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs b/src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs index 9ab8259c..412ccda4 100644 --- a/src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs +++ b/src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs @@ -68,7 +68,7 @@ public DefaultProducerStageLogic(IProducerStage stage, Attri onPush: () => { var msg = Grab(_stage.In) as IEnvelope; - + switch (msg) { case Message message: @@ -97,7 +97,7 @@ public DefaultProducerStageLogic(IProducerStage stage, Attri return result.Task; }); PostSend(msg); - var resultTask = Task.WhenAll(tasks).ContinueWith(t => new MultiResult(t.Result.ToImmutableHashSet(), multiMessage.PassThrough)); + var resultTask = Task.WhenAll(tasks).ContinueWith(t => new MultiResult(t.Result.ToImmutableHashSet(), multiMessage.PassThrough) as IResults); Push(stage.Out, resultTask as Task); break; } @@ -105,8 +105,8 @@ public DefaultProducerStageLogic(IProducerStage stage, Attri case PassThroughMessage passThroughMessage: { PostSend(msg); - var resultTask = Task.FromResult(new PassThroughResult(msg.PassThrough)) as Task; - Push(stage.Out, resultTask); + var resultTask = Task.FromResult(new PassThroughResult(passThroughMessage.PassThrough) as IResults); + Push(stage.Out, resultTask as Task); break; } } From 232e6f0e681939410ab1d47a36dcf88ea3c5e3e2 Mon Sep 17 00:00:00 2001 From: IgorFedchenko Date: Tue, 22 Oct 2019 16:05:21 +0300 Subject: [PATCH 4/7] Fixed producer stage reports handling --- .../FlowWithContextIntegrationTests.cs | 12 ++++------ .../KafkaIntegrationTests.cs | 24 ++++++++++++++----- .../Helpers/PromiseControl.cs | 6 ++--- .../Stages/Producers/DefaultProducerStage.cs | 5 ++-- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs index b9e09518..d8dd86cb 100644 --- a/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs @@ -76,15 +76,13 @@ public async Task ProducerFlowWithContext_should_work_with_source_with_context() .Run(Materializer); // Give it some time to handle message flow - await Task.Delay(TimeSpan.FromSeconds(10)); + await Task.Delay(TimeSpan.FromSeconds(20)); - var shutdown1 = control.DrainAndShutdown(); - AwaitCondition(() => shutdown1.IsCompletedSuccessfully, 
TimeSpan.FromSeconds(20)); - var shutdown2 = control2.Shutdown(); - AwaitCondition(() => shutdown2.IsCompletedSuccessfully, TimeSpan.FromSeconds(10)); + AssertTaskCompletesWithin(TimeSpan.FromSeconds(10), control.DrainAndShutdown()); + AssertTaskCompletesWithin(TimeSpan.FromSeconds(10), control2.Shutdown()); - AwaitCondition(() => result.IsCompletedSuccessfully, TimeSpan.FromSeconds(10)); - result.Result.Should().HaveCount(totalMessages); + var received = AssertTaskCompletesWithin(TimeSpan.FromSeconds(10), result); + received.Should().HaveCount(totalMessages); } } } \ No newline at end of file diff --git a/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs index 18288bfe..0b0a0914 100644 --- a/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs +++ b/src/Akka.Streams.Kafka.Tests/KafkaIntegrationTests.cs @@ -97,14 +97,26 @@ await Source /// Asserts that task will finish successfully until specified timeout. /// Throws task exception if task failes /// - protected async Task AssertCompletesSuccessfullyWithin(TimeSpan timeout, Task task) + protected void AssertTaskCompletesWithin(TimeSpan timeout, Task task, bool assertIsSuccessful = true) { - var timeoutTask = Task.Delay(timeout); - - await Task.WhenAny(timeoutTask, task); + AwaitCondition(() => task.IsCompleted, timeout, $"task should complete within {timeout} timeout"); + + if (assertIsSuccessful) + task.IsCompletedSuccessfully.Should().Be(true, "task should compete successfully"); + } + + /// + /// Asserts that task will finish successfully until specified timeout. + /// Throws task exception if task failes + /// + protected TResult AssertTaskCompletesWithin(TimeSpan timeout, Task task, bool assertIsSuccessful = true) + { + AwaitCondition(() => task.IsCompleted, timeout, $"task should complete within {timeout} timeout"); - task.IsCompleted.Should().Be(true, $"task should complete within {timeout} timeout"); - task.IsCompletedSuccessfully.Should().Be(true, "task should compete successfully"); + if (assertIsSuccessful) + task.IsCompletedSuccessfully.Should().Be(true, "task should compete successfully"); + + return task.Result; } protected async Task GivenInitializedTopic(string topic) diff --git a/src/Akka.Streams.Kafka/Helpers/PromiseControl.cs b/src/Akka.Streams.Kafka/Helpers/PromiseControl.cs index a478f2f1..c6b90ffe 100644 --- a/src/Akka.Streams.Kafka/Helpers/PromiseControl.cs +++ b/src/Akka.Streams.Kafka/Helpers/PromiseControl.cs @@ -9,7 +9,7 @@ namespace Akka.Streams.Kafka.Helpers /// Used in source logic classes to provide implementation. 
     /// </summary>
     ///
-    internal class PromiseControl<TSourceOut> : IControl
+    internal abstract class PromiseControl<TSourceOut> : IControl
     {
         private readonly SourceShape<TSourceOut> _shape;
         private readonly Action<Outlet<TSourceOut>> _completeStageOutlet;
@@ -64,9 +64,7 @@ public virtual void PerformStop()
         /// <summary>
         /// Performs source logic shutdown
         /// </summary>
-        public virtual void PerformShutdown()
-        {
-        }
+        public abstract void PerformShutdown();
 
         /// <summary>
         /// Executed on source logic stop
diff --git a/src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs b/src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs
index 412ccda4..fb3cac96 100644
--- a/src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs
+++ b/src/Akka.Streams.Kafka/Stages/Producers/DefaultProducerStage.cs
@@ -90,10 +90,10 @@ public DefaultProducerStageLogic(IProducerStage<K, V, P, TIn, TOut> stage, Attri
                             {
                                 var result = new TaskCompletionSource<MultiResultPart<K, V>>();
                                 _awaitingConfirmation.IncrementAndGet();
-                                _producer.Produce(record, report =>
+                                _producer.Produce(record, BuildSendCallback(result, report =>
                                 {
                                     result.SetResult(new MultiResultPart<K, V>(report, record));
-                                });
+                                }));
                                 return result.Task;
                             });
                             PostSend(msg);
@@ -206,6 +206,7 @@ private void CheckForCompletion()
         {
             if (IsClosed(_stage.In) && _awaitingConfirmation.Current == 0)
             {
+                Log.Debug("Completing publisher stage");
                 var completionTask = _completionState.Task;
 
                 if (completionTask.IsFaulted || completionTask.IsCanceled)

From b8a1e6c54ead18b369eb98afc7b60abeddc111b0 Mon Sep 17 00:00:00 2001
From: IgorFedchenko
Date: Tue, 22 Oct 2019 16:10:42 +0300
Subject: [PATCH 5/7] Optimized test timing

---
 .../FlowWithContextIntegrationTests.cs | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs b/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs
index d8dd86cb..087b7208 100644
--- a/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs
+++ b/src/Akka.Streams.Kafka.Tests/Integration/FlowWithContextIntegrationTests.cs
@@ -37,6 +37,7 @@ public async Task ProducerFlowWithContext_should_work_with_source_with_context()
             var producerSettings = BuildProducerSettings<Null, string>();
             var committerSettings = CommitterSettings;
             var totalMessages = 10;
+            var totalConsumed = 0;
 
             await ProduceStrings(topic1, Enumerable.Range(1, totalMessages), producerSettings);
 
@@ -72,17 +73,20 @@ public async Task ProducerFlowWithContext_should_work_with_source_with_context()
                 .Run(Materializer);
 
             var (control2, result) = KafkaConsumer.PlainSource(consumerSettings, Subscriptions.Topics(topic2, topic3, topic4))
-                .ToMaterialized(Sink.Seq<ConsumeResult<Null, string>>(), Keep.Both)
+                .Scan(0, (c, _) => c + 1)
+                .Select(consumed =>
+                {
+                    totalConsumed = consumed;
+                    return consumed;
+                })
+                .ToMaterialized(Sink.Last<int>(), Keep.Both)
                 .Run(Materializer);
 
-            // Give it some time to handle message flow
-            await Task.Delay(TimeSpan.FromSeconds(20));
+            AwaitCondition(() => totalConsumed == totalMessages, TimeSpan.FromSeconds(30));
 
             AssertTaskCompletesWithin(TimeSpan.FromSeconds(10), control.DrainAndShutdown());
             AssertTaskCompletesWithin(TimeSpan.FromSeconds(10), control2.Shutdown());
-
-            var received = AssertTaskCompletesWithin(TimeSpan.FromSeconds(10), result);
-            received.Should().HaveCount(totalMessages);
+            AssertTaskCompletesWithin(TimeSpan.FromSeconds(10), result).Should().Be(totalConsumed);
         }
     }
 }
\ No newline at end of file

From 594da37ac7633f61f47a7580457ced5873592c00 Mon Sep 17 00:00:00 2001
From: IgorFedchenko
Date: Tue, 22 Oct 2019 16:28:08 +0300
Subject: [PATCH 6/7] Fixed sample compilation errors

---
 examples/SimpleProducer/Program.cs          | 15 +++++++++------
 src/Akka.Streams.Kafka/Messages/Envelope.cs | 10 +++++-----
 2 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/examples/SimpleProducer/Program.cs b/examples/SimpleProducer/Program.cs
index ec3770d6..184a8e28 100644
--- a/examples/SimpleProducer/Program.cs
+++ b/examples/SimpleProducer/Program.cs
@@ -1,6 +1,8 @@
 using System;
 using System.Linq;
 using System.Text;
+using System.Threading.Tasks;
+using Akka;
 using Akka.Actor;
 using Akka.Configuration;
 using Akka.Streams;
@@ -31,14 +33,15 @@ public static void Main(string[] args)
             Source
                 .Cycle(() => Enumerable.Range(1, 100).GetEnumerator())
                 .Select(c => c.ToString())
-                .Select(elem => new ProducerRecord<Null, string> { Topic = "akka100", Message = new Message<Null, string> { Value = elem }})
-                .Via(KafkaProducer.FlexiFlow(producerSettings))
-                .Select(record =>
+                .Select(elem => ProducerMessage.Single(new ProducerRecord<Null, string>("akka100", elem)))
+                .Via(KafkaProducer.FlexiFlow<Null, string, NotUsed>(producerSettings))
+                .Select(result =>
                 {
-                    Console.WriteLine($"Producer: {record.Topic}/{record.Partition} {record.Offset}: {record.Value}");
-                    return record;
+                    var response = result as Result<Null, string, NotUsed>;
+                    Console.WriteLine($"Producer: {response.Metadata.Topic}/{response.Metadata.Partition} {response.Metadata.Offset}: {response.Metadata.Value}");
+                    return result;
                 })
-                .RunWith(Sink.Ignore<DeliveryReport<Null, string>>(), materializer);
+                .RunWith(Sink.Ignore<IResults<Null, string, NotUsed>>(), materializer);
 
             // TODO: producer as a Commitable Sink
 
diff --git a/src/Akka.Streams.Kafka/Messages/Envelope.cs b/src/Akka.Streams.Kafka/Messages/Envelope.cs
index 97bada44..2fc4d4ce 100644
--- a/src/Akka.Streams.Kafka/Messages/Envelope.cs
+++ b/src/Akka.Streams.Kafka/Messages/Envelope.cs
@@ -188,7 +188,7 @@ public IEnvelope<K, V, TPassThrough2> WithPassThrough<TPassThrough2>(TPassThroug
     }
 
     /// <summary>
-    /// Output type produced by
+    /// Output type produced by
     /// </summary>
     public interface IResults<K, V, TPassThrough>
     {
@@ -203,7 +203,7 @@ public interface IResults<K, V, TPassThrough>
     /// <summary>
     /// Includes the original message, metadata returned from and the `offset` of the produced message.
     /// </summary>
-    internal sealed class Result<K, V, TPassThrough> : IResults<K, V, TPassThrough>
+    public sealed class Result<K, V, TPassThrough> : IResults<K, V, TPassThrough>
     {
         /// <summary>
         /// Message metadata
@@ -230,7 +230,7 @@ public Result(DeliveryReport<K, V> metadata, Message<K, V> message
         }
     }
 
-    internal sealed class MultiResultPart<K, V>
+    public sealed class MultiResultPart<K, V>
     {
         public DeliveryReport<K, V> Metadata { get; }
         public ProducerRecord<K, V> Record { get; }
@@ -242,7 +242,7 @@ public MultiResultPart(DeliveryReport<K, V> metadata, ProducerRecord<K, V> recor
         }
     }
 
-    internal sealed class MultiResult<K, V, TPassThrough> : IResults<K, V, TPassThrough>
+    public sealed class MultiResult<K, V, TPassThrough> : IResults<K, V, TPassThrough>
     {
         public MultiResult(IImmutableSet<MultiResultPart<K, V>> parts, TPassThrough passThrough)
         {
@@ -258,7 +258,7 @@ public MultiResult(IImmutableSet<MultiResultPart<K, V>> parts, TPassThrough pass
     /// <see cref="IResults{K,V,TPassThrough}"/> implementation emitted when
     /// <see cref="PassThroughMessage{K,V,TPassThrough}"/> has passed through the flow.
     /// </summary>
-    internal sealed class PassThroughResult<K, V, TPassThrough> : IResults<K, V, TPassThrough>
+    public sealed class PassThroughResult<K, V, TPassThrough> : IResults<K, V, TPassThrough>
     {
         public PassThroughResult(TPassThrough passThrough)
         {

From e9eb2bc8aa72aee7d85fa4baaa8874cf629853eb Mon Sep 17 00:00:00 2001
From: IgorFedchenko
Date: Tue, 5 Nov 2019 18:30:24 +0300
Subject: [PATCH 7/7] Fixed compilation error

---
 src/Akka.Streams.Kafka/Extensions/FlowExtensions.cs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/Akka.Streams.Kafka/Extensions/FlowExtensions.cs b/src/Akka.Streams.Kafka/Extensions/FlowExtensions.cs
index 94de5bcc..740b0398 100644
--- a/src/Akka.Streams.Kafka/Extensions/FlowExtensions.cs
+++ b/src/Akka.Streams.Kafka/Extensions/FlowExtensions.cs
@@ -27,10 +27,10 @@ public static FlowWithContext AsFlowWithContex
             Func<TIn, TCtxIn, TOut> collapseContext,
             Func<TOut2, TCtx2> extractContext)
         {
-            var flowWithTuples = Flow.Create<Tuple<TIn, TCtxIn>>()
+            var flowWithTuples = Flow.Create<(TIn, TCtxIn)>()
                 .Select(pair => collapseContext(pair.Item1, pair.Item2))
                 .ViaMaterialized(flow, Keep.Right)
-                .Select(e => Tuple.Create(e, extractContext(e)));
+                .Select(e => (e, extractContext(e)));
 
             return FlowWithContext.From(flowWithTuples);
         }
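
Taken together, the producer-side changes in this series target the following end-to-end usage. This is a minimal sketch, not part of the patches: the topic name, producerSettings/materializer, the int pass-through, the two-argument ProducerMessage.Single overload, and the Parts member name are assumptions inferred from the constructors shown in the hunks above.

    // Sketch only. Assumed usings: Akka, Akka.Streams.Dsl, Akka.Streams.Kafka.Dsl,
    // Akka.Streams.Kafka.Messages, Confluent.Kafka, System, System.Linq.
    Source.From(Enumerable.Range(1, 100))
        // Wrap each record in an envelope; the int n rides along as pass-through.
        .Select(n => ProducerMessage.Single(new ProducerRecord<Null, string>("akka100", n.ToString()), n))
        .Via(KafkaProducer.FlexiFlow<Null, string, int>(producerSettings))
        .Select(result =>
        {
            // One IResults subtype per envelope kind, as in the patched Envelope.cs.
            switch (result)
            {
                case Result<Null, string, int> single:
                    Console.WriteLine($"Delivered: {single.Metadata.Topic}/{single.Metadata.Partition} {single.Metadata.Offset}");
                    break;
                case MultiResult<Null, string, int> multi:
                    Console.WriteLine($"Delivered batch of {multi.Parts.Count}"); // Parts: assumed property name
                    break;
                case PassThroughResult<Null, string, int> passed:
                    Console.WriteLine($"Passed through without producing: {passed.PassThrough}");
                    break;
            }
            return result;
        })
        .RunWith(Sink.Ignore<IResults<Null, string, int>>(), materializer);

Making Result, MultiResult, MultiResultPart and PassThroughResult public (patch 6) is what makes this pattern matching possible from user code.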
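The Tuple overload of DrainingControl.Create exists so a Keep.Both materialization can be collapsed into one value with a plain method group. A hedged sketch, assuming a configured consumerSettings, that PlainSource materializes an IControl, that Keep.Both yields the Tuple the new overload expects, and that DrainAndShutdown surfaces the sink's task:

    // Sketch only. Assumed usings: Akka.Streams.Dsl, Akka.Streams.Kafka.Dsl,
    // Akka.Streams.Kafka.Helpers, System.Collections.Immutable.
    var control = KafkaConsumer.PlainSource(consumerSettings, Subscriptions.Topics("akka100"))
        .Select(record => record.Value)
        .ToMaterialized(Sink.Seq<string>(), Keep.Both)   // Tuple<IControl, Task<IImmutableList<string>>>
        .MapMaterializedValue(DrainingControl<IImmutableList<string>>.Create)
        .Run(materializer);

    // Later: wait for in-flight elements to drain, then stop without losing commits.
    var consumed = await control.DrainAndShutdown();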
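The value-tuple change in patch 7 keeps the shape of AsFlowWithContext intact: collapse the (element, context) pair into the wrapped flow's input, run the flow, then pull the context back out of its output. A sketch under assumed element and context types (string payloads with a long offset as context); the inner flow is hypothetical, and if type inference cannot resolve the six type parameters they can be supplied explicitly in the order shown in the hunk above:

    // Sketch only: the inner flow's output must still carry its context,
    // here as the second slot of the tuple.
    var inner = Flow.Create<(string Word, long Offset)>()
        .Select(t => (t.Word.ToUpperInvariant(), t.Offset));

    var withContext = inner.AsFlowWithContext(
        collapseContext: (string word, long offset) => (word, offset), // (element, context) -> inner input
        extractContext: ((string, long) pair) => pair.Item2);          // inner output -> context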