Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions foreign/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,16 @@ repository = "https://github.com/apache/iggy"

[dependencies]
bytes = "1.10.1"
futures = "0.3.31"
iggy = { path = "../../core/sdk", version = "0.7.0" }
pyo3 = "0.25.0"
pyo3-async-runtimes = { version = "0.25.0", features = [
"attributes",
"tokio-runtime",
] }
pyo3-stub-gen = "0.12.0"
pyo3-stub-gen = "0.13.1"
tokio = "1.40.0"

[patch.crates-io]
pyo3-stub-gen = { git = "https://github.com/Jij-Inc/pyo3-stub-gen.git", rev = "1870f637f700605395a666e3bdc0276aece73b5f" }

[lib]
name = "apache_iggy"
crate-type = ["cdylib", "rlib"]
Expand Down
14 changes: 11 additions & 3 deletions foreign/python/apache_iggy.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ class IggyClient:
init_retries: typing.Optional[builtins.int] = None,
init_retry_interval: typing.Optional[datetime.timedelta] = None,
allow_replay: builtins.bool = False,
) -> IggyConsumer:
) -> collections.abc.Awaitable[IggyConsumer]:
r"""
Creates a new consumer group consumer.

Expand Down Expand Up @@ -388,6 +388,10 @@ class IggyConsumer:
Returns `Ok(())` if the server responds successfully, or a `PyRuntimeError`
if the operation fails.
"""
def iter(self) -> collections.abc.AsyncIterator[ReceiveMessage]:
r"""
Asynchronously iterate over `ReceiveMessage`s.
"""
def consume_messages(
self,
callback: collections.abc.Callable[
Expand Down Expand Up @@ -470,6 +474,10 @@ class ReceiveMessage:

The length represents the length of the payload.
"""
def partition_id(self) -> builtins.int:
r"""
Retrieves the partition this message belongs to.
"""

class SendMessage:
r"""
Expand All @@ -480,10 +488,10 @@ class SendMessage:
"""
def __new__(cls, data: builtins.str | bytes) -> SendMessage:
r"""
Constructs a new `SendMessage` instance from a string.
Constructs a new `SendMessage` instance from a string or bytes.

This method allows for the creation of a `SendMessage` instance
directly from Python using the provided string data.
directly from Python using the provided string or bytes data.
"""

class StreamDetails:
Expand Down
28 changes: 19 additions & 9 deletions foreign/python/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
* under the License.
*/

use std::str::FromStr;
use std::sync::Arc;

use iggy::prelude::{
Consumer as RustConsumer, IggyClient as RustIggyClient, IggyMessage as RustMessage,
PollingStrategy as RustPollingStrategy, *,
Expand All @@ -28,6 +25,8 @@ use pyo3::types::{PyDelta, PyList, PyType};
use pyo3_async_runtimes::tokio::future_into_py;
use pyo3_stub_gen::define_stub_info_gatherer;
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
use std::str::FromStr;
use std::sync::Arc;

use crate::consumer::{py_delta_to_iggy_duration, AutoCommit, IggyConsumer};
use crate::identifier::PyIdentifier;
Expand Down Expand Up @@ -317,7 +316,10 @@ impl IggyClient {
let messages = polled_messages
.messages
.into_iter()
.map(ReceiveMessage::from_rust_message)
.map(|m| ReceiveMessage {
inner: m,
partition_id,
})
.collect::<Vec<_>>();
Ok(messages)
})
Expand All @@ -343,8 +345,10 @@ impl IggyClient {
init_retry_interval=None,
allow_replay=false,
))]
fn consumer_group(
#[gen_stub(override_return_type(type_repr="collections.abc.Awaitable[IggyConsumer]", imports=("collections.abc")))]
fn consumer_group<'a>(
&self,
py: Python<'a>,
name: &str,
stream: &str,
topic: &str,
Expand All @@ -359,7 +363,7 @@ impl IggyClient {
init_retries: Option<u32>,
init_retry_interval: Option<Py<PyDelta>>,
allow_replay: bool,
) -> PyResult<IggyConsumer> {
) -> PyResult<Bound<'a, PyAny>> {
let mut builder = self
.inner
.consumer_group(name, stream, topic)
Expand Down Expand Up @@ -415,10 +419,16 @@ impl IggyClient {
if allow_replay {
builder = builder.allow_replay()
}
let consumer = builder.build();
let mut consumer = builder.build();

Ok(IggyConsumer {
inner: Arc::new(Mutex::new(consumer)),
future_into_py(py, async move {
consumer
.init()
.await
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}")))?;
Ok(IggyConsumer {
inner: Arc::new(Mutex::new(consumer)),
})
})
}
}
Expand Down
21 changes: 12 additions & 9 deletions foreign/python/src/consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tokio::sync::Mutex;
use tokio::task::JoinHandle;

use crate::identifier::PyIdentifier;
use crate::iterator::ReceiveMessageIterator;
use crate::receive_message::ReceiveMessage;

/// A Python class representing the Iggy consumer.
Expand Down Expand Up @@ -129,6 +130,13 @@ impl IggyConsumer {
})
}

/// Asynchronously iterate over `ReceiveMessage`s.
#[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]", imports=("collections.abc")))]
fn iter<'a>(&self) -> ReceiveMessageIterator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any problem making this __aiter__?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, I thought about it as well but landed on having a iter method because I thought it read better. I don't have strong feelings about this so am open to changing it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this again and now I think it's better not to allow users to iterate through the consumer itself as that can be confusing. This iter method can probably be named as read_messages for clarity as it seems to be reading messages only instead of consuming them.

Also, what is the expected behaviour?

count = 0
async for message in consumer.iter():
    # read messages 0 to 4
    count += 1
    if count == 5:
        break
# call it again
async for message in consumer.iter():
    # reads message 5 or 0?
    break

If this method changes the consumer state, we need to make this clear.

let inner = self.inner.clone();
ReceiveMessageIterator { inner }
}

/// Consumes messages continuously using a callback function and an optional `asyncio.Event` for signaling shutdown.
///
/// Returns an awaitable that completes when shutdown is signaled or a PyRuntimeError on failure.
Expand All @@ -148,14 +156,6 @@ impl IggyConsumer {
future_into_py(py, async {
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();

let inner_init = inner.clone();
let mut inner_init = inner_init.lock().await;
inner_init
.init()
.await
.map_err(|e| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}")))?;
drop(inner_init);

let task_locals = Python::with_gil(pyo3_async_runtimes::tokio::get_current_locals)?;
let handle_consume = get_runtime().spawn(scope(task_locals, async move {
let task_locals =
Expand Down Expand Up @@ -219,7 +219,10 @@ impl MessageConsumer for PyCallbackConsumer {
let callback = self.callback.clone();
let task_locals = self.task_locals.clone().lock_owned().await;
let task_locals = Python::with_gil(|py| task_locals.clone_ref(py));
let message = ReceiveMessage::from_rust_message(received.message);
let message = ReceiveMessage {
inner: received.message,
partition_id: received.partition_id,
};
get_runtime()
.spawn(scope(task_locals, async move {
Python::with_gil(|py| {
Expand Down
59 changes: 59 additions & 0 deletions foreign/python/src/iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

use std::sync::Arc;

use futures::StreamExt;
use iggy::prelude::IggyConsumer as RustIggyConsumer;
use pyo3::exceptions::PyStopIteration;

use crate::receive_message::ReceiveMessage;
use pyo3::prelude::*;
use pyo3_async_runtimes::tokio::future_into_py;
use tokio::sync::Mutex;

#[pyclass]
pub struct ReceiveMessageIterator {
pub(crate) inner: Arc<Mutex<RustIggyConsumer>>,
}

#[pymethods]
impl ReceiveMessageIterator {
pub fn __anext__<'a>(&self, py: Python<'a>) -> PyResult<Bound<'a, PyAny>> {
let inner = self.inner.clone();
future_into_py(py, async move {
let mut inner = inner.lock().await;
if let Some(message) = inner.next().await {
Ok(message
.map(|m| ReceiveMessage {
inner: m.message,
partition_id: m.partition_id,
})
.map_err(|e| {
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(format!("{e:?}"))
})?)
} else {
Err(PyStopIteration::new_err("No more messages"))
}
})
}

pub fn __aiter__(slf: PyRef<'_, Self>) -> PyRef<'_, Self> {
slf
}
}
3 changes: 3 additions & 0 deletions foreign/python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
pub mod client;
mod consumer;
mod identifier;
mod iterator;
mod receive_message;
mod send_message;
mod stream;
mod topic;

use client::IggyClient;
use consumer::{AutoCommit, AutoCommitAfter, AutoCommitWhen, IggyConsumer};
use iterator::ReceiveMessageIterator;
use pyo3::prelude::*;
use receive_message::{PollingStrategy, ReceiveMessage};
use send_message::SendMessage;
Expand All @@ -45,5 +47,6 @@ fn apache_iggy(_py: Python, m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<AutoCommit>()?;
m.add_class::<AutoCommitAfter>()?;
m.add_class::<AutoCommitWhen>()?;
m.add_class::<ReceiveMessageIterator>()?;
Ok(())
}
15 changes: 6 additions & 9 deletions foreign/python/src/receive_message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,7 @@ use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pyclass_complex_enum, gen
#[gen_stub_pyclass]
pub struct ReceiveMessage {
pub(crate) inner: RustReceiveMessage,
}

impl ReceiveMessage {
/// Converts a Rust message into its corresponding Python representation.
///
/// This is an internal utility function, not exposed to Python.
pub(crate) fn from_rust_message(message: RustReceiveMessage) -> Self {
Self { inner: message }
}
pub(crate) partition_id: u32,
}

#[gen_stub_pymethods]
Expand Down Expand Up @@ -83,6 +75,11 @@ impl ReceiveMessage {
pub fn length(&self) -> u32 {
self.inner.header.payload_length
}

/// Retrieves the partition this message belongs to.
pub fn partition_id(&self) -> u32 {
self.partition_id
}
}

#[derive(Clone, Copy)]
Expand Down
48 changes: 45 additions & 3 deletions foreign/python/tests/test_iggy_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ async def test_meta(self, iggy_client: IggyClient, consumer_group_setup):
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)
consumer = iggy_client.consumer_group(
consumer = await iggy_client.consumer_group(
consumer_name,
stream_name,
topic_name,
Expand Down Expand Up @@ -481,7 +481,7 @@ async def test_consume_messages(
stream=stream_name, name=topic_name, partitions_count=1
)

consumer = iggy_client.consumer_group(
consumer = await iggy_client.consumer_group(
consumer_name,
stream_name,
topic_name,
Expand Down Expand Up @@ -509,6 +509,48 @@ async def send() -> None:

assert received_messages == test_messages

@pytest.mark.asyncio
async def test_iter_messages(self, iggy_client: IggyClient, consumer_group_setup):
"""Test that the consumer group can consume messages."""
consumer_name = consumer_group_setup["consumer"]
stream_name = consumer_group_setup["stream"]
topic_name = consumer_group_setup["topic"]
partition_id = consumer_group_setup["partition_id"]
test_messages = consumer_group_setup["messages"]

# Setup
received_messages = []
shutdown_event = asyncio.Event()
await iggy_client.create_stream(stream_name)
await iggy_client.create_topic(
stream=stream_name, name=topic_name, partitions_count=1
)

consumer = await iggy_client.consumer_group(
consumer_name,
stream_name,
topic_name,
partition_id,
PollingStrategy.Next(),
10,
auto_commit=AutoCommit.Interval(timedelta(seconds=5)),
poll_interval=timedelta(seconds=1),
)

await iggy_client.send_messages(
stream_name,
topic_name,
partition_id,
[Message(m) for m in test_messages],
)

async for message in consumer.iter():
received_messages.append(message.payload().decode())
if len(received_messages) == 5:
break

assert received_messages == test_messages

@pytest.mark.asyncio
async def test_shutdown(self, iggy_client: IggyClient, consumer_group_setup):
"""Test that the consumer group can be signaled to shutdown."""
Expand All @@ -524,7 +566,7 @@ async def test_shutdown(self, iggy_client: IggyClient, consumer_group_setup):
stream=stream_name, name=topic_name, partitions_count=1
)

consumer = iggy_client.consumer_group(
consumer = await iggy_client.consumer_group(
consumer_name,
stream_name,
topic_name,
Expand Down
Loading