From 54b16fb716c728400986bce12277f4ed22ad4bed Mon Sep 17 00:00:00 2001 From: Krzysztof Kocel Date: Mon, 29 Jul 2024 20:52:22 +0200 Subject: [PATCH 1/6] Add lag and entries read to StreamInfo --- .../redis/connection/stream/StreamInfo.java | 20 +++++++++++++++++++ .../AbstractConnectionIntegrationTests.java | 4 ++++ ...eactiveStreamCommandsIntegrationTests.java | 4 ++++ 3 files changed, 28 insertions(+) diff --git a/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java b/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java index c822fcea47..3eb0f1747e 100644 --- a/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java +++ b/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java @@ -361,6 +361,26 @@ public Long pendingCount() { public String lastDeliveredId() { return getRequired("last-delivered-id", String.class); } + + /** + * The logical "read counter" of the last entry delivered to the group's consumers. Corresponds to {@literal entries-read}. + * + * @return + */ + public Long entriesRead() { + return getRequired("entries-read", Long.class); + } + + /** + * The number of entries in the stream that are still waiting to be delivered to the group's consumers, + * or a NULL when that number can't be determined. Corresponds to {@literal lag}. + * + * @return + */ + @Nullable + public Long lag() { + return get("entries-read", Long.class); + } } public static class XInfoConsumers implements Streamable { diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java index b6b9cd6ed4..87c659d19c 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java @@ -4017,6 +4017,8 @@ public void xinfoGroups() { assertThat(info.get(0).groupName()).isEqualTo("my-group"); assertThat(info.get(0).consumerCount()).isEqualTo(1L); assertThat(info.get(0).pendingCount()).isEqualTo(2L); + assertThat(info.get(0).lag()).isEqualTo(0L); + assertThat(info.get(0).entriesRead()).isEqualTo(2L); assertThat(info.get(0).lastDeliveredId()).isEqualTo(lastRecord.getValue()); } @@ -4055,6 +4057,8 @@ public void xinfoGroupsNoConsumer() { assertThat(info.get(0).groupName()).isEqualTo("my-group"); assertThat(info.get(0).consumerCount()).isZero(); assertThat(info.get(0).pendingCount()).isZero(); + assertThat(info.get(0).lag()).isZero(); + assertThat(info.get(0).entriesRead()).isZero(); assertThat(info.get(0).lastDeliveredId()).isEqualTo("0-0"); } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java index 9fb4d8401b..757e718168 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java @@ -429,6 +429,8 @@ void xinfoGroups() { assertThat(info.groupName()).isEqualTo("my-group"); assertThat(info.consumerCount()).isEqualTo(1L); assertThat(info.pendingCount()).isEqualTo(2L); + assertThat(info.lag()).isZero(); + assertThat(info.entriesRead()).isEqualTo(2L); assertThat(info.lastDeliveredId()).isEqualTo(lastRecord); }).verifyComplete(); } @@ -455,6 +457,8 @@ void xinfoGroupsNoConsumer() { assertThat(info.groupName()).isEqualTo("my-group"); assertThat(info.consumerCount()).isZero(); assertThat(info.pendingCount()).isZero(); + assertThat(info.entriesRead()).isZero(); + assertThat(info.lag()).isZero(); assertThat(info.lastDeliveredId()).isEqualTo("0-0"); }).verifyComplete(); } From 891e75d893d1ccb987cc64a512fa938820675753 Mon Sep 17 00:00:00 2001 From: Krzysztof Kocel Date: Tue, 30 Jul 2024 10:50:17 +0200 Subject: [PATCH 2/6] authors, lag --- .../data/redis/connection/stream/StreamInfo.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java b/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java index 3eb0f1747e..327c77a4bd 100644 --- a/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java +++ b/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java @@ -219,6 +219,8 @@ public Map getLastEntry() { * {@literal Redis Stream}. * * @author Christoph Strobl + * @author Mark Paluch + * @author Krzysztof Kocel */ public static class XInfoGroups implements Streamable { @@ -379,7 +381,7 @@ public Long entriesRead() { */ @Nullable public Long lag() { - return get("entries-read", Long.class); + return get("lag", Long.class); } } From e94fdd43475ac244f73d888d081ad1d07341288d Mon Sep 17 00:00:00 2001 From: Krzysztof Kocel Date: Tue, 17 Sep 2024 13:43:16 +0200 Subject: [PATCH 3/6] Add support for xautoclaim --- .../connection/DefaultedRedisConnection.java | 30 ++++++++ .../redis/connection/RedisStreamCommands.java | 65 +++++++++++++++++ .../jedis/JedisClusterStreamCommands.java | 23 ++++++ .../connection/jedis/JedisStreamCommands.java | 72 +++++++++++++++++++ .../connection/jedis/StreamConverters.java | 12 ++++ .../lettuce/LettuceStreamCommands.java | 49 +++++++++++++ .../connection/lettuce/StreamConverters.java | 10 +++ .../connection/stream/ClaimedMessages.java | 43 +++++++++++ .../connection/stream/ClaimedMessagesIds.java | 43 +++++++++++ 9 files changed, 347 insertions(+) create mode 100644 src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessages.java create mode 100644 src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessagesIds.java diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java index aaeaafe18b..ea4227fc62 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java @@ -29,6 +29,8 @@ import org.springframework.data.geo.Metric; import org.springframework.data.geo.Point; import org.springframework.data.redis.connection.stream.ByteRecord; +import org.springframework.data.redis.connection.stream.ClaimedMessages; +import org.springframework.data.redis.connection.stream.ClaimedMessagesIds; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.PendingMessages; @@ -503,6 +505,34 @@ default List xClaim(byte[] key, String group, String newOwner, XClai return streamCommands().xClaim(key, group, newOwner, options); } + /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ + @Override + @Deprecated + default ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { + return streamCommands().xAutoclaimJustId(key, group, newOwner, minIdleTime, start); + } + + /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ + @Override + @Deprecated + default ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { + return streamCommands().xAutoclaimJustId(key, group, newOwner, minIdleTime, start, count); + } + + /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ + @Override + @Deprecated + default ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { + return streamCommands().xAutoclaim(key, group, newOwner, minIdleTime, start); + } + + /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ + @Override + @Deprecated + default ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { + return streamCommands().xAutoclaim(key, group, newOwner, minIdleTime, start, count); + } + /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ @Override @Deprecated diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index a326106610..84f0e912e0 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -521,6 +521,71 @@ public XClaimOptions ids(String... ids) { } } + /** + * Transfer ownership of pending stream entries that match the specified criteria. Returns just an array of IDs + * of messages successfully claimed, without returning the actual message. The retry counter is not incremented. + * + * @param key the {@literal key} the stream is stored at. + * @param group the name of the {@literal consumer group}. + * @param newOwner the name of the new {@literal consumer}. + * @param minIdleTime must not be {@literal null}. + * @param start must not be {@literal null}. + * @return list of {@link RecordId ids} that changed user. + * @see Redis Documentation: XAUTOCLAIM + * @since 2.3 + */ + @Nullable + ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start); + + /** + * Transfer ownership of pending stream entries that match the specified criteria. Returns just an array of IDs + * of messages successfully claimed, without returning the actual message. The retry counter is not incremented. + * + * @param key the {@literal key} the stream is stored at. + * @param group the name of the {@literal consumer group}. + * @param newOwner the name of the new {@literal consumer}. + * @param minIdleTime must not be {@literal null}. + * @param start must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @return list of {@link RecordId ids} that changed user. + * @see Redis Documentation: XAUTOCLAIM + * @since 2.3 + */ + @Nullable + ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count); + + + /** + * Transfer ownership of pending stream entries that match the specified criteria. + * + * @param key the {@literal key} the stream is stored at. + * @param group the name of the {@literal consumer group}. + * @param newOwner the name of the new {@literal consumer}. + * @param minIdleTime must not be {@literal null}. + * @param start must not be {@literal null}. + * @return list of {@link ByteRecord} that changed user. + * @see Redis Documentation: XAUTOCLAIM + */ + @Nullable + ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start); + + /** + * Transfer ownership of pending stream entries that match the specified criteria. + * + * @param key the {@literal key} the stream is stored at. + * @param group the name of the {@literal consumer group}. + * @param newOwner the name of the new {@literal consumer}. + * @param minIdleTime must not be {@literal null}. + * @param start must not be {@literal null}. + * @param count limit the number of results. Must not be {@literal null}. + * @return list of {@link ByteRecord} that changed user. + * @see Redis Documentation: XAUTOCLAIM + * @since 2.3 + */ + @Nullable + ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count); + + /** * Removes the records with the given id's from the stream. Returns the number of items deleted, that may be different * from the number of id's passed in case certain id's do not exist. diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java index 05cadae499..b222958ae2 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java @@ -17,6 +17,8 @@ import static org.springframework.data.redis.connection.jedis.StreamConverters.*; +import org.springframework.data.redis.connection.stream.ClaimedMessages; +import org.springframework.data.redis.connection.stream.ClaimedMessagesIds; import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.params.XAddParams; import redis.clients.jedis.params.XClaimParams; @@ -24,6 +26,7 @@ import redis.clients.jedis.params.XReadGroupParams; import redis.clients.jedis.params.XReadParams; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -131,6 +134,26 @@ public List xClaim(byte[] key, String group, String newOwner, XClaim } } + @Override + public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { + return null; + } + + @Override + public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { + return null; + } + + @Override + public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { + return null; + } + + @Override + public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { + return null; + } + @Override public Long xDel(byte[] key, RecordId... recordIds) { diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java index c3ecbe8255..cd37aca18b 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java @@ -15,11 +15,14 @@ */ package org.springframework.data.redis.connection.jedis; +import org.springframework.data.redis.connection.stream.ClaimedMessages; +import org.springframework.data.redis.connection.stream.ClaimedMessagesIds; import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.commands.PipelineBinaryCommands; import redis.clients.jedis.commands.StreamPipelineBinaryCommands; import redis.clients.jedis.params.XAddParams; +import redis.clients.jedis.params.XAutoClaimParams; import redis.clients.jedis.params.XClaimParams; import redis.clients.jedis.params.XPendingParams; import redis.clients.jedis.params.XReadGroupParams; @@ -27,6 +30,7 @@ import redis.clients.jedis.resps.StreamConsumersInfo; import redis.clients.jedis.resps.StreamGroupInfo; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -116,6 +120,74 @@ public List xClaim(byte[] key, String group, String newOwner, XClaim .get(r -> StreamConverters.convertToByteRecord(key, r)); } + @Override + public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { + + Assert.notNull(key, "Key must not be null"); + Assert.notNull(group, "Group must not be null"); + Assert.notNull(minIdleTime, "MinIdleTime must not be null"); + Assert.notNull(start, "Start must not be null"); + + XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams(); + + final List byteRecords = connection.invoke() + .from(Jedis::xautoclaimJustId, ResponseCommands::xautoclaimJustId, key, JedisConverters.toBytes(group), + JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start), + params).get(r -> StreamConverters.convertToByteRecord(key, r)); + + return null; + } + + @Override + public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { + Assert.notNull(key, "Key must not be null"); + Assert.notNull(group, "Group must not be null"); + Assert.notNull(minIdleTime, "MinIdleTime must not be null"); + Assert.notNull(start, "Start must not be null"); + Assert.notNull(count, "Count must not be null"); + + XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams().count(count.intValue()); + + final List byteRecords = connection.invoke() + .from(Jedis::xautoclaimJustId, ResponseCommands::xautoclaimJustId, key, JedisConverters.toBytes(group), + JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start), + params).get(r -> StreamConverters.convertToByteRecord(key, r)); + + return null; + } + + @Override + public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { + Assert.notNull(key, "Key must not be null"); + Assert.notNull(group, "Group must not be null"); + Assert.notNull(newOwner, "NewOwner must not be null"); + + XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams(); + + final List br= connection.invoke() + .from(Jedis::xautoclaim, ResponseCommands::xautoclaim, key, JedisConverters.toBytes(group), + JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start), + params).get(r -> StreamConverters.convertToByteRecord(key, r)); + + return null; + } + + @Override + public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { + Assert.notNull(key, "Key must not be null"); + Assert.notNull(group, "Group must not be null"); + Assert.notNull(newOwner, "NewOwner must not be null"); + + XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams().count(count.intValue()); + + final List br= connection.invoke() + .from(Jedis::xautoclaim, ResponseCommands::xautoclaim, key, JedisConverters.toBytes(group), + JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start), + params).get(r -> StreamConverters.convertToByteRecord(key, r)); + + return null; + } + @Override public Long xDel(byte[] key, RecordId... recordIds) { diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java index 60d16a1f4a..c9587118b9 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java @@ -18,6 +18,7 @@ import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.params.XAddParams; +import redis.clients.jedis.params.XAutoClaimParams; import redis.clients.jedis.params.XClaimParams; import redis.clients.jedis.params.XPendingParams; import redis.clients.jedis.params.XReadGroupParams; @@ -263,6 +264,17 @@ public static XClaimParams toXClaimParams(RedisStreamCommands.XClaimOptions opti return params; } + public static XAutoClaimParams toXautoClaimParams(Integer count) { + + XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams(); + + if (count != null) { + params.count(count); + } + + return params; + } + public static XReadParams toXReadParams(StreamReadOptions readOptions) { XReadParams params = XReadParams.xReadParams(); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java index 7f15f68e8e..97dfaf9f3d 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java @@ -16,12 +16,14 @@ package org.springframework.data.redis.connection.lettuce; import io.lettuce.core.XAddArgs; +import io.lettuce.core.XAutoClaimArgs; import io.lettuce.core.XClaimArgs; import io.lettuce.core.XGroupCreateArgs; import io.lettuce.core.XReadArgs; import io.lettuce.core.api.async.RedisStreamAsyncCommands; import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -31,6 +33,8 @@ import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisStreamCommands; import org.springframework.data.redis.connection.stream.ByteRecord; +import org.springframework.data.redis.connection.stream.ClaimedMessages; +import org.springframework.data.redis.connection.stream.ClaimedMessagesIds; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.PendingMessages; @@ -50,6 +54,7 @@ * @author Dejan Jankov * @author Dengliming * @author Mark John Moreno + * @author Krzysztof Kocel * @since 2.2 */ class LettuceStreamCommands implements RedisStreamCommands { @@ -117,6 +122,50 @@ public List xClaim(byte[] key, String group, String newOwner, XClaim .toList(StreamConverters.byteRecordConverter()); } + @Override + public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { + + io.lettuce.core.Consumer from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group), + LettuceConverters.toBytes(newOwner)); + XAutoClaimArgs args = XAutoClaimArgs.Builder.justid(from, minIdleTime, start); + + return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args) + .get(StreamConverters.claimedMessageJustIdConverter()); + } + + @Override + public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { + + io.lettuce.core.Consumer from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group), + LettuceConverters.toBytes(newOwner)); + XAutoClaimArgs args = XAutoClaimArgs.Builder.justid(from, minIdleTime, start).count(count); + + return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args) + .get(StreamConverters.claimedMessageJustIdConverter()); + } + + @Override + public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { + + io.lettuce.core.Consumer from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group), + LettuceConverters.toBytes(newOwner)); + XAutoClaimArgs args = XAutoClaimArgs.Builder.xautoclaim(from, minIdleTime, start); + + return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args) + .get(StreamConverters.claimedMessageConverter()); + } + + @Override + public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { + + io.lettuce.core.Consumer from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group), + LettuceConverters.toBytes(newOwner)); + XAutoClaimArgs args = XAutoClaimArgs.Builder.xautoclaim(from, minIdleTime, start).count(count); + + return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args) + .get(StreamConverters.claimedMessageConverter()); + } + @Override public Long xDel(byte[] key, RecordId... recordIds) { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java index a243a44748..a3955fa40f 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java @@ -18,11 +18,13 @@ import io.lettuce.core.StreamMessage; import io.lettuce.core.XClaimArgs; import io.lettuce.core.XReadArgs; +import io.lettuce.core.models.stream.ClaimedMessages; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; import java.time.Duration; import java.util.List; +import java.util.stream.Collectors; import org.springframework.core.convert.converter.Converter; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; @@ -71,6 +73,14 @@ static Converter, ByteRecord> byteRecordConverter( return (it) -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBytes(it.getBody()); } + static Converter, org.springframework.data.redis.connection.stream.ClaimedMessages> claimedMessageConverter() { + return (it) -> new org.springframework.data.redis.connection.stream.ClaimedMessages(RecordId.of(it.getId()), it.getMessages().stream().map((message) -> StreamConverters.byteRecordConverter().convert(message)).toList(), List.of()); + } + + static Converter, org.springframework.data.redis.connection.stream.ClaimedMessagesIds> claimedMessageJustIdConverter() { + return (it) -> new org.springframework.data.redis.connection.stream.ClaimedMessagesIds(RecordId.of(it.getId()), it.getMessages().stream().map((message) -> RecordId.of(message.getId())).toList(), List.of()); + } + /** * Convert the raw Lettuce xpending result to {@link PendingMessages}. * diff --git a/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessages.java b/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessages.java new file mode 100644 index 0000000000..85db116d3d --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessages.java @@ -0,0 +1,43 @@ +package org.springframework.data.redis.connection.stream; + +import java.util.List; + +public class ClaimedMessages { + private final RecordId id; + private final List claimedMessages; + private final List deletedMessages; + + public ClaimedMessages(RecordId id, List claimedMessages, List deletedMessages) { + this.id = id; + this.claimedMessages = claimedMessages; + this.deletedMessages = deletedMessages; + } + + /** + * @return the message id. + */ + public RecordId getId() { + return id; + } + + /** + * @return the message id as {@link String}. + */ + public String getIdAsString() { + return id.getValue(); + } + + /** + * @return list of claimed messages. + */ + List getClaimedMessages() { + return claimedMessages; + } + + /** + * @return list of deleted messages. + */ + List getDeletedMessages() { + return deletedMessages; + } +} diff --git a/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessagesIds.java b/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessagesIds.java new file mode 100644 index 0000000000..3dec9b604d --- /dev/null +++ b/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessagesIds.java @@ -0,0 +1,43 @@ +package org.springframework.data.redis.connection.stream; + +import java.util.List; + +public class ClaimedMessagesIds { + private final RecordId id; + private final List claimedMessages; + private final List deletedMessages; + + public ClaimedMessagesIds(RecordId id, List claimedMessages, List deletedMessages) { + this.id = id; + this.claimedMessages = claimedMessages; + this.deletedMessages = deletedMessages; + } + + /** + * @return the message id. + */ + public RecordId getId() { + return id; + } + + /** + * @return the message id as {@link String}. + */ + public String getIdAsString() { + return id.getValue(); + } + + /** + * @return list of claimed messages. + */ + List getClaimedMessages() { + return claimedMessages; + } + + /** + * @return list of deleted messages. + */ + List getDeletedMessages() { + return deletedMessages; + } +} From 3b20ae283a3df11761e1d15854c176b15956da93 Mon Sep 17 00:00:00 2001 From: Krzysztof Kocel Date: Mon, 9 Dec 2024 15:43:26 +0100 Subject: [PATCH 4/6] Revert "Add support for xautoclaim" This reverts commit e94fdd43475ac244f73d888d081ad1d07341288d. --- .../connection/DefaultedRedisConnection.java | 30 -------- .../redis/connection/RedisStreamCommands.java | 65 ----------------- .../jedis/JedisClusterStreamCommands.java | 23 ------ .../connection/jedis/JedisStreamCommands.java | 72 ------------------- .../connection/jedis/StreamConverters.java | 12 ---- .../lettuce/LettuceStreamCommands.java | 49 ------------- .../connection/lettuce/StreamConverters.java | 10 --- .../connection/stream/ClaimedMessages.java | 43 ----------- .../connection/stream/ClaimedMessagesIds.java | 43 ----------- 9 files changed, 347 deletions(-) delete mode 100644 src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessages.java delete mode 100644 src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessagesIds.java diff --git a/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java b/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java index ea4227fc62..aaeaafe18b 100644 --- a/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/DefaultedRedisConnection.java @@ -29,8 +29,6 @@ import org.springframework.data.geo.Metric; import org.springframework.data.geo.Point; import org.springframework.data.redis.connection.stream.ByteRecord; -import org.springframework.data.redis.connection.stream.ClaimedMessages; -import org.springframework.data.redis.connection.stream.ClaimedMessagesIds; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.PendingMessages; @@ -505,34 +503,6 @@ default List xClaim(byte[] key, String group, String newOwner, XClai return streamCommands().xClaim(key, group, newOwner, options); } - /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ - @Override - @Deprecated - default ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { - return streamCommands().xAutoclaimJustId(key, group, newOwner, minIdleTime, start); - } - - /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ - @Override - @Deprecated - default ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { - return streamCommands().xAutoclaimJustId(key, group, newOwner, minIdleTime, start, count); - } - - /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ - @Override - @Deprecated - default ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { - return streamCommands().xAutoclaim(key, group, newOwner, minIdleTime, start); - } - - /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ - @Override - @Deprecated - default ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { - return streamCommands().xAutoclaim(key, group, newOwner, minIdleTime, start, count); - } - /** @deprecated in favor of {@link RedisConnection#streamCommands()}}. */ @Override @Deprecated diff --git a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java index 84f0e912e0..a326106610 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisStreamCommands.java @@ -521,71 +521,6 @@ public XClaimOptions ids(String... ids) { } } - /** - * Transfer ownership of pending stream entries that match the specified criteria. Returns just an array of IDs - * of messages successfully claimed, without returning the actual message. The retry counter is not incremented. - * - * @param key the {@literal key} the stream is stored at. - * @param group the name of the {@literal consumer group}. - * @param newOwner the name of the new {@literal consumer}. - * @param minIdleTime must not be {@literal null}. - * @param start must not be {@literal null}. - * @return list of {@link RecordId ids} that changed user. - * @see Redis Documentation: XAUTOCLAIM - * @since 2.3 - */ - @Nullable - ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start); - - /** - * Transfer ownership of pending stream entries that match the specified criteria. Returns just an array of IDs - * of messages successfully claimed, without returning the actual message. The retry counter is not incremented. - * - * @param key the {@literal key} the stream is stored at. - * @param group the name of the {@literal consumer group}. - * @param newOwner the name of the new {@literal consumer}. - * @param minIdleTime must not be {@literal null}. - * @param start must not be {@literal null}. - * @param count limit the number of results. Must not be {@literal null}. - * @return list of {@link RecordId ids} that changed user. - * @see Redis Documentation: XAUTOCLAIM - * @since 2.3 - */ - @Nullable - ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count); - - - /** - * Transfer ownership of pending stream entries that match the specified criteria. - * - * @param key the {@literal key} the stream is stored at. - * @param group the name of the {@literal consumer group}. - * @param newOwner the name of the new {@literal consumer}. - * @param minIdleTime must not be {@literal null}. - * @param start must not be {@literal null}. - * @return list of {@link ByteRecord} that changed user. - * @see Redis Documentation: XAUTOCLAIM - */ - @Nullable - ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start); - - /** - * Transfer ownership of pending stream entries that match the specified criteria. - * - * @param key the {@literal key} the stream is stored at. - * @param group the name of the {@literal consumer group}. - * @param newOwner the name of the new {@literal consumer}. - * @param minIdleTime must not be {@literal null}. - * @param start must not be {@literal null}. - * @param count limit the number of results. Must not be {@literal null}. - * @return list of {@link ByteRecord} that changed user. - * @see Redis Documentation: XAUTOCLAIM - * @since 2.3 - */ - @Nullable - ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count); - - /** * Removes the records with the given id's from the stream. Returns the number of items deleted, that may be different * from the number of id's passed in case certain id's do not exist. diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java index b222958ae2..05cadae499 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterStreamCommands.java @@ -17,8 +17,6 @@ import static org.springframework.data.redis.connection.jedis.StreamConverters.*; -import org.springframework.data.redis.connection.stream.ClaimedMessages; -import org.springframework.data.redis.connection.stream.ClaimedMessagesIds; import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.params.XAddParams; import redis.clients.jedis.params.XClaimParams; @@ -26,7 +24,6 @@ import redis.clients.jedis.params.XReadGroupParams; import redis.clients.jedis.params.XReadParams; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -134,26 +131,6 @@ public List xClaim(byte[] key, String group, String newOwner, XClaim } } - @Override - public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { - return null; - } - - @Override - public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { - return null; - } - - @Override - public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { - return null; - } - - @Override - public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { - return null; - } - @Override public Long xDel(byte[] key, RecordId... recordIds) { diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java index cd37aca18b..c3ecbe8255 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisStreamCommands.java @@ -15,14 +15,11 @@ */ package org.springframework.data.redis.connection.jedis; -import org.springframework.data.redis.connection.stream.ClaimedMessages; -import org.springframework.data.redis.connection.stream.ClaimedMessagesIds; import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.Jedis; import redis.clients.jedis.commands.PipelineBinaryCommands; import redis.clients.jedis.commands.StreamPipelineBinaryCommands; import redis.clients.jedis.params.XAddParams; -import redis.clients.jedis.params.XAutoClaimParams; import redis.clients.jedis.params.XClaimParams; import redis.clients.jedis.params.XPendingParams; import redis.clients.jedis.params.XReadGroupParams; @@ -30,7 +27,6 @@ import redis.clients.jedis.resps.StreamConsumersInfo; import redis.clients.jedis.resps.StreamGroupInfo; -import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -120,74 +116,6 @@ public List xClaim(byte[] key, String group, String newOwner, XClaim .get(r -> StreamConverters.convertToByteRecord(key, r)); } - @Override - public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { - - Assert.notNull(key, "Key must not be null"); - Assert.notNull(group, "Group must not be null"); - Assert.notNull(minIdleTime, "MinIdleTime must not be null"); - Assert.notNull(start, "Start must not be null"); - - XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams(); - - final List byteRecords = connection.invoke() - .from(Jedis::xautoclaimJustId, ResponseCommands::xautoclaimJustId, key, JedisConverters.toBytes(group), - JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start), - params).get(r -> StreamConverters.convertToByteRecord(key, r)); - - return null; - } - - @Override - public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { - Assert.notNull(key, "Key must not be null"); - Assert.notNull(group, "Group must not be null"); - Assert.notNull(minIdleTime, "MinIdleTime must not be null"); - Assert.notNull(start, "Start must not be null"); - Assert.notNull(count, "Count must not be null"); - - XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams().count(count.intValue()); - - final List byteRecords = connection.invoke() - .from(Jedis::xautoclaimJustId, ResponseCommands::xautoclaimJustId, key, JedisConverters.toBytes(group), - JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start), - params).get(r -> StreamConverters.convertToByteRecord(key, r)); - - return null; - } - - @Override - public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { - Assert.notNull(key, "Key must not be null"); - Assert.notNull(group, "Group must not be null"); - Assert.notNull(newOwner, "NewOwner must not be null"); - - XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams(); - - final List br= connection.invoke() - .from(Jedis::xautoclaim, ResponseCommands::xautoclaim, key, JedisConverters.toBytes(group), - JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start), - params).get(r -> StreamConverters.convertToByteRecord(key, r)); - - return null; - } - - @Override - public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { - Assert.notNull(key, "Key must not be null"); - Assert.notNull(group, "Group must not be null"); - Assert.notNull(newOwner, "NewOwner must not be null"); - - XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams().count(count.intValue()); - - final List br= connection.invoke() - .from(Jedis::xautoclaim, ResponseCommands::xautoclaim, key, JedisConverters.toBytes(group), - JedisConverters.toBytes(newOwner), minIdleTime.toMillis(), JedisConverters.toBytes(start), - params).get(r -> StreamConverters.convertToByteRecord(key, r)); - - return null; - } - @Override public Long xDel(byte[] key, RecordId... recordIds) { diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java index c9587118b9..60d16a1f4a 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/StreamConverters.java @@ -18,7 +18,6 @@ import redis.clients.jedis.BuilderFactory; import redis.clients.jedis.StreamEntryID; import redis.clients.jedis.params.XAddParams; -import redis.clients.jedis.params.XAutoClaimParams; import redis.clients.jedis.params.XClaimParams; import redis.clients.jedis.params.XPendingParams; import redis.clients.jedis.params.XReadGroupParams; @@ -264,17 +263,6 @@ public static XClaimParams toXClaimParams(RedisStreamCommands.XClaimOptions opti return params; } - public static XAutoClaimParams toXautoClaimParams(Integer count) { - - XAutoClaimParams params = XAutoClaimParams.xAutoClaimParams(); - - if (count != null) { - params.count(count); - } - - return params; - } - public static XReadParams toXReadParams(StreamReadOptions readOptions) { XReadParams params = XReadParams.xReadParams(); diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java index 97dfaf9f3d..7f15f68e8e 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceStreamCommands.java @@ -16,14 +16,12 @@ package org.springframework.data.redis.connection.lettuce; import io.lettuce.core.XAddArgs; -import io.lettuce.core.XAutoClaimArgs; import io.lettuce.core.XClaimArgs; import io.lettuce.core.XGroupCreateArgs; import io.lettuce.core.XReadArgs; import io.lettuce.core.api.async.RedisStreamAsyncCommands; import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; -import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Objects; @@ -33,8 +31,6 @@ import org.springframework.data.redis.connection.Limit; import org.springframework.data.redis.connection.RedisStreamCommands; import org.springframework.data.redis.connection.stream.ByteRecord; -import org.springframework.data.redis.connection.stream.ClaimedMessages; -import org.springframework.data.redis.connection.stream.ClaimedMessagesIds; import org.springframework.data.redis.connection.stream.Consumer; import org.springframework.data.redis.connection.stream.MapRecord; import org.springframework.data.redis.connection.stream.PendingMessages; @@ -54,7 +50,6 @@ * @author Dejan Jankov * @author Dengliming * @author Mark John Moreno - * @author Krzysztof Kocel * @since 2.2 */ class LettuceStreamCommands implements RedisStreamCommands { @@ -122,50 +117,6 @@ public List xClaim(byte[] key, String group, String newOwner, XClaim .toList(StreamConverters.byteRecordConverter()); } - @Override - public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { - - io.lettuce.core.Consumer from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group), - LettuceConverters.toBytes(newOwner)); - XAutoClaimArgs args = XAutoClaimArgs.Builder.justid(from, minIdleTime, start); - - return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args) - .get(StreamConverters.claimedMessageJustIdConverter()); - } - - @Override - public ClaimedMessagesIds xAutoclaimJustId(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { - - io.lettuce.core.Consumer from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group), - LettuceConverters.toBytes(newOwner)); - XAutoClaimArgs args = XAutoClaimArgs.Builder.justid(from, minIdleTime, start).count(count); - - return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args) - .get(StreamConverters.claimedMessageJustIdConverter()); - } - - @Override - public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start) { - - io.lettuce.core.Consumer from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group), - LettuceConverters.toBytes(newOwner)); - XAutoClaimArgs args = XAutoClaimArgs.Builder.xautoclaim(from, minIdleTime, start); - - return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args) - .get(StreamConverters.claimedMessageConverter()); - } - - @Override - public ClaimedMessages xAutoclaim(byte[] key, String group, String newOwner, Duration minIdleTime, String start, Long count) { - - io.lettuce.core.Consumer from = io.lettuce.core.Consumer.from(LettuceConverters.toBytes(group), - LettuceConverters.toBytes(newOwner)); - XAutoClaimArgs args = XAutoClaimArgs.Builder.xautoclaim(from, minIdleTime, start).count(count); - - return connection.invoke().from(RedisStreamAsyncCommands::xautoclaim, key, args) - .get(StreamConverters.claimedMessageConverter()); - } - @Override public Long xDel(byte[] key, RecordId... recordIds) { diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java index a3955fa40f..a243a44748 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/StreamConverters.java @@ -18,13 +18,11 @@ import io.lettuce.core.StreamMessage; import io.lettuce.core.XClaimArgs; import io.lettuce.core.XReadArgs; -import io.lettuce.core.models.stream.ClaimedMessages; import io.lettuce.core.models.stream.PendingMessage; import io.lettuce.core.models.stream.PendingMessages; import java.time.Duration; import java.util.List; -import java.util.stream.Collectors; import org.springframework.core.convert.converter.Converter; import org.springframework.data.redis.connection.RedisStreamCommands.XClaimOptions; @@ -73,14 +71,6 @@ static Converter, ByteRecord> byteRecordConverter( return (it) -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBytes(it.getBody()); } - static Converter, org.springframework.data.redis.connection.stream.ClaimedMessages> claimedMessageConverter() { - return (it) -> new org.springframework.data.redis.connection.stream.ClaimedMessages(RecordId.of(it.getId()), it.getMessages().stream().map((message) -> StreamConverters.byteRecordConverter().convert(message)).toList(), List.of()); - } - - static Converter, org.springframework.data.redis.connection.stream.ClaimedMessagesIds> claimedMessageJustIdConverter() { - return (it) -> new org.springframework.data.redis.connection.stream.ClaimedMessagesIds(RecordId.of(it.getId()), it.getMessages().stream().map((message) -> RecordId.of(message.getId())).toList(), List.of()); - } - /** * Convert the raw Lettuce xpending result to {@link PendingMessages}. * diff --git a/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessages.java b/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessages.java deleted file mode 100644 index 85db116d3d..0000000000 --- a/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessages.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.springframework.data.redis.connection.stream; - -import java.util.List; - -public class ClaimedMessages { - private final RecordId id; - private final List claimedMessages; - private final List deletedMessages; - - public ClaimedMessages(RecordId id, List claimedMessages, List deletedMessages) { - this.id = id; - this.claimedMessages = claimedMessages; - this.deletedMessages = deletedMessages; - } - - /** - * @return the message id. - */ - public RecordId getId() { - return id; - } - - /** - * @return the message id as {@link String}. - */ - public String getIdAsString() { - return id.getValue(); - } - - /** - * @return list of claimed messages. - */ - List getClaimedMessages() { - return claimedMessages; - } - - /** - * @return list of deleted messages. - */ - List getDeletedMessages() { - return deletedMessages; - } -} diff --git a/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessagesIds.java b/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessagesIds.java deleted file mode 100644 index 3dec9b604d..0000000000 --- a/src/main/java/org/springframework/data/redis/connection/stream/ClaimedMessagesIds.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.springframework.data.redis.connection.stream; - -import java.util.List; - -public class ClaimedMessagesIds { - private final RecordId id; - private final List claimedMessages; - private final List deletedMessages; - - public ClaimedMessagesIds(RecordId id, List claimedMessages, List deletedMessages) { - this.id = id; - this.claimedMessages = claimedMessages; - this.deletedMessages = deletedMessages; - } - - /** - * @return the message id. - */ - public RecordId getId() { - return id; - } - - /** - * @return the message id as {@link String}. - */ - public String getIdAsString() { - return id.getValue(); - } - - /** - * @return list of claimed messages. - */ - List getClaimedMessages() { - return claimedMessages; - } - - /** - * @return list of deleted messages. - */ - List getDeletedMessages() { - return deletedMessages; - } -} From 5de03fdd467d58db85f1a8bc2a8112d1ac956ce3 Mon Sep 17 00:00:00 2001 From: Krzysztof Kocel Date: Mon, 9 Dec 2024 15:46:10 +0100 Subject: [PATCH 5/6] Add lag and entries-read to StreamInfo (XINFO GROUPS) --- .../data/redis/connection/stream/StreamInfo.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java b/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java index 3eb0f1747e..327c77a4bd 100644 --- a/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java +++ b/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java @@ -219,6 +219,8 @@ public Map getLastEntry() { * {@literal Redis Stream}. * * @author Christoph Strobl + * @author Mark Paluch + * @author Krzysztof Kocel */ public static class XInfoGroups implements Streamable { @@ -379,7 +381,7 @@ public Long entriesRead() { */ @Nullable public Long lag() { - return get("entries-read", Long.class); + return get("lag", Long.class); } } From f645ed9ab24ba02cb42f08832d4cb72cf7c4e406 Mon Sep 17 00:00:00 2001 From: Krzysztof Kocel Date: Mon, 9 Dec 2024 16:34:00 +0100 Subject: [PATCH 6/6] fix tests --- .../data/redis/connection/stream/StreamInfo.java | 3 ++- .../redis/connection/AbstractConnectionIntegrationTests.java | 4 ++-- .../LettuceReactiveStreamCommandsIntegrationTests.java | 4 ++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java b/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java index 327c77a4bd..19fd29963d 100644 --- a/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java +++ b/src/main/java/org/springframework/data/redis/connection/stream/StreamInfo.java @@ -369,8 +369,9 @@ public String lastDeliveredId() { * * @return */ + @Nullable public Long entriesRead() { - return getRequired("entries-read", Long.class); + return get("entries-read", Long.class); } /** diff --git a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java index 87c659d19c..36a16d8f90 100644 --- a/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/AbstractConnectionIntegrationTests.java @@ -4057,8 +4057,8 @@ public void xinfoGroupsNoConsumer() { assertThat(info.get(0).groupName()).isEqualTo("my-group"); assertThat(info.get(0).consumerCount()).isZero(); assertThat(info.get(0).pendingCount()).isZero(); - assertThat(info.get(0).lag()).isZero(); - assertThat(info.get(0).entriesRead()).isZero(); + assertThat(info.get(0).lag()).isEqualTo(2); + assertThat(info.get(0).entriesRead()).isNull(); assertThat(info.get(0).lastDeliveredId()).isEqualTo("0-0"); } diff --git a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java index 757e718168..a98a3b6350 100644 --- a/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java +++ b/src/test/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommandsIntegrationTests.java @@ -457,8 +457,8 @@ void xinfoGroupsNoConsumer() { assertThat(info.groupName()).isEqualTo("my-group"); assertThat(info.consumerCount()).isZero(); assertThat(info.pendingCount()).isZero(); - assertThat(info.entriesRead()).isZero(); - assertThat(info.lag()).isZero(); + assertThat(info.entriesRead()).isNull(); + assertThat(info.lag()).isEqualTo(2); assertThat(info.lastDeliveredId()).isEqualTo("0-0"); }).verifyComplete(); }