Skip to content

Commit 830a592

Browse files
committed
feat(python): add AsyncIterator interface to IggyConsumer
This adds a `.iter()` method for asynchronously iterating over Iggy messages from a consumer. `partition_id` is also exposed one `ReceiveMessage` to allow manual commit of offsets.
1 parent 5aa6d5f commit 830a592

File tree

8 files changed

+158
-31
lines changed

8 files changed

+158
-31
lines changed

foreign/python/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ repository = "https://github.com/apache/iggy"
2727

2828
[dependencies]
2929
bytes = "1.10.1"
30+
futures = "0.3.31"
3031
iggy = { path = "../../core/sdk", version = "0.7.0" }
3132
pyo3 = "0.25.0"
3233
pyo3-async-runtimes = { version = "0.25.0", features = [

foreign/python/apache_iggy.pyi

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ class IggyClient:
252252
253253
Returns a list of received messages or a PyRuntimeError on failure.
254254
"""
255-
def consumer_group(self, name:builtins.str, stream:builtins.str, topic:builtins.str, partition_id:typing.Optional[builtins.int]=None, polling_strategy:typing.Optional[PollingStrategy]=None, batch_length:typing.Optional[builtins.int]=None, auto_commit:typing.Optional[AutoCommit]=None, create_consumer_group_if_not_exists:builtins.bool=True, auto_join_consumer_group:builtins.bool=True, poll_interval:typing.Optional[datetime.timedelta]=None, polling_retry_interval:typing.Optional[datetime.timedelta]=None, init_retries:typing.Optional[builtins.int]=None, init_retry_interval:typing.Optional[datetime.timedelta]=None, allow_replay:builtins.bool=False) -> IggyConsumer:
255+
def consumer_group(self, name:builtins.str, stream:builtins.str, topic:builtins.str, partition_id:typing.Optional[builtins.int]=None, polling_strategy:typing.Optional[PollingStrategy]=None, batch_length:typing.Optional[builtins.int]=None, auto_commit:typing.Optional[AutoCommit]=None, create_consumer_group_if_not_exists:builtins.bool=True, auto_join_consumer_group:builtins.bool=True, poll_interval:typing.Optional[datetime.timedelta]=None, polling_retry_interval:typing.Optional[datetime.timedelta]=None, init_retries:typing.Optional[builtins.int]=None, init_retry_interval:typing.Optional[datetime.timedelta]=None, allow_replay:builtins.bool=False) -> collections.abc.Awaitable[IggyConsumer]:
256256
r"""
257257
Creates a new consumer group consumer.
258258
@@ -302,6 +302,14 @@ class IggyConsumer:
302302
Deletes the offset for the provided partition id or if none is specified
303303
uses the current partition id for the consumer group.
304304
305+
Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError`
306+
if the operation fails.
307+
"""
308+
def iter(self) -> collections.abc.AsyncIterator[ReceiveMessage]:
309+
r"""
310+
Deletes the offset for the provided partition id or if none is specified
311+
uses the current partition id for the consumer group.
312+
305313
Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError`
306314
if the operation fails.
307315
"""
@@ -381,6 +389,10 @@ class ReceiveMessage:
381389
382390
The length represents the length of the payload.
383391
"""
392+
def partition_id(self) -> builtins.int:
393+
r"""
394+
Retrieves the partition this message belongs to.
395+
"""
384396

385397
class SendMessage:
386398
r"""

foreign/python/src/client.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616
* under the License.
1717
*/
1818

19-
use std::str::FromStr;
20-
use std::sync::Arc;
21-
2219
use iggy::prelude::{
2320
Consumer as RustConsumer, IggyClient as RustIggyClient, IggyMessage as RustMessage,
2421
PollingStrategy as RustPollingStrategy, *,
@@ -28,6 +25,8 @@ use pyo3::types::{PyDelta, PyList, PyType};
2825
use pyo3_async_runtimes::tokio::future_into_py;
2926
use pyo3_stub_gen::define_stub_info_gatherer;
3027
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
28+
use std::str::FromStr;
29+
use std::sync::Arc;
3130

3231
use crate::consumer::{py_delta_to_iggy_duration, AutoCommit, IggyConsumer};
3332
use crate::identifier::PyIdentifier;
@@ -317,7 +316,10 @@ impl IggyClient {
317316
let messages = polled_messages
318317
.messages
319318
.into_iter()
320-
.map(ReceiveMessage::from_rust_message)
319+
.map(|m| ReceiveMessage {
320+
inner: m,
321+
partition_id,
322+
})
321323
.collect::<Vec<_>>();
322324
Ok(messages)
323325
})
@@ -343,8 +345,10 @@ impl IggyClient {
343345
init_retry_interval=None,
344346
allow_replay=false,
345347
))]
346-
fn consumer_group(
348+
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[IggyConsumer]", imports=("collections.abc")))]
349+
fn consumer_group<'a>(
347350
&self,
351+
py: Python<'a>,
348352
name: &str,
349353
stream: &str,
350354
topic: &str,
@@ -359,7 +363,7 @@ impl IggyClient {
359363
init_retries: Option<u32>,
360364
init_retry_interval: Option<Py<PyDelta>>,
361365
allow_replay: bool,
362-
) -> PyResult<IggyConsumer> {
366+
) -> PyResult<Bound<'a, PyAny>> {
363367
let mut builder = self
364368
.inner
365369
.consumer_group(name, stream, topic)
@@ -415,10 +419,16 @@ impl IggyClient {
415419
if allow_replay {
416420
builder = builder.allow_replay()
417421
}
418-
let consumer = builder.build();
422+
let mut consumer = builder.build();
419423

420-
Ok(IggyConsumer {
421-
inner: Arc::new(Mutex::new(consumer)),
424+
future_into_py(py, async move {
425+
consumer
426+
.init()
427+
.await
428+
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}")))?;
429+
Ok(IggyConsumer {
430+
inner: Arc::new(Mutex::new(consumer)),
431+
})
422432
})
423433
}
424434
}

foreign/python/src/consumer.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use tokio::sync::Mutex;
3636
use tokio::task::JoinHandle;
3737

3838
use crate::identifier::PyIdentifier;
39+
use crate::iterator::ReceiveMessageIterator;
3940
use crate::receive_message::ReceiveMessage;
4041

4142
/// A Python class representing the Iggy consumer.
@@ -129,6 +130,13 @@ impl IggyConsumer {
129130
})
130131
}
131132

133+
/// Asynchronously iterate over `ReceiveMessage`s.
134+
#[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]", imports=("collections.abc")))]
135+
fn iter<'a>(&self) -> ReceiveMessageIterator {
136+
let inner = self.inner.clone();
137+
ReceiveMessageIterator { inner }
138+
}
139+
132140
/// Consumes messages continuously using a callback function and an optional `asyncio.Event` for signaling shutdown.
133141
///
134142
/// Returns an awaitable that completes when shutdown is signaled or a PyRuntimeError on failure.
@@ -148,14 +156,6 @@ impl IggyConsumer {
148156
future_into_py(py, async {
149157
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
150158

151-
let inner_init = inner.clone();
152-
let mut inner_init = inner_init.lock().await;
153-
inner_init
154-
.init()
155-
.await
156-
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}")))?;
157-
drop(inner_init);
158-
159159
let task_locals = Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?;
160160
let handle_consume = get_runtime().spawn(scope(task_locals, async move {
161161
let task_locals =
@@ -219,7 +219,10 @@ impl MessageConsumer for PyCallbackConsumer {
219219
let callback = self.callback.clone();
220220
let task_locals = self.task_locals.clone().lock_owned().await;
221221
let task_locals = Python::with_gil(|py| task_locals.clone_ref(py));
222-
let message = ReceiveMessage::from_rust_message(received.message);
222+
let message = ReceiveMessage {
223+
inner: received.message,
224+
partition_id: received.partition_id,
225+
};
223226
get_runtime()
224227
.spawn(scope(task_locals, async move {
225228
Python::with_gil(|py| {

foreign/python/src/iterator.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/* Licensed to the Apache Software Foundation (ASF) under one
2+
* or more contributor license agreements. See the NOTICE file
3+
* distributed with this work for additional information
4+
* regarding copyright ownership. The ASF licenses this file
5+
* to you under the Apache License, Version 2.0 (the
6+
* "License"); you may not use this file except in compliance
7+
* with the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing,
12+
* software distributed under the License is distributed on an
13+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
* KIND, either express or implied. See the License for the
15+
* specific language governing permissions and limitations
16+
* under the License.
17+
*/
18+
19+
use std::sync::Arc;
20+
21+
use futures::StreamExt;
22+
use iggy::prelude::IggyConsumer as RustIggyConsumer;
23+
use pyo3::exceptions::PyStopIteration;
24+
25+
use crate::receive_message::ReceiveMessage;
26+
use pyo3::prelude::*;
27+
use pyo3_async_runtimes::tokio::future_into_py;
28+
use tokio::sync::Mutex;
29+
30+
#[pyclass]
31+
pub struct ReceiveMessageIterator {
32+
pub(crate) inner: Arc<Mutex<RustIggyConsumer>>,
33+
}
34+
35+
#[pymethods]
36+
impl ReceiveMessageIterator {
37+
pub fn __anext__<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
38+
let inner = self.inner.clone();
39+
future_into_py(py, async move {
40+
let mut inner = inner.lock().await;
41+
if let Some(message) = inner.next().await {
42+
Ok(message
43+
.map(|m| ReceiveMessage {
44+
inner: m.message,
45+
partition_id: m.partition_id,
46+
})
47+
.map_err(|e| {
48+
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}"))
49+
})?)
50+
} else {
51+
Err(PyStopIteration::new_err("No more messages"))
52+
}
53+
})
54+
}
55+
56+
pub fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
57+
slf
58+
}
59+
}

foreign/python/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@
1919
pub mod client;
2020
mod consumer;
2121
mod identifier;
22+
mod iterator;
2223
mod receive_message;
2324
mod send_message;
2425
mod stream;
2526
mod topic;
2627

2728
use client::IggyClient;
2829
use consumer::{AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer};
30+
use iterator::ReceiveMessageIterator;
2931
use pyo3::prelude::*;
3032
use receive_message::{PollingStrategy, ReceiveMessage};
3133
use send_message::SendMessage;
@@ -45,5 +47,6 @@ fn apache_iggy(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
4547
m.add_class::<AutoCommit>()?;
4648
m.add_class::<AutoCommitAfter>()?;
4749
m.add_class::<AutoCommitWhen>()?;
50+
m.add_class::<ReceiveMessageIterator>()?;
4851
Ok(())
4952
}

foreign/python/src/receive_message.rs

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,7 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyclass_complex_enum, gen
2828
#[gen_stub_pyclass]
2929
pub struct ReceiveMessage {
3030
pub(crate) inner: RustReceiveMessage,
31-
}
32-
33-
impl ReceiveMessage {
34-
/// Converts a Rust message into its corresponding Python representation.
35-
///
36-
/// This is an internal utility function, not exposed to Python.
37-
pub(crate) fn from_rust_message(message: RustReceiveMessage) -> Self {
38-
Self { inner: message }
39-
}
31+
pub(crate) partition_id: u32,
4032
}
4133

4234
#[gen_stub_pymethods]
@@ -83,6 +75,11 @@ impl ReceiveMessage {
8375
pub fn length(&self) -> u32 {
8476
self.inner.header.payload_length
8577
}
78+
79+
/// Retrieves the partition this message belongs to.
80+
pub fn partition_id(&self) -> u32 {
81+
self.partition_id
82+
}
8683
}
8784

8885
#[derive(Clone, Copy)]

foreign/python/tests/test_iggy_sdk.py

Lines changed: 45 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ async def test_meta(self, iggy_client: IggyClient, consumer_group_setup):
444444
await iggy_client.create_topic(
445445
stream=stream_name, name=topic_name, partitions_count=1
446446
)
447-
consumer = iggy_client.consumer_group(
447+
consumer = await iggy_client.consumer_group(
448448
consumer_name,
449449
stream_name,
450450
topic_name,
@@ -481,7 +481,7 @@ async def test_consume_messages(
481481
stream=stream_name, name=topic_name, partitions_count=1
482482
)
483483

484-
consumer = iggy_client.consumer_group(
484+
consumer = await iggy_client.consumer_group(
485485
consumer_name,
486486
stream_name,
487487
topic_name,
@@ -509,6 +509,48 @@ async def send() -> None:
509509

510510
assert received_messages == test_messages
511511

512+
@pytest.mark.asyncio
513+
async def test_iter_messages(self, iggy_client: IggyClient, consumer_group_setup):
514+
"""Test that the consumer group can consume messages."""
515+
consumer_name = consumer_group_setup["consumer"]
516+
stream_name = consumer_group_setup["stream"]
517+
topic_name = consumer_group_setup["topic"]
518+
partition_id = consumer_group_setup["partition_id"]
519+
test_messages = consumer_group_setup["messages"]
520+
521+
# Setup
522+
received_messages = []
523+
shutdown_event = asyncio.Event()
524+
await iggy_client.create_stream(stream_name)
525+
await iggy_client.create_topic(
526+
stream=stream_name, name=topic_name, partitions_count=1
527+
)
528+
529+
consumer = await iggy_client.consumer_group(
530+
consumer_name,
531+
stream_name,
532+
topic_name,
533+
partition_id,
534+
PollingStrategy.Next(),
535+
10,
536+
auto_commit=AutoCommit.Interval(timedelta(seconds=5)),
537+
poll_interval=timedelta(seconds=1),
538+
)
539+
540+
await iggy_client.send_messages(
541+
stream_name,
542+
topic_name,
543+
partition_id,
544+
[Message(m) for m in test_messages],
545+
)
546+
547+
async for message in consumer.iter():
548+
received_messages.append(message.payload().decode())
549+
if len(received_messages) == 5:
550+
break
551+
552+
assert received_messages == test_messages
553+
512554
@pytest.mark.asyncio
513555
async def test_shutdown(self, iggy_client: IggyClient, consumer_group_setup):
514556
"""Test that the consumer group can be signaled to shutdown."""
@@ -524,7 +566,7 @@ async def test_shutdown(self, iggy_client: IggyClient, consumer_group_setup):
524566
stream=stream_name, name=topic_name, partitions_count=1
525567
)
526568

527-
consumer = iggy_client.consumer_group(
569+
consumer = await iggy_client.consumer_group(
528570
consumer_name,
529571
stream_name,
530572
topic_name,

0 commit comments

Comments
 (0)