Skip to content

Commit 4678e36

Browse files
committed
Add get_watermark_offsets
1 parent eb4270c commit 4678e36

File tree

4 files changed

+60
-0
lines changed

4 files changed

+60
-0
lines changed

src/consumer/base_consumer.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -698,6 +698,27 @@ where
698698
}
699699
}
700700

701+
fn get_watermark_offsets(&self, topic: &str, partition: i32) -> KafkaResult<(i64, i64)> {
702+
let mut low = -1;
703+
let mut high = -1;
704+
let topic_c = CString::new(topic.to_string())?;
705+
let result = unsafe {
706+
rdsys::rd_kafka_get_watermark_offsets(
707+
self.client.native_ptr(),
708+
topic_c.as_ptr(),
709+
partition,
710+
&mut low as *mut i64,
711+
&mut high as *mut i64,
712+
)
713+
};
714+
715+
if result.is_error() {
716+
Err(KafkaError::MetadataFetch(result.into()))
717+
} else {
718+
Ok((low, high))
719+
}
720+
}
721+
701722
fn position(&self) -> KafkaResult<TopicPartitionList> {
702723
let tpl = self.assignment()?;
703724
let error = unsafe { rdsys::rd_kafka_position(self.client.native_ptr(), tpl.ptr()) };

src/consumer/mod.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,15 @@ where
369369
T: Into<Timeout>,
370370
Self: Sized;
371371

372+
/// Get last known low (oldest/beginning) and high (newest/end) offsets for partition.
373+
///
374+
/// The low offset is updated periodically (if statistics.interval.ms is set) while the
375+
/// high offset is updated on each fetched message set from the broker.
376+
///
377+
/// If there is no cached offset (either low or high, or both) then OFFSET_INVALID will
378+
/// be returned for the respective offset.
379+
fn get_watermark_offsets(&self, topic: &str, partition: i32) -> KafkaResult<(i64, i64)>;
380+
372381
/// Retrieve current positions (offsets) for topics and partitions.
373382
fn position(&self) -> KafkaResult<TopicPartitionList>;
374383

src/consumer/stream_consumer.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,10 @@ where
507507
self.base.offsets_for_times(timestamps, timeout)
508508
}
509509

510+
fn get_watermark_offsets(&self, topic: &str, partition: i32) -> KafkaResult<(i64, i64)> {
511+
self.base.get_watermark_offsets(topic, partition)
512+
}
513+
510514
fn position(&self) -> KafkaResult<TopicPartitionList> {
511515
self.base.position()
512516
}

tests/test_high_consumers.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,19 @@ async fn test_consumer_commit_message() {
317317
(0, 12)
318318
);
319319

320+
assert_eq!(
321+
consumer.get_watermark_offsets(&topic_name, 0).unwrap(),
322+
(0, 10)
323+
);
324+
assert_eq!(
325+
consumer.get_watermark_offsets(&topic_name, 1).unwrap(),
326+
(0, 11)
327+
);
328+
assert_eq!(
329+
consumer.get_watermark_offsets(&topic_name, 2).unwrap(),
330+
(0, 12)
331+
);
332+
320333
let mut assignment = TopicPartitionList::new();
321334
assignment
322335
.add_partition_offset(&topic_name, 0, Offset::Stored)
@@ -402,6 +415,19 @@ async fn test_consumer_store_offset_commit() {
402415
(0, 12)
403416
);
404417

418+
assert_eq!(
419+
consumer.get_watermark_offsets(&topic_name, 0).unwrap(),
420+
(0, 10)
421+
);
422+
assert_eq!(
423+
consumer.get_watermark_offsets(&topic_name, 1).unwrap(),
424+
(0, 11)
425+
);
426+
assert_eq!(
427+
consumer.get_watermark_offsets(&topic_name, 2).unwrap(),
428+
(0, 12)
429+
);
430+
405431
let mut assignment = TopicPartitionList::new();
406432
assignment
407433
.add_partition_offset(&topic_name, 0, Offset::Stored)

0 commit comments

Comments
 (0)