Skip to content

Conversation

@cstruct
Copy link
Contributor

@cstruct cstruct commented Sep 14, 2025

This adds a .iter() method for asynchronously iterating over Iggy messages from a consumer. Adding this method requires the consumer group to be initialized so consumer_group is made async and initialization is made upfront.partition_id is also exposed on ReceiveMessage to allow manual commit of offsets.

@cstruct cstruct force-pushed the extend-consumer-group-interface branch from b98f4d8 to 830a592 Compare September 14, 2025 19:01
@cstruct
Copy link
Contributor Author

cstruct commented Sep 14, 2025

@changhc I'd love to get your feedback w.r.t the interface.

This adds a `.iter()` method for asynchronously iterating over Iggy
messages from a consumer. Adding this method requires the consumer
group to be initialized so `consumer_group` is made async and
initialization is made upfront. `partition_id` is also exposed on
`ReceiveMessage` to allow manual commit of offsets.
@cstruct cstruct force-pushed the extend-consumer-group-interface branch from 830a592 to 2ce2ba0 Compare September 14, 2025 19:26

/// Asynchronously iterate over `ReceiveMessage`s.
#[gen_stub(override_return_type(type_repr="collections.abc.AsyncIterator[ReceiveMessage]", imports=("collections.abc")))]
fn iter<'a>(&self) -> ReceiveMessageIterator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any problem making this __aiter__?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, I thought about it as well but landed on having a iter method because I thought it read better. I don't have strong feelings about this so am open to changing it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this again and now I think it's better not to allow users to iterate through the consumer itself as that can be confusing. This iter method can probably be named as read_messages for clarity as it seems to be reading messages only instead of consuming them.

Also, what is the expected behaviour?

count = 0
async for message in consumer.iter():
    # read messages 0 to 4
    count += 1
    if count == 5:
        break
# call it again
async for message in consumer.iter():
    # reads message 5 or 0?
    break

If this method changes the consumer state, we need to make this clear.

@hubcio
Copy link
Contributor

hubcio commented Nov 18, 2025

@cstruct @changhc what's next in this PR? should we rebase and merge, or close?

@cstruct
Copy link
Contributor Author

cstruct commented Nov 18, 2025

Sorry for dropping the ball here. I've had reduced bandwidth and I apologize for not communicating that I've had to put this on hold. I'm going to need this for work in a bit so I will prioritize this soon.

What's next for this PR is that we have to think about the interface we want to expose. Adding this as read_messages and keeping the old callback based impl consume_messages is confusing without clearly documenting how they differ and when to use one of them. As it stands the only difference besides interface is that consume_messages supports AutoCommit::After(...), but I don't know if that justifies having two interfaces. As far as I can remember AutoCommit::After(...) still commits when the consumer yields an error, so this only really guards us against committing before a panic in Rust?

I'll try to get into this ASAP.

@hubcio
Copy link
Contributor

hubcio commented Nov 18, 2025

@cstruct mate, no worries, no one is demanding anything from you :D i just wanted to cleanup PRs so i pinged you. there are no deadlines. if you don't have time that's perfectly fine - just write it, don't force anything upon yourself.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants