Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions rdkafka-sys/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,39 @@ pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t;
/// Native rdkafka group result.
pub type RDKafkaGroupResult = bindings::rd_kafka_group_result_t;

/// Native rdkafka list offsets result.
pub type RDKafkaListOffsetsResult = bindings::rd_kafka_ListOffsets_result_t;

/// Native rdkafka list offsets info returned.
pub type RDKafkaListOffsetsResultInfo = bindings::rd_kafka_ListOffsetsResultInfo_t;

/// Native result of the Describe Consumer Group operation.
pub type RDKafkaDescribeConsumerGroupsResult = bindings::rd_kafka_DescribeConsumerGroups_result_t;

/// Native description of a Consumer Group.
pub type RDKafkaConsumerGroupDescription = bindings::rd_kafka_ConsumerGroupDescription_t;

/// Native ACL Operation enum.
pub type RDKafkaAclOperation = bindings::rd_kafka_AclOperation_t;

/// Native Consumer Group State enum.
pub type RDKafkaConsumerGroupState = bindings::rd_kafka_consumer_group_state_t;

/// Native Node data.
pub type RDKafkaNode = bindings::rd_kafka_Node_t;

/// Native Consumer Group Type enum.
pub type RDKafkaConsumerGroupType = bindings::rd_kafka_consumer_group_type_t;

/// Native Member Description data.
pub type RDKafkaMemberDescription = bindings::rd_kafka_MemberDescription_t;

/// Native Member Assignment data.
pub type RDKafkaMemberAssignment = bindings::rd_kafka_MemberAssignment_t;

/// Native information provided by the List Consumer Group Offsets operation.
pub type RDKafkaListConsumerGroupOffsets = bindings::rd_kafka_ListConsumerGroupOffsets_t;

/// Native rdkafka mock cluster.
pub type RDKafkaMockCluster = bindings::rd_kafka_mock_cluster_t;

Expand Down
225 changes: 224 additions & 1 deletion src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,26 @@ use crate::log::{trace, warn};
use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};
use crate::TopicPartitionList;

// Reexport the symbols defined in the auxiliary files
pub use crate::group_description::{
group_description_result_key, AclOperation, ConsumerGroupDescription,
ConsumerGroupDescriptionResult, ConsumerGroupState, MemberAssignment, MemberDescription,
};
pub use crate::list_consumer_group_offsets::{
group_result_key, ConsumerGroup, ConsumerGroupResult, ListConsumerGroupOffsets,
};
pub use crate::list_offsets_result_info::{
list_offsets_result_key, ListOffsetsResult, ListOffsetsResultInfo,
};

//
// ********** ADMIN CLIENT **********
//

/// A client for the Kafka admin API.
///
/// `AdminClient` provides programmatic access to managing a Kafka cluster,
/// notably manipulating topics, partitions, and configuration paramaters.
/// notably manipulating topics, partitions, and configuration parameters.
pub struct AdminClient<C: ClientContext> {
client: Client<C>,
queue: Arc<NativeQueue>,
Expand Down Expand Up @@ -377,6 +389,134 @@ impl<C: ClientContext> AdminClient<C> {
Ok(rx)
}

///
/// List offsets for the specified topic_partitions. This operation enables to find the
/// beginning offset, end offset as well as the offset matching a timestamp in partitions or
/// the offset with max timestamp.
///
/// In order to get the latest or the earliest offset, define the input [TopicPartitionListElem]
/// with [Offset::Beginning] or [Offset::End].
///
pub fn list_offsets(
&self,
offsets: &TopicPartitionList,
opts: &AdminOptions,
) -> impl Future<Output = KafkaResult<Vec<ListOffsetsResult>>> {
match self.list_offsets_inner(offsets, opts) {
Ok(rx) => Either::Left(ListOffsetsFuture { rx }),
Err(err) => Either::Right(future::err(err)),
}
}

fn list_offsets_inner(
&self,
offsets: &TopicPartitionList,
opts: &AdminOptions,
) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
let mut err_buf = ErrBuf::new();
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
unsafe {
rdsys::rd_kafka_ListOffsets(
self.client.native_ptr(),
offsets.ptr(),
native_opts.ptr(),
self.queue.ptr(),
);
}
Ok(rx)
}

///
/// Provides a description on the requested consumer groups.
///
pub fn describe_consumer_groups<'a, I>(
&self,
groups: I,
opts: &AdminOptions,
) -> impl Future<Output = KafkaResult<Vec<ConsumerGroupDescriptionResult>>>
where
I: IntoIterator<Item = &'a &'a str>,
{
match self.describe_consumer_groups_inner(groups, opts) {
Ok(rx) => Either::Left(ConsumerGroupDescriptionFuture { rx }),
Err(err) => Either::Right(future::err(err)),
}
}

fn describe_consumer_groups_inner<'a, I>(
&self,
groups: I,
opts: &AdminOptions,
) -> KafkaResult<oneshot::Receiver<NativeEvent>>
where
I: IntoIterator<Item = &'a &'a str>,
{
let mut native_groups = Vec::new();
for g in groups {
let cstring = CString::new(*g).expect("Failed to create CString");
native_groups.push(cstring.into_raw());
}

let mut err_buf = ErrBuf::new();
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
unsafe {
rdsys::rd_kafka_DescribeConsumerGroups(
self.client.native_ptr(),
native_groups.as_ptr() as *mut _,
native_groups.len(),
native_opts.ptr(),
self.queue.ptr(),
);
}
Ok(rx)
}

///
/// List offset information for the consumer group and (optional) topic partition provided in
/// the request.
///
/// Note that, while the API takes a vector as input, it will only support one group at a time.
pub fn list_consumer_group_offsets<'a, I>(
&self,
groups: I,
opts: &AdminOptions,
) -> impl Future<Output = KafkaResult<Vec<ConsumerGroupResult>>>
where
I: IntoIterator<Item = &'a ListConsumerGroupOffsets<'a>>,
{
match self.list_consumer_group_offsets_inner(groups, opts) {
Ok(rx) => Either::Left(ListConsumerGroupOffsetsFuture { rx }),
Err(err) => Either::Right(future::err(err)),
}
}

fn list_consumer_group_offsets_inner<'a, I>(
&self,
groups: I,
opts: &AdminOptions,
) -> KafkaResult<oneshot::Receiver<NativeEvent>>
where
I: IntoIterator<Item = &'a ListConsumerGroupOffsets<'a>>,
{
let mut native_groups = Vec::new();
for g in groups {
native_groups.push(g.to_native()?);
}

let mut err_buf = ErrBuf::new();
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
unsafe {
rdsys::rd_kafka_ListConsumerGroupOffsets(
self.client.native_ptr(),
native_groups.as_c_array(),
native_groups.len(),
native_opts.ptr(),
self.queue.ptr(),
);
}
Ok(rx)
}

/// Returns the client underlying this admin client.
pub fn inner(&self) -> &Client<C> {
&self.client
Expand Down Expand Up @@ -1341,3 +1481,86 @@ impl Future for AlterConfigsFuture {
Poll::Ready(Ok(out))
}
}

//
// List offsets handling
//

struct ListOffsetsFuture {
rx: oneshot::Receiver<NativeEvent>,
}

impl Future for ListOffsetsFuture {
type Output = KafkaResult<Vec<ListOffsetsResult>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
event.check_error()?;
let res = unsafe { rdsys::rd_kafka_event_ListOffsets_result(event.ptr()) };
if res.is_null() {
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
"list offsets request received response of incorrect type ({})",
typ
))));
}
let info_list =
unsafe { ListOffsetsResultInfo::vec_from_ptr(res as *mut RDKafkaListOffsetsResult) };
Poll::Ready(Ok(info_list))
}
}

//
// Describe Consumer Groups handling
//

struct ConsumerGroupDescriptionFuture {
rx: oneshot::Receiver<NativeEvent>,
}

impl Future for ConsumerGroupDescriptionFuture {
type Output = KafkaResult<Vec<Result<ConsumerGroupDescription, (String, KafkaError)>>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
event.check_error()?;
let res = unsafe { rdsys::rd_kafka_event_DescribeConsumerGroups_result(event.ptr()) };
if res.is_null() {
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
"describe consumer groups request received response of incorrect type ({})",
typ
))));
}
let group_list = unsafe { ConsumerGroupDescription::vec_from_ptr(res) };
Poll::Ready(Ok(group_list))
}
}

//
// List Consumer Group Offsets handling
//
struct ListConsumerGroupOffsetsFuture {
rx: oneshot::Receiver<NativeEvent>,
}

impl Future for ListConsumerGroupOffsetsFuture {
type Output = KafkaResult<Vec<ConsumerGroupResult>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
event.check_error()?;

let res = unsafe { rdsys::rd_kafka_event_ListConsumerGroupOffsets_result(event.ptr()) };
if res.is_null() {
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
"list consumer group offsets request received response of incorrect type ({})",
typ
))));
}

let mut n = 0;
let groups = unsafe { rdsys::rd_kafka_DeleteGroups_result_groups(res, &mut n) };
Poll::Ready(Ok(ConsumerGroup::vec_result_from_ptr(groups, n)))
}
}
Loading