diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index d7eeee036..8495b875e 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -96,6 +96,39 @@ pub type RDKafkaTopicResult = bindings::rd_kafka_topic_result_t; /// Native rdkafka group result. pub type RDKafkaGroupResult = bindings::rd_kafka_group_result_t; +/// Native rdkafka list offsets result. +pub type RDKafkaListOffsetsResult = bindings::rd_kafka_ListOffsets_result_t; + +/// Native rdkafka list offsets info returned. +pub type RDKafkaListOffsetsResultInfo = bindings::rd_kafka_ListOffsetsResultInfo_t; + +/// Native result of the Describe Consumer Group operation. +pub type RDKafkaDescribeConsumerGroupsResult = bindings::rd_kafka_DescribeConsumerGroups_result_t; + +/// Native description of a Consumer Group. +pub type RDKafkaConsumerGroupDescription = bindings::rd_kafka_ConsumerGroupDescription_t; + +/// Native ACL Operation enum. +pub type RDKafkaAclOperation = bindings::rd_kafka_AclOperation_t; + +/// Native Consumer Group State enum. +pub type RDKafkaConsumerGroupState = bindings::rd_kafka_consumer_group_state_t; + +/// Native Node data. +pub type RDKafkaNode = bindings::rd_kafka_Node_t; + +/// Native Consumer Group Type enum. +pub type RDKafkaConsumerGroupType = bindings::rd_kafka_consumer_group_type_t; + +/// Native Member Description data. +pub type RDKafkaMemberDescription = bindings::rd_kafka_MemberDescription_t; + +/// Native Member Assignment data. +pub type RDKafkaMemberAssignment = bindings::rd_kafka_MemberAssignment_t; + +/// Native information provided by the List Consumer Group Offsets operation. +pub type RDKafkaListConsumerGroupOffsets = bindings::rd_kafka_ListConsumerGroupOffsets_t; + /// Native rdkafka mock cluster. pub type RDKafkaMockCluster = bindings::rd_kafka_mock_cluster_t; diff --git a/src/admin.rs b/src/admin.rs index 998053fe0..30ffd7e02 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -28,6 +28,18 @@ use crate::log::{trace, warn}; use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout}; use crate::TopicPartitionList; +// Reexport the symbols defined in the auxiliary files +pub use crate::group_description::{ + group_description_result_key, AclOperation, ConsumerGroupDescription, + ConsumerGroupDescriptionResult, ConsumerGroupState, MemberAssignment, MemberDescription, +}; +pub use crate::list_consumer_group_offsets::{ + group_result_key, ConsumerGroup, ConsumerGroupResult, ListConsumerGroupOffsets, +}; +pub use crate::list_offsets_result_info::{ + list_offsets_result_key, ListOffsetsResult, ListOffsetsResultInfo, +}; + // // ********** ADMIN CLIENT ********** // @@ -35,7 +47,7 @@ use crate::TopicPartitionList; /// A client for the Kafka admin API. /// /// `AdminClient` provides programmatic access to managing a Kafka cluster, -/// notably manipulating topics, partitions, and configuration paramaters. +/// notably manipulating topics, partitions, and configuration parameters. pub struct AdminClient { client: Client, queue: Arc, @@ -377,6 +389,134 @@ impl AdminClient { Ok(rx) } + /// + /// List offsets for the specified topic_partitions. This operation enables to find the + /// beginning offset, end offset as well as the offset matching a timestamp in partitions or + /// the offset with max timestamp. + /// + /// In order to get the latest or the earliest offset, define the input [TopicPartitionListElem] + /// with [Offset::Beginning] or [Offset::End]. + /// + pub fn list_offsets( + &self, + offsets: &TopicPartitionList, + opts: &AdminOptions, + ) -> impl Future>> { + match self.list_offsets_inner(offsets, opts) { + Ok(rx) => Either::Left(ListOffsetsFuture { rx }), + Err(err) => Either::Right(future::err(err)), + } + } + + fn list_offsets_inner( + &self, + offsets: &TopicPartitionList, + opts: &AdminOptions, + ) -> KafkaResult> { + let mut err_buf = ErrBuf::new(); + let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?; + unsafe { + rdsys::rd_kafka_ListOffsets( + self.client.native_ptr(), + offsets.ptr(), + native_opts.ptr(), + self.queue.ptr(), + ); + } + Ok(rx) + } + + /// + /// Provides a description on the requested consumer groups. + /// + pub fn describe_consumer_groups<'a, I>( + &self, + groups: I, + opts: &AdminOptions, + ) -> impl Future>> + where + I: IntoIterator, + { + match self.describe_consumer_groups_inner(groups, opts) { + Ok(rx) => Either::Left(ConsumerGroupDescriptionFuture { rx }), + Err(err) => Either::Right(future::err(err)), + } + } + + fn describe_consumer_groups_inner<'a, I>( + &self, + groups: I, + opts: &AdminOptions, + ) -> KafkaResult> + where + I: IntoIterator, + { + let mut native_groups = Vec::new(); + for g in groups { + let cstring = CString::new(*g).expect("Failed to create CString"); + native_groups.push(cstring.into_raw()); + } + + let mut err_buf = ErrBuf::new(); + let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?; + unsafe { + rdsys::rd_kafka_DescribeConsumerGroups( + self.client.native_ptr(), + native_groups.as_ptr() as *mut _, + native_groups.len(), + native_opts.ptr(), + self.queue.ptr(), + ); + } + Ok(rx) + } + + /// + /// List offset information for the consumer group and (optional) topic partition provided in + /// the request. + /// + /// Note that, while the API takes a vector as input, it will only support one group at a time. + pub fn list_consumer_group_offsets<'a, I>( + &self, + groups: I, + opts: &AdminOptions, + ) -> impl Future>> + where + I: IntoIterator>, + { + match self.list_consumer_group_offsets_inner(groups, opts) { + Ok(rx) => Either::Left(ListConsumerGroupOffsetsFuture { rx }), + Err(err) => Either::Right(future::err(err)), + } + } + + fn list_consumer_group_offsets_inner<'a, I>( + &self, + groups: I, + opts: &AdminOptions, + ) -> KafkaResult> + where + I: IntoIterator>, + { + let mut native_groups = Vec::new(); + for g in groups { + native_groups.push(g.to_native()?); + } + + let mut err_buf = ErrBuf::new(); + let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?; + unsafe { + rdsys::rd_kafka_ListConsumerGroupOffsets( + self.client.native_ptr(), + native_groups.as_c_array(), + native_groups.len(), + native_opts.ptr(), + self.queue.ptr(), + ); + } + Ok(rx) + } + /// Returns the client underlying this admin client. pub fn inner(&self) -> &Client { &self.client @@ -1341,3 +1481,86 @@ impl Future for AlterConfigsFuture { Poll::Ready(Ok(out)) } } + +// +// List offsets handling +// + +struct ListOffsetsFuture { + rx: oneshot::Receiver, +} + +impl Future for ListOffsetsFuture { + type Output = KafkaResult>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?; + event.check_error()?; + let res = unsafe { rdsys::rd_kafka_event_ListOffsets_result(event.ptr()) }; + if res.is_null() { + let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( + "list offsets request received response of incorrect type ({})", + typ + )))); + } + let info_list = + unsafe { ListOffsetsResultInfo::vec_from_ptr(res as *mut RDKafkaListOffsetsResult) }; + Poll::Ready(Ok(info_list)) + } +} + +// +// Describe Consumer Groups handling +// + +struct ConsumerGroupDescriptionFuture { + rx: oneshot::Receiver, +} + +impl Future for ConsumerGroupDescriptionFuture { + type Output = KafkaResult>>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?; + event.check_error()?; + let res = unsafe { rdsys::rd_kafka_event_DescribeConsumerGroups_result(event.ptr()) }; + if res.is_null() { + let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( + "describe consumer groups request received response of incorrect type ({})", + typ + )))); + } + let group_list = unsafe { ConsumerGroupDescription::vec_from_ptr(res) }; + Poll::Ready(Ok(group_list)) + } +} + +// +// List Consumer Group Offsets handling +// +struct ListConsumerGroupOffsetsFuture { + rx: oneshot::Receiver, +} + +impl Future for ListConsumerGroupOffsetsFuture { + type Output = KafkaResult>; + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?; + event.check_error()?; + + let res = unsafe { rdsys::rd_kafka_event_ListConsumerGroupOffsets_result(event.ptr()) }; + if res.is_null() { + let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( + "list consumer group offsets request received response of incorrect type ({})", + typ + )))); + } + + let mut n = 0; + let groups = unsafe { rdsys::rd_kafka_DeleteGroups_result_groups(res, &mut n) }; + Poll::Ready(Ok(ConsumerGroup::vec_result_from_ptr(groups, n))) + } +} diff --git a/src/group_description.rs b/src/group_description.rs new file mode 100644 index 000000000..f8ad138ad --- /dev/null +++ b/src/group_description.rs @@ -0,0 +1,389 @@ +//! Data structures supporting the [AdminClient::describe_consumer_groups] operation. + +use crate::error::{IsError, KafkaError, RDKafkaError}; +use crate::group_description::ConsumerGroupState::Invalid; +use crate::util::cstr_to_owned; +use crate::TopicPartitionList; +use rdkafka_sys as rdsys; +use rdkafka_sys::types::*; + +/// Result of the `AdminClient::describe_consumer_groups` method. +pub type ConsumerGroupDescriptionResult = Result; + +/// Given a `ConsumerGroupDescriptionResult` provides the identifier (the group_id) from either +/// the `Ok` or the `Err` branches. +pub fn group_description_result_key(result: &ConsumerGroupDescriptionResult) -> &str { + match result { + Ok(description) => description.group_id.as_str(), + Err((group_id, _error)) => group_id.as_str(), + } +} + +/// +/// Top level description of a consumer group +/// +#[derive(Debug, PartialEq)] +pub struct ConsumerGroupDescription { + /// The id of the consumer group. + pub group_id: String, + /// If consumer group is simple or not. + pub is_simple_consumer_group: bool, + /// The consumer group partition assignor. + pub partition_assignor: String, + /// authorizedOperations for this group, or null if that information is not known. + pub authorized_operations: Vec, + /// The group state, or UNKNOWN if the state is too new for us to parse. + pub state: ConsumerGroupState, + /// The consumer group coordinator, or None if the coordinator is not known. + pub coordinator: Option, + /// The group type (or the protocol) of this consumer group. It defaults to Classic if not + /// provided by the server. + pub group_type: GroupType, + /// A list of the members of the consumer group. + pub members: Vec, +} + +impl ConsumerGroupDescription { + pub(crate) unsafe fn vec_from_ptr( + ptr: *const RDKafkaDescribeConsumerGroupsResult, + ) -> Vec { + let mut group_count = 0; + + let groups = rdsys::rd_kafka_DescribeConsumerGroups_result_groups(ptr, &mut group_count); + + let mut groups_out = Vec::with_capacity(group_count); + + // Copy the offsets from the C structure + for i in 0..group_count { + let group_ptr = *groups.add(i); + + let description = ConsumerGroupDescription::from_ptr(group_ptr); + groups_out.push(description); + } + groups_out + } + + pub(crate) unsafe fn from_ptr( + ptr: *const RDKafkaConsumerGroupDescription, + ) -> ConsumerGroupDescriptionResult { + let group_id = cstr_to_owned(rdsys::rd_kafka_ConsumerGroupDescription_group_id(ptr)); + + let kafka_error = + RDKafkaError::from_ptr(rdsys::rd_kafka_ConsumerGroupDescription_error(ptr) as *mut _); + if kafka_error.is_error() { + Err((group_id.clone(),KafkaError::AdminOp(kafka_error.code()))) + } else { + let is_simple_consumer_group: bool = + rdsys::rd_kafka_ConsumerGroupDescription_is_simple_consumer_group(ptr) != 0; + + let partition_assignor = cstr_to_owned( + rdsys::rd_kafka_ConsumerGroupDescription_partition_assignor(ptr), + ); + + let mut acl_count = 0; + let acl_list = + rdsys::rd_kafka_ConsumerGroupDescription_authorized_operations(ptr, &mut acl_count); + let mut authorized_operations = Vec::with_capacity(acl_count); + for i in 0..acl_count { + let native_acl = *acl_list.add(i); + authorized_operations.push(AclOperation::from_native(native_acl)); + } + let state = + ConsumerGroupState::from_native(rdsys::rd_kafka_ConsumerGroupDescription_state(ptr)); + + let coordinator_ptr = rdsys::rd_kafka_ConsumerGroupDescription_coordinator(ptr); + let coordinator = if coordinator_ptr.is_null() { + None + } else { + Some(Node::from_ptr(coordinator_ptr)) + }; + + let group_type = + GroupType::from_native(rdsys::rd_kafka_ConsumerGroupDescription_type(ptr)); + + // rd_kafka_ConsumerGroupDescription_member_count + let mut members = Vec::new(); + let member_count = rdsys::rd_kafka_ConsumerGroupDescription_member_count(ptr); + for i in 0..member_count { + let member_ptr = rdsys::rd_kafka_ConsumerGroupDescription_member(ptr, i); + let member = MemberDescription::from_ptr(member_ptr); + members.push(member); + } + Ok(ConsumerGroupDescription { + group_id, + is_simple_consumer_group, + partition_assignor, + authorized_operations, + state, + coordinator, + group_type, + members, + }) + } + } +} + +/// +/// Represents an operation which an ACL grants or denies permission to perform. Some operations +/// imply other operations: +/// +/// - `ALLOW ALL` implies `ALLOW` everything +/// - `DENY ALL` implies `DENY` everything +/// - `ALLOW READ` implies `ALLOW DESCRIBE` +/// - `ALLOW WRITE` implies `ALLOW DESCRIBE` +/// - `ALLOW DELETE` implies `ALLOW DESCRIBE` +/// - `ALLOW ALTER` implies `ALLOW DESCRIBE` +/// - `ALLOW ALTER_CONFIGS` implies `ALLOW DESCRIBE_CONFIGS` +#[derive(Debug, PartialEq)] +pub enum AclOperation { + /// Represents any AclOperation which this client cannot understand, + /// perhaps because this client is too old. + Unknown = 0, + /// In a filter, matches any AclOperation. + Any = 1, + /// `ALL` operations. + All = 2, + /// `READ` operation. + Read = 3, + /// `WRITE` operation. + Write = 4, + /// `CREATE` operation. + Create = 5, + /// `DELETE` operation. + Delete = 6, + /// `ALTE``` operation. + Alter = 7, + /// `DESCRIBE` operation. + Describe = 8, + /// `CLUSTER_ACTION` operation. + ClusterAction = 9, + /// `DESCRIBE_CONFIGS` operation. + DescribeConfigs = 10, + /// ALTER_CONFIGS` operation. + AlterConfigs = 11, + /// IDEMPOTENT_WRITE operation. + IdempotentWrite = 12, + /// Invalid enum value. + Invalid = 13, +} + +impl AclOperation { + pub(crate) fn from_native(native_value: RDKafkaAclOperation) -> AclOperation { + match native_value { + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_UNKNOWN => AclOperation::Unknown, + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ANY => AclOperation::Any, + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALL => AclOperation::All, + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_READ => AclOperation::Read, + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_WRITE => AclOperation::Write, + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_CREATE => AclOperation::Create, + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DELETE => AclOperation::Delete, + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALTER => AclOperation::Alter, + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DESCRIBE => AclOperation::Describe, + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_CLUSTER_ACTION => { + AclOperation::ClusterAction + } + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_DESCRIBE_CONFIGS => { + AclOperation::DescribeConfigs + } + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_ALTER_CONFIGS => { + AclOperation::AlterConfigs + } + RDKafkaAclOperation::RD_KAFKA_ACL_OPERATION_IDEMPOTENT_WRITE => { + AclOperation::IdempotentWrite + } + _ => AclOperation::Invalid, + } + } +} + +/// +/// The consumer group state. +/// +#[derive(Debug, PartialEq)] +pub enum ConsumerGroupState { + /// Any state this client cannot understand. + Unknown = 0, + ///The group is preparing to rebalance. A rebalance is triggered whenever a consumer + /// joins or leaves the group or when a consumer fails. + PreparingRebalance = 1, + /// The group is completing the rebalance process, assigning partitions to consumers. + CompletingRebalance = 2, + /// The group is stable with all consumers active and partitions assigned accordingly. + Stable = 3, + /// The group has been marked for deletion, or it does not exist. + Dead = 4, + /// All consumers in the group are inactive, and no partitions are assigned. + Empty = 5, + /// Invalid enum value. + Invalid = 6, +} + +impl ConsumerGroupState { + pub(crate) fn from_native( + native_value: RDKafkaConsumerGroupState, + ) -> ConsumerGroupState { + match native_value { + RDKafkaConsumerGroupState::RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN => ConsumerGroupState::Unknown, + RDKafkaConsumerGroupState::RD_KAFKA_CONSUMER_GROUP_STATE_PREPARING_REBALANCE => ConsumerGroupState::PreparingRebalance, + RDKafkaConsumerGroupState::RD_KAFKA_CONSUMER_GROUP_STATE_COMPLETING_REBALANCE => ConsumerGroupState::CompletingRebalance, + RDKafkaConsumerGroupState::RD_KAFKA_CONSUMER_GROUP_STATE_STABLE => ConsumerGroupState::Stable, + RDKafkaConsumerGroupState::RD_KAFKA_CONSUMER_GROUP_STATE_DEAD => ConsumerGroupState::Dead, + RDKafkaConsumerGroupState::RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY => ConsumerGroupState::Empty, + _ => Invalid + + } + } +} + +/// +/// Information about a Kafka node +/// +#[derive(Debug, PartialEq)] +pub struct Node { + /// The node id of this node + pub id: i32, + /// The host name for this node + pub host: String, + /// The port for this node + pub port: u16, + /// The rack for this node (if defined) + pub rack: Option, +} + +impl Node { + pub(crate) unsafe fn from_ptr(ptr: *const RDKafkaNode) -> Node { + let id = rdsys::rd_kafka_Node_id(ptr); + let host = cstr_to_owned(rdsys::rd_kafka_Node_host(ptr)); + let port = rdsys::rd_kafka_Node_port(ptr); + let rack_ptr = rdsys::rd_kafka_Node_rack(ptr); + let rack = if rack_ptr.is_null() { + None + } else { + Some(cstr_to_owned(rack_ptr)) + }; + Node { + id, + host, + port, + rack, + } + } +} + +/// +/// The type of the consumer group. +/// +#[derive(Debug, PartialEq)] +pub enum GroupType { + /// Unknown or unsupported group type. + Unknown = 0, + /// The standard consumer group. + Consumer = 1, + /// Legacy consumer group. + Classic = 2, + /// Invalid enum value. + Invalid = 3, +} + +impl GroupType { + pub(crate) fn from_native(native: RDKafkaConsumerGroupType) -> GroupType { + match native { + RDKafkaConsumerGroupType::RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN => { + GroupType::Unknown + } + RDKafkaConsumerGroupType::RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER => { + GroupType::Consumer + } + RDKafkaConsumerGroupType::RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC => { + GroupType::Classic + } + _ => GroupType::Invalid, + } + } +} + +/// +/// A detailed description of a single group member in the cluster. +/// +#[derive(Debug, PartialEq)] +pub struct MemberDescription { + /// The client id of the group member. + /// This is the client identifier string assigned by the Kafka client. + /// It identifies the client application instance that is a member of the consumer group. + /// The client_id is typically set in the consumer configuration and is used to + /// differentiate clients in logs, metrics, and management tools. + pub client_id: String, + /// This is a unique identifier for a consumer instance within a consumer group, + /// introduced to support static membership in Kafka. When set, it identifies the + /// consumer instance persistently across restarts, enabling more stable group + /// membership and reducing rebalances. If null, the member is considered a dynamic + /// member with a broker-generated member ID. + pub group_instance_id: Option, + /// This identifies the specific consumer member of the group and is generated by the + /// broker upon joining the group. It is the broker-assigned ID used internally to track + /// the group member and its state. + pub consumer_id: String, + /// The host where the group member is running. + pub host: String, + /// The assignment of the group member. + pub assignment: Option, + /// The target assignment of the member. + pub target_assignment: Option, +} + +impl MemberDescription { + pub(crate) unsafe fn from_ptr( + ptr: *const RDKafkaMemberDescription, + ) -> MemberDescription { + let client_id = cstr_to_owned(rdsys::rd_kafka_MemberDescription_client_id(ptr)); + let group_instance_id_ptr = rdsys::rd_kafka_MemberDescription_group_instance_id(ptr); + let group_instance_id = if group_instance_id_ptr.is_null() { + None + } else { + Some(cstr_to_owned(group_instance_id_ptr)) + }; + let consumer_id = cstr_to_owned(rdsys::rd_kafka_MemberDescription_consumer_id(ptr)); + let host = cstr_to_owned(rdsys::rd_kafka_MemberDescription_host(ptr)); + let assignment_ptr = rdsys::rd_kafka_MemberDescription_assignment(ptr); + let assignment = if assignment_ptr.is_null() { + None + } else { + Some(MemberAssignment::from_ptr(assignment_ptr)) + }; + let target_assignment_ptr = rdsys::rd_kafka_MemberDescription_target_assignment(ptr); + let target_assignment = if target_assignment_ptr.is_null() { + None + } else { + Some(MemberAssignment::from_ptr(target_assignment_ptr)) + }; + MemberDescription { + client_id, + group_instance_id, + consumer_id, + host, + assignment, + target_assignment, + } + } +} + +/// +/// A description of the assignments of a specific group member. +/// +#[derive(Debug, PartialEq)] +pub struct MemberAssignment { + /// The topic partitions assigned to a group member. + pub partitions: TopicPartitionList, +} + +impl MemberAssignment { + pub(crate) unsafe fn from_ptr( + ptr: *const RDKafkaMemberAssignment, + ) -> MemberAssignment { + let topic_partition_list_ptr = rdsys::rd_kafka_MemberAssignment_partitions(ptr); + let topic_partition_list_cloned_ptr = + rdsys::rd_kafka_topic_partition_list_copy(topic_partition_list_ptr); + let partitions = TopicPartitionList::from_ptr(topic_partition_list_cloned_ptr); + MemberAssignment { partitions } + } +} diff --git a/src/lib.rs b/src/lib.rs index 46709c5a7..c3fd9d1a8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -287,6 +287,9 @@ pub mod producer; pub mod statistics; pub mod topic_partition_list; pub mod util; +mod list_offsets_result_info; +mod group_description; +mod list_consumer_group_offsets; // Re-exports. pub use crate::client::ClientContext; diff --git a/src/list_consumer_group_offsets.rs b/src/list_consumer_group_offsets.rs new file mode 100644 index 000000000..34571e559 --- /dev/null +++ b/src/list_consumer_group_offsets.rs @@ -0,0 +1,121 @@ +//! Types required for the list_consumer_group_offsets operations. + +use std::ffi::CString; +use crate::util::{cstr_to_owned, KafkaDrop, NativePtr}; +use rdkafka_sys as rdsys; +use rdkafka_sys::types::*; +use crate::error::{IsError, KafkaError, KafkaResult}; +use crate::TopicPartitionList; + + +type NativeListConsumerGroupOffsets = NativePtr; + +unsafe impl KafkaDrop for RDKafkaListConsumerGroupOffsets { + const TYPE: &'static str = "list consumer group offsets"; + const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_ListConsumerGroupOffsets_destroy; + +} + +/// +/// Specification of consumer group offsets to list using +/// `AdminClient.list_consumer_group_offsets`. +/// +/// If the partitions is set to `None`, the operation will provide information about all +/// topics assigned to the consumer group. +/// +pub struct ListConsumerGroupOffsets<'a> { + /// The group_id of the consumer group requested. + pub group_id: &'a str, + /// The topic and partitions, assigned to the group, that we request information for. + pub partitions: Option +} + + +impl<'a> ListConsumerGroupOffsets<'a> { + /// Creates a new `ListConsumerGroupOffsets` + pub fn new(group_id: &'a str, partitions: TopicPartitionList) -> ListConsumerGroupOffsets<'a> { + ListConsumerGroupOffsets { + group_id, + partitions: Some(partitions) + } + } + + /// Creates a bew `ListConsumerGroupOffsets` providing only the group_id. + /// This will retrieve all the topics associated to the group. + pub fn from_group(group_id: &'a str) -> ListConsumerGroupOffsets<'a> { + ListConsumerGroupOffsets { + group_id, + partitions: None + } + } + + pub(crate) fn to_native(&self) -> KafkaResult { + let group_id = CString::new(self.group_id)?; + + let list_consumer_group_offset = unsafe { + NativeListConsumerGroupOffsets::from_ptr(rdsys::rd_kafka_ListConsumerGroupOffsets_new( + group_id.as_ptr(), + if let Some(partition) = &self.partitions { partition.ptr() } else { std::ptr::null() }, + )) + }; + list_consumer_group_offset.ok_or(KafkaError::AdminOpCreation("ListConsumer Group Offset creation failed".to_string())) + } +} + +/// Each of the items returned by the `AdminClient.list_consumer_group_offsets` +/// The error branch contains the group_id as first element of the tuple. +pub type ConsumerGroupResult = Result; + +/// Obtain the identification (the group_id) extracted from either the `Ok` or the `Err` branches. +pub fn group_result_key(result: &ConsumerGroupResult) -> &str { + match result { + Ok(description) => description.group_id.as_str(), + Err((group_id, _error)) => group_id.as_str(), + } +} + +/// +/// Information retrieved for the requested group on success. +/// +#[derive(Debug, PartialEq)] +pub struct ConsumerGroup { + /// Identifies the group + pub group_id: String, + /// the partitions assigned to the group that the operation provided information for. + pub topic_partitions: TopicPartitionList, +} + + +impl ConsumerGroup { + pub(crate) fn vec_result_from_ptr(groups: *mut *const RDKafkaGroupResult, + count: usize) -> Vec { + let mut out = Vec::with_capacity(count); + for i in 0..count { + let group = unsafe { *groups.add(i) }; + let r = Self::from_ptr(group); + out.push(r); + } + out + } + + fn from_ptr(group: *const RDKafkaGroupResult) -> Result { + let name = unsafe { cstr_to_owned(rdsys::rd_kafka_group_result_name(group)) }; + let err = unsafe { + let err = rdsys::rd_kafka_group_result_error(group); + rdsys::rd_kafka_error_code(err) + }; + let r = if err.is_error() { + Err((name.clone(), KafkaError::AdminOp(err.into()))) + } else { + let partitions_native = unsafe { rdsys::rd_kafka_group_result_partitions(group) }; + let cloned_partition_native = unsafe { rdsys::rd_kafka_topic_partition_list_copy(partitions_native) }; + let partition = unsafe { TopicPartitionList::from_ptr(cloned_partition_native) }; + let group_tp = ConsumerGroup { + group_id: name, + topic_partitions: partition, + }; + Ok(group_tp) + }; + r + } +} diff --git a/src/list_offsets_result_info.rs b/src/list_offsets_result_info.rs new file mode 100644 index 000000000..c9bec1571 --- /dev/null +++ b/src/list_offsets_result_info.rs @@ -0,0 +1,91 @@ +//! +//! Data structures supporting the [AdminClient::list_offsets] operation +//! +use crate::error::{IsError, KafkaError}; +use crate::Offset; +use rdkafka_sys as rdsys; +use rdkafka_sys::types::*; +use std::ffi::CStr; +use std::slice; + +/// Each of the items returned by [AdminClient::list_offsets] operation. +/// The error branch contains the topic name and partition to identify the source +/// of the error. +pub type ListOffsetsResult = Result; + +/// Obtain the identification (the topic_id and partition) extracted from either +/// the `Ok` or the `Err` branches. +pub fn list_offsets_result_key(result: &ListOffsetsResult) -> (&str, i32) { + match result { + Ok(tp) => (tp.topic.as_str(), tp.partition), + Err((topic, partition, _error)) => (topic.as_str(), *partition), + } +} + + +/// +/// The information returned on success for each topic and instance requested through the +/// [AdminClient::list_offsets] operation. +/// +#[derive(Debug, PartialEq)] +pub struct ListOffsetsResultInfo { + /// The name of the topic + pub topic: String, + /// The partition of the topic. + pub partition: i32, + /// The requested offset from the topic. + pub offset: Offset, + /// Additional information in raw format. + pub metadata: Option>, + /// The timestamp in milliseconds corresponding to the offset. Not available (-1) + /// when querying for the earliest or the latest offsets. + pub timestamp: i64, +} + +impl ListOffsetsResultInfo { + pub(crate) unsafe fn vec_from_ptr(ptr: *mut RDKafkaListOffsetsResult) -> Vec { + let mut result_count: usize = 0; + let info_list_ptr = rdsys::rd_kafka_ListOffsets_result_infos(ptr, &mut result_count); + + let mut info_list = Vec::new(); + + // Copy the offsets from the C structure + for i in 0..result_count as isize { + let info_ptr = *info_list_ptr.offset(i); + let tp = rdsys::rd_kafka_ListOffsetsResultInfo_topic_partition(info_ptr); + + let topic = CStr::from_ptr((*tp).topic) + .to_str() + .expect("Topic name is not UTF-8").to_string(); + + let partition = (*tp).partition; + + let info = if (*tp).err.is_error() { + Err((topic.clone(), partition, KafkaError::AdminOp((*tp).err.into()))) + } else { + let offset = Offset::from_raw((*tp).offset); + + let metadata = if (*tp).metadata.is_null() { + None + } else { + Some(slice::from_raw_parts::( + (*tp).metadata as *const u8, + (*tp).metadata_size).to_vec()) + }; + + let timestamp = rdsys::rd_kafka_ListOffsetsResultInfo_timestamp(info_ptr); + + Ok(ListOffsetsResultInfo { + topic, + partition, + offset, + metadata, + timestamp, + }) + }; + info_list.push(info); + } + + info_list + } +} \ No newline at end of file diff --git a/tests/test_admin.rs b/tests/test_admin.rs index 87f258a9e..44e3d0bc6 100644 --- a/tests/test_admin.rs +++ b/tests/test_admin.rs @@ -10,11 +10,15 @@ use rdkafka::admin::{ }; use rdkafka::client::DefaultClientContext; use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, DefaultConsumerContext}; -use rdkafka::error::{KafkaError, RDKafkaErrorCode}; +use rdkafka::error::{KafkaError, KafkaResult, RDKafkaErrorCode}; +use rdkafka::admin::{ + group_description_result_key, ConsumerGroupState, +}; use rdkafka::metadata::Metadata; use rdkafka::producer::{FutureProducer, FutureRecord, Producer}; use rdkafka::{ClientConfig, Offset, TopicPartitionList}; - +use rdkafka::admin::{group_result_key, ListConsumerGroupOffsets}; +use rdkafka::admin::list_offsets_result_key; use crate::utils::*; mod utils; @@ -32,8 +36,27 @@ fn create_admin_client() -> AdminClient { .expect("admin client creation failed") } +async fn create_topics(topic_name_list: &[&str]) -> KafkaResult<()> { + let admin_client: AdminClient = create_admin_client(); + + let topic_list: Vec<_> = topic_name_list + .iter() + .map(|name| { + NewTopic::new(name, 1, TopicReplication::Fixed(1)).set("max.message.bytes", "1234") + }) + .collect(); + + match admin_client + .create_topics(&topic_list, &AdminOptions::default()) + .await + { + Ok(_) => Ok(()), + Err(e) => Err(e), + } +} + async fn create_consumer_group(consumer_group_name: &str) { - let admin_client = create_admin_client(); + let admin_client: AdminClient = create_admin_client(); let topic_name = &rand_test_topic(consumer_group_name); let consumer: BaseConsumer = create_config() .set("group.id", consumer_group_name) @@ -628,3 +651,744 @@ async fn test_event_errors() { Err(KafkaError::AdminOp(RDKafkaErrorCode::OperationTimedOut)) ); } + +// Test the list offsets request +#[tokio::test] +async fn test_list_offsets() { + // create the admin client. + let admin_client = create_admin_client(); + // create a producer + let producer = create_config().create::>().unwrap(); + let timeout = Some(Duration::from_secs(1)); + + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let name1 = rand_test_topic("test_list_offsets"); + let name2 = rand_test_topic("test_list_offsets"); + + let make_record1 = || FutureRecord::::to(&name1).payload("data"); + let make_record2 = || FutureRecord::::to(&name2).payload("data"); + + // Create a couple of topics + create_topics(&vec![name1.as_str(), name2.as_str()]) + .await + .expect("Failed creating topics"); + + // Produce five messages to the topic. + for _ in 0..5 { + producer.send(make_record1(), timeout).await.unwrap(); + producer.send(make_record2(), timeout).await.unwrap(); + } + + let mut tpl = TopicPartitionList::with_capacity(2); + tpl.add_partition_offset(&name1, 0, Offset::End) + .expect("Adding topic partition element list failed"); + tpl.add_partition_offset(&name2, 0, Offset::End) + .expect("Adding topic partition element list failed"); + + let result = admin_client + .list_offsets(&tpl, &opts) + .await + .expect("list offsets failed"); + eprintln!("result={:?}", result); + + assert!(result + .iter() + .any(|topic| list_offsets_result_key(topic) == (name1.as_str(), 0))); + let topic_info1 = result + .iter() + .find(|topic| list_offsets_result_key(topic) == (name1.as_str(), 0)) + .unwrap(); + match &topic_info1 { + Ok(topic_partition) => assert_eq!(topic_partition.offset, Offset::Offset(5)), + Err(_) => assert!(false, "List offsets returned error"), + } + + assert!(result + .iter() + .any(|topic| list_offsets_result_key(topic) == (name2.as_str(), 0))); + let topic_info2 = result + .iter() + .find(|topic| list_offsets_result_key(topic) == (name2.as_str(), 0)) + .unwrap(); + match &topic_info2 { + Ok(topic_partition) => assert_eq!(topic_partition.offset, Offset::Offset(5)), + Err(_) => assert!(false, "List offsets returned error"), + } +} + +// Test the list offsets request. Case where one of the requested topics does +// not exist. +#[tokio::test] +async fn test_list_offsets_one_does_not_exist() { + // create the admin client. + let admin_client = create_admin_client(); + // create a producer + let producer = create_config().create::>().unwrap(); + let timeout = Some(Duration::from_secs(1)); + + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let name1 = rand_test_topic("test_list_offsets_one_does_not_exist"); + let name2 = rand_test_topic("test_list_offsets_one_does_not_exist"); + + let make_record1 = || FutureRecord::::to(&name1).payload("data"); + + // Create a couple of topics + create_topics(&vec![name1.as_str()]) + .await + .expect("Failed creating topics"); + + // Produce five messages to the topic. + for _ in 0..5 { + producer.send(make_record1(), timeout).await.unwrap(); + } + + let mut tpl = TopicPartitionList::with_capacity(2); + tpl.add_partition_offset(&name1, 0, Offset::End) + .expect("Adding topic partition element list failed"); + tpl.add_partition_offset(&name2, 0, Offset::End) + .expect("Adding topic partition element list failed"); + + let result = admin_client + .list_offsets(&tpl, &opts) + .await + .expect("list offsets failed"); + eprintln!("result={:?}", result); + + assert!(result + .iter() + .any(|topic| list_offsets_result_key(topic) == (name1.as_str(), 0))); + let topic_info1 = result + .iter() + .find(|topic| list_offsets_result_key(topic) == (name1.as_str(), 0)) + .unwrap(); + match &topic_info1 { + Ok(topic_partition) => assert_eq!(topic_partition.offset, Offset::Offset(5)), + Err(_) => assert!(false, "List offsets returned error"), + } + + assert!(result + .iter() + .any(|topic| list_offsets_result_key(topic) == (name2.as_str(), 0))); + let topic_info2 = result + .iter() + .find(|topic| list_offsets_result_key(topic) == (name2.as_str(), 0)) + .unwrap(); + match &topic_info2 { + Ok(_) => assert!(false, "List offsets was expected to return error"), + Err(_) => {} + } +} + +// Test the list offsets request +// Check the case where the list provided is empty. +#[tokio::test] +async fn test_list_offsets_empty_list() { + // create the admin client. + let admin_client = create_admin_client(); + // create a producer + let producer = create_config().create::>().unwrap(); + let timeout = Some(Duration::from_secs(1)); + + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let name1 = rand_test_topic("test_list_offsets_empty_list"); + + let make_record1 = || FutureRecord::::to(&name1).payload("data"); + + // Create a couple of topics + create_topics(&vec![name1.as_str()]) + .await + .expect("Failed creating topics"); + + // Produce five messages to the topic. + for _ in 0..5 { + producer.send(make_record1(), timeout).await.unwrap(); + } + + let tpl = TopicPartitionList::new(); + + let result = admin_client + .list_offsets(&tpl, &opts) + .await + .expect("list offsets failed"); + eprintln!("result={:?}", result); + + assert_eq!(result.len(), 0); +} + +// Test the the describe consumer groups request +#[tokio::test] +async fn test_describe_groups() { + // create the admin client. + let admin_client = create_admin_client(); + // create a producer + let producer = create_config().create::>().unwrap(); + let timeout = Some(Duration::from_secs(1)); + + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let group_name1 = rand_test_group(); + + let topic1 = rand_test_topic("test_describe_groups"); + let topic2 = rand_test_topic("test_describe_groups"); + let topic_list = vec![topic1.as_str(), topic2.as_str()]; + let group_list = vec![group_name1.as_str()]; + + let make_record1 = || FutureRecord::::to(&topic1).payload("data"); + let make_record2 = || FutureRecord::::to(&topic2).payload("data"); + + // Create the topics + create_topics(&topic_list) + .await + .expect("Failed creating topics"); + + // Produce five messages to the topic. + for _ in 0..5 { + producer.send(make_record1(), timeout).await.unwrap(); + producer.send(make_record2(), timeout).await.unwrap(); + } + + { + let consumer: BaseConsumer = create_config() + .set("group.id", group_name1.as_str()) + .set("auto.offset.reset", "earliest") + .set("auto.commit.enable", "true") + .create() + .expect("create consumer failed"); + + consumer + .subscribe(&topic_list) + .expect("subscribe topic failed"); + + // Consume some messages + for message in consumer.iter().take(3) { + match message { + Ok(_) => (), + Err(e) => panic!("Error receiving message: {:?}", e), + } + } + + // Get the description. + let result = admin_client + .describe_consumer_groups(&group_list, &opts) + .await + .expect("describe_consumer_groups failed"); + + eprintln!("result: {:?}", result); + assert!(result + .iter() + .any(|description| group_description_result_key(description) == group_name1)); + let group_description = result + .iter() + .find(|description| group_description_result_key(description) == group_name1) + .expect("Did not find the group we requested"); + match group_description { + Ok(group_description) => assert_eq!(group_description.members.len(), 1), + Err(_) => assert!(false, "Failed describe consumer group"), + } + } + + // Get the description again when the consumer has been recycled. + + let result = admin_client + .describe_consumer_groups(&group_list, &opts) + .await + .expect("describe_consumer_groups failed"); + + eprintln!("result: {:?}", result); + assert!(result + .iter() + .any(|description| group_description_result_key(description) == group_name1)); + let group_description = result + .iter() + .find(|description| group_description_result_key(description) == group_name1) + .expect("Did not find the group we requested"); + + match group_description { + Ok(group_description) => assert_eq!(group_description.members.len(), 0), + Err(_) => assert!(false, "Failed describe consumer group"), + } +} + +// Test the describe_groups operation. +// Request more than one group. +#[tokio::test] +async fn test_describe_groups_more_than_one_group() { + // create the admin client. + let admin_client = create_admin_client(); + + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let group_name1 = rand_test_group(); + let group_name2 = rand_test_group(); + + create_consumer_group(&group_name1).await; + create_consumer_group(&group_name2).await; + + let group_list = vec![group_name1.as_str(), group_name2.as_str()]; + + let result = admin_client + .describe_consumer_groups(&group_list, &opts) + .await + .expect("describe_consumer_groups failed"); + eprintln!("result: {:?}", result); + assert!(result + .iter() + .any(|description| group_description_result_key(description) == group_name1)); + assert!(result + .iter() + .any(|description| group_description_result_key(description) == group_name2)); +} + +// Test the describe_consumer_groups operation. +// Request more than one group, but one of them does not exist. +#[tokio::test] +async fn test_describe_groups_one_does_not_exist() { + // create the admin client. + let admin_client = create_admin_client(); + + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let group_name1 = rand_test_group(); + let group_name2 = rand_test_group(); + + create_consumer_group(&group_name1).await; + + let group_list = vec![group_name1.as_str(), group_name2.as_str()]; + + let result = admin_client + .describe_consumer_groups(&group_list, &opts) + .await + .expect("describe_consumer_groups failed"); + eprintln!("result: {:?}", result); + assert!(result + .iter() + .any(|description| if let Ok(description) = description { + description.group_id == group_name1 && description.state == ConsumerGroupState::Empty + } else { + false + })); + assert!(result + .iter() + .any(|description| if let Ok(description) = description { + description.group_id == group_name2 && description.state == ConsumerGroupState::Dead + } else { + false + })); +} + +// Test the describe_consumer_groups operation. +// The request is empty. +#[tokio::test] +async fn test_describe_groups_with_empty_array() { + // create the admin client. + let admin_client = create_admin_client(); + + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let group_name1 = rand_test_group(); + + create_consumer_group(&group_name1).await; + + let group_list = vec![]; + + let result = admin_client + .describe_consumer_groups(&group_list, &opts) + .await; + assert!(result.is_err()) +} + + +// Test the list_consumer_group_offsets operation +#[tokio::test] +async fn test_list_consumer_group_offsets() { + // create the admin client. + let admin_client = create_admin_client(); + // create a producer + let producer = create_config().create::>().unwrap(); + let timeout = Some(Duration::from_secs(1)); + + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let group_name1 = rand_test_group(); + + let topic1 = rand_test_topic("test_describe_groups"); + let topic2 = rand_test_topic("test_describe_groups"); + let topic_list = vec![topic1.as_str(), topic2.as_str()]; + + let make_record1 = || FutureRecord::::to(&topic1).payload("data"); + let make_record2 = || FutureRecord::::to(&topic2).payload("data"); + + let group_list = vec![ListConsumerGroupOffsets::from_group(group_name1.as_str())]; + + // Create the topics + create_topics(&topic_list) + .await + .expect("Failed creating topics"); + + // Produce five messages to the topic. + for _ in 0..5 { + producer.send(make_record1(), timeout).await.unwrap(); + producer.send(make_record2(), timeout).await.unwrap(); + } + + { + let consumer: BaseConsumer = create_config() + .set("group.id", group_name1.as_str()) + .set("auto.offset.reset", "earliest") + .set("auto.commit.enable", "true") + .create() + .expect("create consumer failed"); + + consumer + .subscribe(&topic_list) + .expect("subscribe topic failed"); + + // Consume some messages + for message in consumer.iter().take(6) { + match message { + Ok(_) => (), + Err(e) => panic!("Error receiving message: {:?}", e), + } + } + consumer.commit_consumer_state(CommitMode::Sync).unwrap(); + + // Get the offsets. + let result = admin_client + .list_consumer_group_offsets(&group_list, &opts) + .await + .expect("describe_consumer_groups failed"); + + eprintln!("result: {:?}", result); + assert!(result + .iter() + .any(|group_result| group_result_key(group_result) == group_name1)); + let group_result = result + .iter() + .find(|group_result| group_result_key(group_result) == group_name1) + .expect("Did not find the group we requested"); + match group_result { + Ok(group) => { + assert_eq!(group.group_id, group_name1); + assert!(group.topic_partitions.elements().iter().any(|tp| tp.topic() == topic1)); + assert!(group.topic_partitions.elements().iter().any(|tp| tp.topic() == topic2)); + } + Err(_) => assert!(false, "Failed describe consumer group"), + } + + } + + // Get the offsets. + let result = admin_client + .list_consumer_group_offsets(&group_list, &opts) + .await + .expect("describe_consumer_groups failed"); + + eprintln!("result: {:?}", result); + assert!(result + .iter() + .any(|group_result| group_result_key(group_result) == group_name1)); + let group_result = result + .iter() + .find(|group_result| group_result_key(group_result) == group_name1) + .expect("Did not find the group we requested"); + match group_result { + Ok(group) => { + assert_eq!(group.group_id, group_name1); + assert!(group.topic_partitions.elements().iter().any(|tp| tp.topic() == topic1)); + assert!(group.topic_partitions.elements().iter().any(|tp| tp.topic() == topic2)); + } + Err(_) => assert!(false, "Failed describe consumer group"), + } + +} + +// Test the list_consumer_group_offsets operation +// The consumer does not have any topics assigned. +#[tokio::test] +async fn test_list_consumer_group_offsets_no_topics() { + // create the admin client. + let admin_client = create_admin_client(); + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let group_name1 = rand_test_group(); + + + let group_list = vec![ListConsumerGroupOffsets::from_group(group_name1.as_str())]; + + // Get the offsets. + let result = admin_client + .list_consumer_group_offsets(&group_list, &opts) + .await + .expect("describe_consumer_groups failed"); + + eprintln!("result: {:?}", result); + assert!(result + .iter() + .any(|group_result| group_result_key(group_result) == group_name1)); + let group_result = result + .iter() + .find(|group_result| group_result_key(group_result) == group_name1) + .expect("Did not find the group we requested"); + match group_result { + Ok(group) => { + assert_eq!(group.group_id, group_name1); + assert_eq!(group.topic_partitions.elements().len(), 0); + } + Err(_) => assert!(false, "Failed describe consumer group"), + } +} + +// Test the list_consumer_group_offsets operation +// Request information for more than one group. +#[tokio::test] +async fn test_list_consumer_group_offsets_more_than_one_group() { + // create the admin client. + let admin_client = create_admin_client(); + // create a producer + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let group_name1 = rand_test_group(); + let group_name2 = rand_test_group(); + + + let group_list = vec![ + ListConsumerGroupOffsets::from_group(group_name1.as_str()), + ListConsumerGroupOffsets::from_group(group_name2.as_str()), + ]; + + // Get the offsets. + let result = admin_client + .list_consumer_group_offsets(&group_list, &opts) + .await; + + // Fails because the API only supports one group at a time. + assert!(result.is_err()); + +} + +// Test the list_consumer_group_offsets operation +// Request information on a specific topic and partition. +#[tokio::test] +async fn test_list_consumer_group_offsets_one_existing_partition() { + // create the admin client. + let admin_client = create_admin_client(); + // create a producer + let producer = create_config().create::>().unwrap(); + let timeout = Some(Duration::from_secs(1)); + + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let group_name1 = rand_test_group(); + + let topic1 = rand_test_topic("test_describe_groups"); + let topic2 = rand_test_topic("test_describe_groups"); + let topic_list = vec![topic1.as_str(), topic2.as_str()]; + + let make_record1 = || FutureRecord::::to(&topic1).payload("data"); + let make_record2 = || FutureRecord::::to(&topic2).payload("data"); + + let mut request_tpl = TopicPartitionList::new(); + request_tpl.add_partition(topic1.as_str(), 0); + + let group_list = vec![ListConsumerGroupOffsets::new(group_name1.as_str(), request_tpl)]; + + // Create the topics + create_topics(&topic_list) + .await + .expect("Failed creating topics"); + + // Produce five messages to the topic. + for _ in 0..5 { + producer.send(make_record1(), timeout).await.unwrap(); + producer.send(make_record2(), timeout).await.unwrap(); + } + + { + let consumer: BaseConsumer = create_config() + .set("group.id", group_name1.as_str()) + .set("auto.offset.reset", "earliest") + .set("auto.commit.enable", "true") + .create() + .expect("create consumer failed"); + + consumer + .subscribe(&topic_list) + .expect("subscribe topic failed"); + + // Consume some messages + for message in consumer.iter().take(6) { + match message { + Ok(_) => (), + Err(e) => panic!("Error receiving message: {:?}", e), + } + } + consumer.commit_consumer_state(CommitMode::Sync).unwrap(); + + // Get the offsets. + let result = admin_client + .list_consumer_group_offsets(&group_list, &opts) + .await + .expect("describe_consumer_groups failed"); + + eprintln!("result: {:?}", result); + assert!(result + .iter() + .any(|group_result| group_result_key(group_result) == group_name1)); + let group_result = result + .iter() + .find(|group_result| group_result_key(group_result) == group_name1) + .expect("Did not find the group we requested"); + match group_result { + Ok(group) => { + assert_eq!(group.group_id, group_name1); + assert!(group.topic_partitions.elements().iter().any(|tp| tp.topic() == topic1)); + assert_eq!(group.topic_partitions.elements().len(), 1); + } + Err(_) => assert!(false, "Failed describe consumer group"), + } + + } + + // Get the offsets. + let result = admin_client + .list_consumer_group_offsets(&group_list, &opts) + .await + .expect("describe_consumer_groups failed"); + + eprintln!("result: {:?}", result); + assert!(result + .iter() + .any(|group_result| group_result_key(group_result) == group_name1)); + let group_result = result + .iter() + .find(|group_result| group_result_key(group_result) == group_name1) + .expect("Did not find the group we requested"); + match group_result { + Ok(group) => { + assert_eq!(group.group_id, group_name1); + assert!(group.topic_partitions.elements().iter().any(|tp| tp.topic() == topic1)); + assert_eq!(group.topic_partitions.elements().len(), 1); + } + Err(_) => assert!(false, "Failed describe consumer group"), + } + +} + +// Test the list_consumer_group_offsets operation +// Request information on a specific topic and partition, but the +// topic does not have that partition. +#[tokio::test] +async fn test_list_consumer_group_offsets_one_non_existing_partition() { + // create the admin client. + let admin_client = create_admin_client(); + // create a producer + let producer = create_config().create::>().unwrap(); + let timeout = Some(Duration::from_secs(1)); + + let opts = AdminOptions::new().operation_timeout(Some(Duration::from_secs(30))); + + let group_name1 = rand_test_group(); + + let topic1 = rand_test_topic("test_describe_groups"); + let topic2 = rand_test_topic("test_describe_groups"); + let topic_list = vec![topic1.as_str(), topic2.as_str()]; + + let make_record1 = || FutureRecord::::to(&topic1).payload("data"); + let make_record2 = || FutureRecord::::to(&topic2).payload("data"); + + let mut request_tpl = TopicPartitionList::new(); + request_tpl.add_partition(topic1.as_str(), 10); + + let group_list = vec![ListConsumerGroupOffsets::new(group_name1.as_str(), request_tpl)]; + + // Create the topics + create_topics(&topic_list) + .await + .expect("Failed creating topics"); + + // Produce five messages to the topic. + for _ in 0..5 { + producer.send(make_record1(), timeout).await.unwrap(); + producer.send(make_record2(), timeout).await.unwrap(); + } + + { + let consumer: BaseConsumer = create_config() + .set("group.id", group_name1.as_str()) + .set("auto.offset.reset", "earliest") + .set("auto.commit.enable", "true") + .create() + .expect("create consumer failed"); + + consumer + .subscribe(&topic_list) + .expect("subscribe topic failed"); + + // Consume some messages + for message in consumer.iter().take(6) { + match message { + Ok(_) => (), + Err(e) => panic!("Error receiving message: {:?}", e), + } + } + consumer.commit_consumer_state(CommitMode::Sync).unwrap(); + + // Get the offsets. + let result = admin_client + .list_consumer_group_offsets(&group_list, &opts) + .await + .expect("describe_consumer_groups failed"); + + eprintln!("result: {:?}", result); + assert!(result + .iter() + .any(|group_result| group_result_key(group_result) == group_name1)); + let group_result = result + .iter() + .find(|group_result| group_result_key(group_result) == group_name1) + .expect("Did not find the group we requested"); + match group_result { + Ok(group) => { + assert_eq!(group.group_id, group_name1); + let elements = group.topic_partitions.elements(); + // + // For an inexistent partition, the API returns success with an invalid offset. + // + assert!(elements.iter().any(|tp| tp.topic() == topic1)); + assert_eq!(elements.len(), 1); + let element = elements.iter().find(|tp| tp.topic() == topic1 && tp.partition()==10).unwrap(); + assert_eq!(element.offset(), Offset::Invalid); + } + Err(_) => assert!(false, "Failed describe consumer group"), + } + + } + + // Get the offsets. + let result = admin_client + .list_consumer_group_offsets(&group_list, &opts) + .await + .expect("describe_consumer_groups failed"); + + eprintln!("result: {:?}", result); + assert!(result + .iter() + .any(|group_result| group_result_key(group_result) == group_name1)); + let group_result = result + .iter() + .find(|group_result| group_result_key(group_result) == group_name1) + .expect("Did not find the group we requested"); + match group_result { + Ok(group) => { + assert_eq!(group.group_id, group_name1); + assert!(group.topic_partitions.elements().iter().any(|tp| tp.topic() == topic1)); + assert_eq!(group.topic_partitions.elements().len(), 1); + } + Err(_) => assert!(false, "Failed describe consumer group"), + } + +} +