Skip to content

Commit 29e7129

Browse files
committed
#998 Client side caching
1 parent dd861e5 commit 29e7129

File tree

8 files changed

+101
-18
lines changed

8 files changed

+101
-18
lines changed

build.sbt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ val commonSettings = Seq(
4444
testFrameworks += new TestFramework("munit.Framework"),
4545
libraryDependencies ++= Seq(
4646
Libraries.catsEffectKernel,
47+
Libraries.catsEffectStd,
4748
Libraries.redisClient,
4849
Libraries.catsEffect % Test,
4950
Libraries.catsLaws % Test,
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2018-2021 ProfunKtor
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package dev.profunktor.redis4cats.caching
18+
19+
trait CacheAccessor[F[_], K, V] {
20+
def get(key: K): F[V]
21+
def put(key: K, value: V): F[Unit]
22+
def evict(key: K): F[Unit]
23+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2018-2021 ProfunKtor
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package dev.profunktor.redis4cats.caching
18+
19+
import cats.effect.kernel.{ Async, Resource }
20+
import dev.profunktor.redis4cats.effect.TxExecutor
21+
import io.lettuce.core.TrackingArgs
22+
import io.lettuce.core.api.StatefulRedisConnection
23+
import io.lettuce.core.support.caching.{
24+
CacheAccessor => JCacheAccessor,
25+
CacheFrontend => JCacheFrontend,
26+
ClientSideCaching => JClientSideCaching
27+
}
28+
29+
object ClientSideCaching {
30+
31+
def make[F[_]: Async, K, V](
32+
connection: StatefulRedisConnection[K, V],
33+
args: TrackingArgs,
34+
cacheAccessor: CacheAccessor[F, K, V]
35+
): Resource[F, JCacheFrontend[K, V]] =
36+
TxExecutor.make[F].flatMap { redisExecutor =>
37+
Resource.make[F, JCacheFrontend[K, V]] {
38+
Async[F].delay {
39+
JClientSideCaching.enable(
40+
new JCacheAccessor[K, V] {
41+
override def get(key: K): V = redisExecutor.unsafeRun(cacheAccessor.get(key))
42+
override def put(key: K, value: V): Unit = redisExecutor.unsafeRun(cacheAccessor.put(key, value))
43+
override def evict(key: K): Unit = redisExecutor.unsafeRun(cacheAccessor.evict(key))
44+
},
45+
connection,
46+
args
47+
)
48+
}
49+
}(cacheFrontend => Async[F].delay(cacheFrontend.close()))
50+
}
51+
}

modules/core/src/main/scala/dev/profunktor/redis4cats/effect/TxExecutor.scala

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -29,26 +29,30 @@ import scala.util.control.NonFatal
2929

3030
import cats.effect.kernel._
3131
import cats.syntax.all._
32+
import cats.effect.std.Dispatcher
3233

3334
private[redis4cats] trait TxExecutor[F[_]] {
3435
def delay[A](thunk: => A): F[A]
3536
def eval[A](fa: F[A]): F[A]
3637
def start[A](fa: F[A]): F[Fiber[F, Throwable, A]]
37-
def liftK[G[_]: Async]: TxExecutor[G]
38+
def liftK[G[_]: Async: Dispatcher]: TxExecutor[G]
39+
def unsafeRun[A](fa: F[A]): A
3840
}
3941

4042
private[redis4cats] object TxExecutor {
4143
def make[F[_]: Async]: Resource[F, TxExecutor[F]] =
42-
Resource
43-
.make(Sync[F].delay(Executors.newFixedThreadPool(1, TxThreadFactory))) { ec =>
44-
Sync[F]
45-
.delay(ec.shutdownNow())
46-
.ensure(new IllegalStateException("There were outstanding tasks at time of shutdown of the Redis thread"))(
47-
_.isEmpty
48-
)
49-
.void
50-
}
51-
.map(es => fromEC(exitOnFatal(ExecutionContext.fromExecutorService(es))))
44+
Dispatcher.parallel[F].flatMap { dispatcher =>
45+
Resource
46+
.make(Sync[F].delay(Executors.newFixedThreadPool(1, TxThreadFactory))) { ec =>
47+
Sync[F]
48+
.delay(ec.shutdownNow())
49+
.ensure(new IllegalStateException("There were outstanding tasks at time of shutdown of the Redis thread"))(
50+
_.isEmpty
51+
)
52+
.void
53+
}
54+
.map(es => fromEC(exitOnFatal(ExecutionContext.fromExecutorService(es)))(dispatcher))
55+
}
5256

5357
private def exitOnFatal(ec: ExecutionContext): ExecutionContext = new ExecutionContext {
5458
def execute(r: Runnable): Unit =
@@ -70,11 +74,12 @@ private[redis4cats] object TxExecutor {
7074
ec.reportFailure(t)
7175
}
7276

73-
private def fromEC[F[_]: Async](ec: ExecutionContext): TxExecutor[F] =
77+
private def fromEC[F[_]: Async](ec: ExecutionContext)(dispatcher: Dispatcher[F]): TxExecutor[F] =
7478
new TxExecutor[F] {
7579
def delay[A](thunk: => A): F[A] = eval(Sync[F].delay(thunk))
7680
def eval[A](fa: F[A]): F[A] = Async[F].evalOn(fa, ec)
7781
def start[A](fa: F[A]): F[Fiber[F, Throwable, A]] = Async[F].startOn(fa, ec)
78-
def liftK[G[_]: Async]: TxExecutor[G] = fromEC[G](ec)
82+
def liftK[G[_]: Async: Dispatcher]: TxExecutor[G] = fromEC[G](ec)(implicitly)
83+
def unsafeRun[A](fa: F[A]): A = dispatcher.unsafeRunSync(fa)
7984
}
8085
}

modules/core/src/main/scala/dev/profunktor/redis4cats/tx/TxRunner.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ package dev.profunktor.redis4cats.tx
1818

1919
import cats.effect.kernel._
2020
import cats.effect.kernel.syntax.all._
21+
import cats.effect.std.Dispatcher
2122
import cats.syntax.all._
22-
2323
import dev.profunktor.redis4cats.effect.TxExecutor
2424

2525
private[redis4cats] trait TxRunner[F[_]] {
@@ -30,7 +30,7 @@ private[redis4cats] trait TxRunner[F[_]] {
3030
)(
3131
fs: TxStore[F, String, A] => List[F[Unit]]
3232
): F[Map[String, A]]
33-
def liftK[G[_]: Async]: TxRunner[G]
33+
def liftK[G[_]: Async: Dispatcher]: TxRunner[G]
3434
}
3535

3636
private[redis4cats] object TxRunner {
@@ -60,6 +60,6 @@ private[redis4cats] object TxRunner {
6060
} *> store.get
6161
}
6262

63-
def liftK[G[_]: Async]: TxRunner[G] = make[G](t.liftK[G])
63+
def liftK[G[_]: Async: Dispatcher]: TxRunner[G] = make[G](t.liftK[G])
6464
}
6565
}

modules/effects/src/main/scala/dev/profunktor/redis4cats/commands.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package dev.profunktor.redis4cats
1818

1919
import algebra._
2020
import cats.effect.kernel.Async
21+
import cats.effect.std.Dispatcher
2122
import dev.profunktor.redis4cats.effect.Log
2223

2324
trait RedisCommands[F[_], K, V]
@@ -39,7 +40,7 @@ trait RedisCommands[F[_], K, V]
3940

4041
object RedisCommands {
4142
implicit class LiftKOps[F[_], K, V](val cmd: RedisCommands[F, K, V]) extends AnyVal {
42-
def liftK[G[_]: Async: Log]: RedisCommands[G, K, V] =
43+
def liftK[G[_]: Async: Dispatcher: Log]: RedisCommands[G, K, V] =
4344
cmd.asInstanceOf[BaseRedis[F, K, V]].liftK[G]
4445
}
4546
}

modules/effects/src/main/scala/dev/profunktor/redis4cats/redis.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package dev.profunktor.redis4cats
1919
import cats._
2020
import cats.data.NonEmptyList
2121
import cats.effect.kernel._
22+
import cats.effect.std.Dispatcher
2223
import cats.syntax.all._
2324
import dev.profunktor.redis4cats.algebra.BitCommandOperation
2425
import dev.profunktor.redis4cats.algebra.BitCommandOperation.Overflows
@@ -448,7 +449,7 @@ private[redis4cats] class BaseRedis[F[_]: FutureLift: MonadThrow: Log, K, V](
448449
) extends RedisCommands[F, K, V]
449450
with RedisConversionOps {
450451

451-
def liftK[G[_]: Async: Log]: RedisCommands[G, K, V] =
452+
def liftK[G[_]: Async: Dispatcher: Log]: RedisCommands[G, K, V] =
452453
new BaseRedis[G, K, V](conn.liftK[G], tx.liftK[G], cluster)
453454

454455
import dev.profunktor.redis4cats.JavaConversions._

project/Dependencies.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ object Dependencies {
2626
def log4cats(artifact: String): ModuleID = "org.typelevel" %% s"log4cats-$artifact" % V.log4cats
2727

2828
val catsEffectKernel = "org.typelevel" %% "cats-effect-kernel" % V.catsEffect
29+
val catsEffectStd = "org.typelevel" %% "cats-effect-std" % V.catsEffect
2930
val fs2Core = "co.fs2" %% "fs2-core" % V.fs2
3031
val keyPool = "org.typelevel" %% "keypool" % V.keyPool
3132

0 commit comments

Comments
 (0)