
Commit 63532d6

bugfix: reading from a silent stream should not fail with RedisCommandTimeoutException (#952)
If the stream has no data for a long time, Lettuce will throw a `RedisCommandTimeoutException`. In that case we don't want to fail the stream; we should restart the read instead.
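
For illustration (not part of the commit), a minimal consumer-side sketch of the behaviour this enables. The stream key and chunk size are made up, and the `Streaming` instance is assumed to come from `RedisStream.mkStreamingConnectionResource`:

```scala
import cats.effect.IO
import dev.profunktor.redis4cats.streams.Streaming
import dev.profunktor.redis4cats.streams.data.XReadMessage
import fs2.Stream

object SilentStreamConsumer {
  // With this change, `read` defaults to RestartOnTimeout.always, so a
  // RedisCommandTimeoutException raised by Lettuce while the stream is
  // silent restarts the XREAD loop instead of failing the returned stream.
  def consume(
      streaming: Streaming[IO, Stream[IO, *], String, String]
  ): Stream[IO, XReadMessage[String, String]] =
    streaming.read(keys = Set("events"), chunkSize = 16)
}
```
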
1 parent 3d61ffe commit 63532d6

10 files changed, +228 -44 lines changed

@@ -0,0 +1,75 @@
+/*
+ * Copyright 2018-2021 ProfunKtor
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.profunktor.redis4cats
+
+import cats.MonadThrow
+import cats.effect.kernel.Clock
+import cats.kernel.Monoid
+import cats.syntax.all._
+import io.lettuce.core.RedisCommandTimeoutException
+
+import scala.concurrent.duration.FiniteDuration
+
+/**
+  * Configures restarting operations in case they time out.
+  *
+  * This is useful because Lettuce (the underlying Java client) does time out some operations if they do not send
+  * any data, like reading from a stream.
+  */
+trait RestartOnTimeout {
+
+  /**
+    * @param elapsed amount of time elapsed from the start of operation
+    * @return `true` if the operation should be restarted
+    */
+  def apply(elapsed: FiniteDuration): Boolean
+
+  /** Wraps the given operation into a restart loop. */
+  def wrap[F[_], A](fa: F[A])(implicit clock: Clock[F], monadThrow: MonadThrow[F], monoid: Monoid[F[A]]): F[A] = {
+    val currentTime = clock.monotonic
+
+    def onTimeout(startedAt: FiniteDuration): F[A] =
+      for {
+        now <- currentTime
+        elapsed = now - startedAt
+        restart = apply(elapsed)
+        a <- if (restart) doOp else monoid.empty
+      } yield a
+
+    def doOp: F[A] =
+      for {
+        startedAt <- currentTime
+        a <- fa.recoverWith { case _: RedisCommandTimeoutException => onTimeout(startedAt) }
+      } yield a
+
+    doOp
+  }
+}
+object RestartOnTimeout {
+
+  /** Always restart. */
+  def always: RestartOnTimeout = _ => true
+
+  /** Never restart. */
+  def never: RestartOnTimeout = _ => false
+
+  /** Restart if the elapsed time is less than the given duration. */
+  def ifBefore(duration: FiniteDuration): RestartOnTimeout = elapsed => elapsed < duration
+
+  /** Restart if the elapsed time is greater than the given duration. */
+  def ifAfter(duration: FiniteDuration): RestartOnTimeout = elapsed => elapsed > duration
+}
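
An illustrative sketch of the combinators above (not part of the commit; the durations are made up). Note that `elapsed` is measured per attempt: it is the time from the start of the wrapped operation until the timeout exception surfaces, and it is reset on every restart:

```scala
import scala.concurrent.duration._
import dev.profunktor.redis4cats.RestartOnTimeout

object RestartPolicies {
  // Restart only if the timed-out attempt had been running for less than one
  // minute; otherwise `wrap` stops and yields the Monoid's empty value.
  val bounded: RestartOnTimeout = RestartOnTimeout.ifBefore(1.minute)

  val restartShortAttempt: Boolean = bounded(45.seconds) // true  -> restart
  val giveUpLongAttempt: Boolean   = bounded(90.seconds) // false -> stop with Monoid.empty
}
```
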
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2018-2021 ProfunKtor
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package dev.profunktor.redis4cats
+
+import cats.{ Applicative }
+import cats.effect.kernel.Clock
+import fs2.Stream
+
+import scala.concurrent.duration.FiniteDuration
+
+object StreamsInstances {
+  implicit def fs2Clock[F[_]: Clock]: Clock[Stream[F, *]] = new Clock[Stream[F, *]] {
+    override def applicative: Applicative[Stream[F, *]] = Applicative[Stream[F, *]]
+    override def monotonic: Stream[F, FiniteDuration] = Stream.eval(Clock[F].monotonic)
+    override def realTime: Stream[F, FiniteDuration] = Stream.eval(Clock[F].realTime)
+  }
+}
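
This instance is what lets `RestartOnTimeout#wrap` (which requires a `Clock`) be applied directly to an `fs2.Stream`, as the wrapped read loop in `Fs2Streaming.scala` does below. A minimal sketch (not part of the commit), assuming the instance is imported at the call site:

```scala
import cats.effect.IO
import cats.effect.kernel.Clock
import dev.profunktor.redis4cats.StreamsInstances._
import fs2.Stream
import scala.concurrent.duration.FiniteDuration

object ClockInStream {
  // Clock[Stream[IO, *]] resolves via fs2Clock above (backed by Clock[IO]);
  // the underlying effect is suspended with Stream.eval.
  val monotonicNow: Stream[IO, FiniteDuration] = Clock[Stream[IO, *]].monotonic
}
```
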

modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/PubSubCommands.scala

+10
@@ -48,9 +48,19 @@ trait PublishCommands[F[_], S[_], K, V] extends PubSubStats[F, K] {
  * @tparam V the value type
  */
 trait SubscribeCommands[F[_], S[_], K, V] {
+
+  /**
+    * Subscribes to a channel.
+    */
   def subscribe(channel: RedisChannel[K]): S[V]
+
   def unsubscribe(channel: RedisChannel[K]): F[Unit]
+
+  /**
+    * Subscribes to a pattern.
+    */
   def psubscribe(channel: RedisPattern[K]): S[RedisPatternEvent[K, V]]
+
   def punsubscribe(channel: RedisPattern[K]): F[Unit]
 }


modules/streams/src/main/scala/dev/profunktor/redis4cats/pubsub/internals/Subscriber.scala

+12-6
@@ -22,9 +22,7 @@ import cats.Applicative
 import cats.effect.kernel._
 import cats.effect.kernel.implicits._
 import cats.syntax.all._
-import dev.profunktor.redis4cats.data.RedisChannel
-import dev.profunktor.redis4cats.data.RedisPattern
-import dev.profunktor.redis4cats.data.RedisPatternEvent
+import dev.profunktor.redis4cats.data.{ RedisChannel, RedisPattern, RedisPatternEvent }
 import dev.profunktor.redis4cats.effect.{ FutureLift, Log }
 import fs2.Stream
 import io.lettuce.core.pubsub.StatefulRedisPubSubConnection
@@ -37,7 +35,10 @@ private[pubsub] class Subscriber[F[_]: Async: FutureLift: Log, K, V](
   override def subscribe(channel: RedisChannel[K]): Stream[F, V] =
     Stream
       .resource(Resource.eval(state.get) >>= PubSubInternals.channel[F, K, V](state, subConnection).apply(channel))
-      .evalTap(_ => FutureLift[F].lift(subConnection.async().subscribe(channel.underlying)))
+      .evalTap(_ =>
+        FutureLift[F]
+          .lift(subConnection.async().subscribe(channel.underlying))
+      )
       .flatMap(_.subscribe(500).unNone)

   override def unsubscribe(channel: RedisChannel[K]): F[Unit] =
@@ -49,10 +50,15 @@
           .update(s => s.copy(channels = s.channels - channel.underlying))
       })

-  override def psubscribe(pattern: RedisPattern[K]): Stream[F, RedisPatternEvent[K, V]] =
+  override def psubscribe(
+      pattern: RedisPattern[K]
+  ): Stream[F, RedisPatternEvent[K, V]] =
     Stream
       .resource(Resource.eval(state.get) >>= PubSubInternals.pattern[F, K, V](state, subConnection).apply(pattern))
-      .evalTap(_ => FutureLift[F].lift(subConnection.async().psubscribe(pattern.underlying)))
+      .evalTap(_ =>
+        FutureLift[F]
+          .lift(subConnection.async().psubscribe(pattern.underlying))
+      )
       .flatMap(_.subscribe(500).unNone)

   override def punsubscribe(pattern: RedisPattern[K]): F[Unit] =
modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/Fs2RawStreaming.scala

+8-7
Original file line numberDiff line numberDiff line change
@@ -59,15 +59,15 @@ private[streams] class RedisRawStreaming[F[_]: FutureLift: Sync, K, V](
5959
streams: Set[StreamingOffset[K]],
6060
block: Option[Duration] = Some(Duration.Zero),
6161
count: Option[Long] = None
62-
): F[List[XReadMessage[K, V]]] =
62+
): F[List[XReadMessage[K, V]]] = {
63+
val offsets = streams.map {
64+
case All(key) => StreamOffset.from(key, "0")
65+
case Latest(key) => StreamOffset.latest(key)
66+
case Custom(key, offset) => StreamOffset.from(key, offset)
67+
}.toSeq
68+
6369
FutureLift[F]
6470
.lift {
65-
val offsets = streams.map {
66-
case All(key) => StreamOffset.from(key, "0")
67-
case Latest(key) => StreamOffset.latest(key)
68-
case Custom(key, offset) => StreamOffset.from(key, offset)
69-
}.toSeq
70-
7171
(block, count) match {
7272
case (None, None) => client.async().xread(offsets: _*)
7373
case (None, Some(count)) => client.async().xread(XReadArgs.Builder.count(count), offsets: _*)
@@ -81,5 +81,6 @@ private[streams] class RedisRawStreaming[F[_]: FutureLift: Sync, K, V](
8181
XReadMessage[K, V](MessageId(msg.getId), msg.getStream, msg.getBody.asScala.toMap)
8282
}
8383
}
84+
}
8485

8586
}

modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/Fs2Streaming.scala

+23-15
@@ -17,8 +17,6 @@
 package dev.profunktor.redis4cats
 package streams

-import scala.concurrent.duration.Duration
-
 import cats.effect.kernel._
 import cats.syntax.all._
 import dev.profunktor.redis4cats.connection._
@@ -27,6 +25,9 @@ import dev.profunktor.redis4cats.effect.{ FutureLift, Log }
 import dev.profunktor.redis4cats.streams.data._
 import fs2.Stream
 import io.lettuce.core.{ ReadFrom => JReadFrom }
+import dev.profunktor.redis4cats.StreamsInstances._
+
+import scala.concurrent.duration.Duration

 object RedisStream {

@@ -70,11 +71,14 @@

 class RedisStream[F[_]: Sync, K, V](rawStreaming: RedisRawStreaming[F, K, V]) extends Streaming[F, Stream[F, *], K, V] {

-  private[streams] val nextOffset: K => XReadMessage[K, V] => StreamingOffset[K] =
-    key => msg => StreamingOffset.Custom(key, msg.id.value)
+  private[streams] def nextOffset(key: K, msg: XReadMessage[K, V]): StreamingOffset[K] =
+    StreamingOffset.Custom(key, msg.id.value)

-  private[streams] val offsetsByKey: List[XReadMessage[K, V]] => Map[K, Option[StreamingOffset[K]]] =
-    list => list.groupBy(_.key).map { case (k, values) => k -> values.lastOption.map(nextOffset(k)) }
+  private[streams] def offsetsByKey(iter: Iterable[XReadMessage[K, V]]): Iterator[(K, StreamingOffset[K])] = {
+    val map = collection.mutable.Map.empty[K, XReadMessage[K, V]]
+    iter.iterator.foreach(msg => map += msg.key -> msg)
+    map.iterator.map { case (key, msg) => key -> nextOffset(key, msg) }
+  }

   override def append: Stream[F, XAddMessage[K, V]] => Stream[F, MessageId] =
     _.evalMap(append)
@@ -86,18 +90,22 @@ class RedisStream[F[_]: Sync, K, V](rawStreaming: RedisRawStreaming[F, K, V]) ex
       keys: Set[K],
       chunkSize: Int,
       initialOffset: K => StreamingOffset[K],
-      block: Option[Duration] = Some(Duration.Zero),
-      count: Option[Long] = None
+      block: Option[Duration],
+      count: Option[Long],
+      restartOnTimeout: RestartOnTimeout
   ): Stream[F, XReadMessage[K, V]] = {
     val initial = keys.map(k => k -> initialOffset(k)).toMap
     Stream.eval(Ref.of[F, Map[K, StreamingOffset[K]]](initial)).flatMap { ref =>
-      (for {
-        offsets <- Stream.eval(ref.get)
-        list <- Stream.eval(rawStreaming.xRead(offsets.values.toSet, block, count))
-        newOffsets = offsetsByKey(list).collect { case (key, Some(value)) => key -> value }.toList
-        _ <- Stream.eval(newOffsets.map { case (k, v) => ref.update(_.updated(k, v)) }.sequence)
-        result <- Stream.fromIterator[F](list.iterator, chunkSize)
-      } yield result).repeat
+      def withoutRestarts =
+        (for {
+          offsets <- Stream.eval(ref.get)
+          list <- Stream.eval(rawStreaming.xRead(offsets.values.toSet, block, count))
+          offsetUpdates = offsetsByKey(list)
+          _ <- Stream.eval(ref.update(map => offsetUpdates.foldLeft(map) { case (acc, (k, v)) => acc.updated(k, v) }))
+          result <- Stream.fromIterator[F](list.iterator, chunkSize)
+        } yield result).repeat
+
+      restartOnTimeout.wrap(withoutRestarts)
     }
   }

modules/streams/src/main/scala/dev/profunktor/redis4cats/streams/streams.scala

+9-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package dev.profunktor.redis4cats.streams
1818

19+
import dev.profunktor.redis4cats.RestartOnTimeout
1920
import dev.profunktor.redis4cats.streams.data._
2021

2122
import scala.concurrent.duration.Duration
@@ -51,11 +52,18 @@ trait Streaming[F[_], S[_], K, V] {
5152

5253
def append(msg: XAddMessage[K, V]): F[MessageId]
5354

55+
/**
56+
* Read data from one or multiple streams, only returning entries with an ID greater than the last
57+
* received ID reported by the caller.
58+
*
59+
* @see https://redis.io/commands/xread
60+
*/
5461
def read(
5562
keys: Set[K],
5663
chunkSize: Int,
5764
initialOffset: K => StreamingOffset[K] = StreamingOffset.All[K],
5865
block: Option[Duration] = Some(Duration.Zero),
59-
count: Option[Long] = None
66+
count: Option[Long] = None,
67+
restartOnTimeout: RestartOnTimeout = RestartOnTimeout.always
6068
): S[XReadMessage[K, V]]
6169
}
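
`restartOnTimeout` defaults to `RestartOnTimeout.always`; callers can supply a different policy. A hypothetical call site (not part of the commit; key, chunk size and cutoff are made up):

```scala
import cats.effect.IO
import dev.profunktor.redis4cats.RestartOnTimeout
import dev.profunktor.redis4cats.streams.Streaming
import dev.profunktor.redis4cats.streams.data.XReadMessage
import fs2.Stream
import scala.concurrent.duration._

object BoundedRestartExample {
  // Restart the blocking read only if it had been running for more than a
  // minute when the timeout surfaced; quick back-to-back timeouts end the
  // stream with the Monoid's empty value instead of looping.
  def consume(
      streaming: Streaming[IO, Stream[IO, *], String, String]
  ): Stream[IO, XReadMessage[String, String]] =
    streaming.read(
      keys = Set("events"),
      chunkSize = 16,
      restartOnTimeout = RestartOnTimeout.ifAfter(1.minute)
    )
}
```
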

modules/tests/src/test/scala/dev/profunktor/redis4cats/Redis4CatsFunSuite.scala

+34-15
@@ -18,16 +18,16 @@ package dev.profunktor.redis4cats

 import cats.effect._
 import cats.syntax.all._
-import dev.profunktor.redis4cats.Redis4CatsFunSuite.Fs2Streaming
+import dev.profunktor.redis4cats.Redis4CatsFunSuite.{ Fs2PubSub, Fs2Streaming }
 import dev.profunktor.redis4cats.connection._
 import dev.profunktor.redis4cats.data.RedisCodec
 import dev.profunktor.redis4cats.effect.Log.NoOp._
+import dev.profunktor.redis4cats.pubsub.{ PubSub, PubSubCommands }
 import dev.profunktor.redis4cats.streams.{ RedisStream, Streaming }
+import io.lettuce.core.{ ClientOptions, TimeoutOptions }

-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{ Duration, DurationInt }
 import scala.concurrent.{ Await, Future }
-import dev.profunktor.redis4cats.pubsub.{ PubSub, PubSubCommands }
-import dev.profunktor.redis4cats.Redis4CatsFunSuite.Fs2PubSub

 abstract class Redis4CatsFunSuite(isCluster: Boolean) extends IOSuite {

@@ -54,24 +54,43 @@ abstract class Redis4CatsFunSuite(isCluster: Boolean) extends IOSuite {
     RedisClient[IO].from("redis://localhost").use(f).as(assert(true)).unsafeToFuture()

   def withRedisPubSub(f: Fs2PubSub[String, String] => IO[Unit]): Future[Unit] =
-    (for {
-      client <- fs2.Stream.resource(RedisClient[IO].from("redis://localhost"))
-      pubSub <- fs2.Stream.resource(PubSub.mkPubSubConnection[IO, String, String](client, stringCodec))
-      _ <- fs2.Stream.eval(f(pubSub))
-    } yield ()).compile.drain.void.unsafeToFuture()
+    withRedisPubSubOptionsResource(ClientOptions.create()).use(f).unsafeToFuture()
+
+  def withRedisPubSubOptionsResource(options: ClientOptions): Resource[IO, Fs2PubSub[String, String]] =
+    for {
+      client <- RedisClient[IO].withOptions("redis://localhost", options)
+      pubSub <- PubSub.mkPubSubConnection[IO, String, String](client, stringCodec)
+    } yield pubSub

   def withRedisStream(f: (Fs2Streaming[String, String], Fs2Streaming[String, String]) => IO[Unit]): Future[Unit] =
-    (for {
-      client <- fs2.Stream.resource(RedisClient[IO].from("redis://localhost"))
-      readStream <- RedisStream.mkStreamingConnection[IO, String, String](client, stringCodec)
-      writeStream <- RedisStream.mkStreamingConnection[IO, String, String](client, stringCodec)
-      _ <- fs2.Stream.eval(f(readStream, writeStream))
-    } yield ()).compile.drain.void.unsafeToFuture()
+    withRedisStreamOptionsResource(ClientOptions.create())
+      .use { case (readStream, writeStream) => f(readStream, writeStream) }
+      .unsafeToFuture()
+
+  def withRedisStreamOptionsResource(
+      options: ClientOptions
+  ): Resource[IO, (Fs2Streaming[String, String], Fs2Streaming[String, String])] =
+    for {
+      client <- RedisClient[IO].withOptions("redis://localhost", options)
+      readStream <- RedisStream.mkStreamingConnectionResource[IO, String, String](client, stringCodec)
+      writeStream <- RedisStream.mkStreamingConnectionResource[IO, String, String](client, stringCodec)
+    } yield (readStream, writeStream)

   private def flushAll(): Future[Unit] =
     if (isCluster) withRedisCluster(_.flushAll)
     else withRedis(_.flushAll)

+  def timeoutingOperationTest[A](
+      f: (ClientOptions, RestartOnTimeout) => fs2.Stream[IO, A]
+  ): IO[Unit] = {
+    val options = ClientOptions
+      .builder()
+      .timeoutOptions(TimeoutOptions.builder().fixedTimeout(java.time.Duration.ofMillis(250)).build())
+      .build()
+
+    f(options, RestartOnTimeout.always).interruptAfter(750.millis).compile.drain
+  }
+
   // --- Cluster ---

   lazy val redisUri = List(

modules/tests/src/test/scala/dev/profunktor/redis4cats/RedisPubSubSpec.scala

+16
@@ -55,4 +55,20 @@ class RedisPubSubSpec extends Redis4CatsFunSuite(isCluster = false) {
       actualIO.map(assertEquals(_, expected))
     }
   }
+
+  test("subscribing to a silent channel should not fail with RedisCommandTimeoutException") {
+    timeoutingOperationTest { (options, _) =>
+      fs2.Stream.resource(withRedisPubSubOptionsResource(options)).flatMap { pubSub =>
+        pubSub.subscribe(RedisChannel("test-sub-expiration"))
+      }
+    }
+  }
+
+  test("subscribing to a silent pattern should not fail with RedisCommandTimeoutException") {
+    timeoutingOperationTest { (options, _) =>
+      fs2.Stream.resource(withRedisPubSubOptionsResource(options)).flatMap { pubSub =>
+        pubSub.psubscribe(RedisPattern("test-sub-expiration"))
+      }
+    }
+  }
 }
