Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 60 additions & 33 deletions src/producer/future_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,22 @@ use super::Partitioner;
// ********** FUTURE PRODUCER **********
//

///Default implementation for FutureProducerContext::delivery
pub fn future_delivery_impl(
delivery_result: &DeliveryResult<'_>,
tx: oneshot::Sender<OwnedDeliveryResult>,
) {
let owned_delivery_result = match *delivery_result {
Ok(ref message) => Ok(Delivery {
partition: message.partition(),
offset: message.offset(),
timestamp: message.timestamp(),
}),
Err((ref error, ref message)) => Err((error.clone(), message.detach())),
};
let _ = tx.send(owned_delivery_result); // TODO: handle error
}

/// A record for the future producer.
///
/// Like [`BaseRecord`], but specific to the [`FutureProducer`]. The only
Expand Down Expand Up @@ -186,20 +202,9 @@ where
{
type DeliveryOpaque = Box<oneshot::Sender<OwnedDeliveryResult>>;

fn delivery(
&self,
delivery_result: &DeliveryResult<'_>,
tx: Box<oneshot::Sender<OwnedDeliveryResult>>,
) {
let owned_delivery_result = match *delivery_result {
Ok(ref message) => Ok(Delivery {
partition: message.partition(),
offset: message.offset(),
timestamp: message.timestamp(),
}),
Err((ref error, ref message)) => Err((error.clone(), message.detach())),
};
let _ = tx.send(owned_delivery_result); // TODO: handle error
#[inline]
fn delivery(&self, delivery_result: &DeliveryResult<'_>, tx: Self::DeliveryOpaque) {
future_delivery_impl(delivery_result, *tx)
}
}

Expand All @@ -215,18 +220,21 @@ where
/// underlying producer. The internal polling thread will be terminated when the
/// `FutureProducer` goes out of scope.
#[must_use = "Producer polling thread will stop immediately if unused"]
pub struct FutureProducer<C = DefaultClientContext, R = DefaultRuntime, Part = NoCustomPartitioner>
where
pub struct FutureProducer<
C = FutureProducerContext<DefaultClientContext>,
R = DefaultRuntime,
Part = NoCustomPartitioner,
> where
Part: Partitioner,
C: ClientContext + 'static,
C: ProducerContext<Part> + 'static,
{
producer: Arc<ThreadedProducer<FutureProducerContext<C>, Part>>,
producer: Arc<ThreadedProducer<C, Part>>,
_runtime: PhantomData<R>,
}

impl<C, R> Clone for FutureProducer<C, R>
where
C: ClientContext + 'static,
C: ProducerContext + 'static,
{
fn clone(&self) -> FutureProducer<C, R> {
FutureProducer {
Expand All @@ -236,28 +244,30 @@ where
}
}

impl<R> FromClientConfig for FutureProducer<DefaultClientContext, R>
impl<R> FromClientConfig for FutureProducer<FutureProducerContext<DefaultClientContext>, R>
where
R: AsyncRuntime,
{
fn from_config(config: &ClientConfig) -> KafkaResult<FutureProducer<DefaultClientContext, R>> {
FutureProducer::from_config_and_context(config, DefaultClientContext)
fn from_config(
config: &ClientConfig,
) -> KafkaResult<FutureProducer<FutureProducerContext<DefaultClientContext>, R>> {
let context = FutureProducerContext {
wrapped_context: DefaultClientContext,
};
FutureProducer::from_config_and_context(config, context)
}
}

impl<C, R> FromClientConfigAndContext<C> for FutureProducer<C, R>
where
C: ClientContext + 'static,
C: ProducerContext + 'static,
R: AsyncRuntime,
{
fn from_config_and_context(
config: &ClientConfig,
context: C,
) -> KafkaResult<FutureProducer<C, R>> {
let future_context = FutureProducerContext {
wrapped_context: context,
};
let threaded_producer = ThreadedProducer::from_config_and_context(config, future_context)?;
let threaded_producer = ThreadedProducer::from_config_and_context(config, context)?;
Ok(FutureProducer {
producer: Arc::new(threaded_producer),
_runtime: PhantomData,
Expand All @@ -283,9 +293,26 @@ impl Future for DeliveryFuture {
}
}

impl<C, R> FutureProducer<C, R>
/// Creates `FutureProducer` with customized `ProducerContext`
pub fn custom_future_producer<
P: Partitioner + Send + Sync + 'static,
C: ProducerContext<P, DeliveryOpaque = Box<oneshot::Sender<OwnedDeliveryResult>>> + 'static,
R: AsyncRuntime,
>(
config: &ClientConfig,
context: C,
) -> KafkaResult<FutureProducer<C, R, P>> {
let threaded_producer = ThreadedProducer::<C, P>::from_config_and_context(config, context)?;
Ok(FutureProducer {
producer: Arc::new(threaded_producer),
_runtime: PhantomData,
})
}

impl<C, R, Part> FutureProducer<C, R, Part>
where
C: ClientContext + 'static,
Part: Partitioner,
C: ProducerContext<Part, DeliveryOpaque = Box<oneshot::Sender<OwnedDeliveryResult>>> + 'static,
R: AsyncRuntime,
{
/// Sends a message to Kafka, returning the result of the send.
Expand Down Expand Up @@ -385,13 +412,13 @@ where
}
}

impl<C, R, Part> Producer<FutureProducerContext<C>, Part> for FutureProducer<C, R, Part>
impl<C, R, Part> Producer<C, Part> for FutureProducer<C, R, Part>
where
C: ClientContext + 'static,
R: AsyncRuntime,
Part: Partitioner,
C: ProducerContext<Part> + 'static,
R: AsyncRuntime,
{
fn client(&self) -> &Client<FutureProducerContext<C>> {
fn client(&self) -> &Client<C> {
self.producer.client()
}

Expand Down
3 changes: 1 addition & 2 deletions tests/test_high_producers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::time::{Duration, Instant};

use futures::stream::{FuturesUnordered, StreamExt};

use rdkafka::client::DefaultClientContext;
use rdkafka::config::ClientConfig;
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::message::{Header, Headers, Message, OwnedHeaders};
Expand All @@ -17,7 +16,7 @@ use crate::utils::*;

mod utils;

fn future_producer(config_overrides: HashMap<&str, &str>) -> FutureProducer<DefaultClientContext> {
fn future_producer(config_overrides: HashMap<&str, &str>) -> FutureProducer {
let mut config = ClientConfig::new();
config
.set("bootstrap.servers", "localhost")
Expand Down
15 changes: 14 additions & 1 deletion tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ use rdkafka::config::ClientConfig;
use rdkafka::consumer::ConsumerContext;
use rdkafka::error::KafkaResult;
use rdkafka::message::ToBytes;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::producer::future_producer::{future_delivery_impl, OwnedDeliveryResult};
use rdkafka::producer::{FutureProducer, FutureRecord, ProducerContext};
use rdkafka::statistics::Statistics;
use rdkafka::TopicPartitionList;

Expand Down Expand Up @@ -74,6 +75,18 @@ impl ClientContext for ProducerTestContext {
fn stats(&self, _: Statistics) {} // Don't print stats
}

impl ProducerContext for ProducerTestContext {
type DeliveryOpaque = Box<futures_channel::oneshot::Sender<OwnedDeliveryResult>>;

fn delivery(
&self,
delivery_result: &rdkafka::message::DeliveryResult<'_>,
delivery_opaque: Self::DeliveryOpaque,
) {
future_delivery_impl(delivery_result, *delivery_opaque);
}
}

pub async fn create_topic(name: &str, partitions: i32) {
let client: AdminClient<_> = consumer_config("create_topic", None).create().unwrap();
client
Expand Down