Skip to content

DefaultStreamMessageListenerContainer does't read new messages #3201

@Edsuns

Description

@Edsuns

Even though use StreamOffset.create(streamKey, ReadOffset.lastConsumed()) to build the listener, it does't read new messages, and just read pending messages.

        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
                StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
        Consumer consumer = Consumer.from(group, consumerName);
        StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
        StreamMessageListenerContainer.StreamReadRequestBuilder<String> builder = StreamMessageListenerContainer.StreamReadRequest
                .builder(streamOffset).consumer(consumer)
                .autoAcknowledge(false)
                .cancelOnError(throwable -> false);
        container.register(builder.build(), listener);

https://redis.io/docs/latest/commands/xreadgroup/#differences-between-xread-and-xreadgroup

The ID to specify in the STREAMS option when using XREADGROUP can be one of the following two:

  • The special > ID, which means that the consumer want to receive only messages that were never delivered to any other consumer. It just means, give me new messages.
  • Any other ID, that is, 0 or any other valid ID or incomplete ID (just the millisecond time part), will have the effect of returning entries that are pending for the consumer sending the command with IDs greater than the one provided. So basically if the ID is not >, then the command will just let the client access its pending entries: messages delivered to it, but not yet acknowledged. Note that in this case, both BLOCK and NOACK are ignored.

Metadata

Metadata

Assignees

Labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions