Skip to content

Draft: Client side caching #1001

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: series/2.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ val commonSettings = Seq(
testFrameworks += new TestFramework("munit.Framework"),
libraryDependencies ++= Seq(
Libraries.catsEffectKernel,
Libraries.catsEffectStd,
Libraries.redisClient,
Libraries.catsEffect % Test,
Libraries.catsLaws % Test,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2018-2021 ProfunKtor
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.profunktor.redis4cats.caching

trait CacheAccessor[F[_], K, V] {
def get(key: K): F[V]
def put(key: K, value: V): F[Unit]
def evict(key: K): F[Unit]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2018-2021 ProfunKtor
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dev.profunktor.redis4cats.caching

import cats.effect.kernel.{ Async, Resource }
import dev.profunktor.redis4cats.effect.TxExecutor
import io.lettuce.core.TrackingArgs
import io.lettuce.core.api.StatefulRedisConnection
import io.lettuce.core.support.caching.{
CacheAccessor => JCacheAccessor,
CacheFrontend => JCacheFrontend,
ClientSideCaching => JClientSideCaching
}

object ClientSideCaching {

def make[F[_]: Async, K, V](
connection: StatefulRedisConnection[K, V],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

args: TrackingArgs,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the convention in this lib , is to create a scala api for the java pojo's . i.e.

cacheAccessor: CacheAccessor[F, K, V]
): Resource[F, JCacheFrontend[K, V]] =
TxExecutor.make[F].flatMap { redisExecutor =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

putting aside the changes to TxExecutor, we should create a facade for CacheFrontend similar to CacheAccessor and not require a Dispatcher and unsafeRun

Resource.make[F, JCacheFrontend[K, V]] {
Async[F].delay {
JClientSideCaching.enable(
new JCacheAccessor[K, V] {
override def get(key: K): V = redisExecutor.unsafeRun(cacheAccessor.get(key))
override def put(key: K, value: V): Unit = redisExecutor.unsafeRun(cacheAccessor.put(key, value))
override def evict(key: K): Unit = redisExecutor.unsafeRun(cacheAccessor.evict(key))
},
connection,
args
)
}
}(cacheFrontend => Async[F].delay(cacheFrontend.close()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,30 @@ import scala.util.control.NonFatal

import cats.effect.kernel._
import cats.syntax.all._
import cats.effect.std.Dispatcher

private[redis4cats] trait TxExecutor[F[_]] {
def delay[A](thunk: => A): F[A]
def eval[A](fa: F[A]): F[A]
def start[A](fa: F[A]): F[Fiber[F, Throwable, A]]
def liftK[G[_]: Async]: TxExecutor[G]
def liftK[G[_]: Async: Dispatcher]: TxExecutor[G]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would be a breaking change as it adds additional parameters.
I dont think TxExecutor is the right place for this, it is meant for Redis transactions

Copy link
Member

@gvolpe gvolpe Apr 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add that Dispatcher is not meant to be used as a typeclass, i.e. it should be passed explicitly

def unsafeRun[A](fa: F[A]): A
}

private[redis4cats] object TxExecutor {
def make[F[_]: Async]: Resource[F, TxExecutor[F]] =
Resource
.make(Sync[F].delay(Executors.newFixedThreadPool(1, TxThreadFactory))) { ec =>
Sync[F]
.delay(ec.shutdownNow())
.ensure(new IllegalStateException("There were outstanding tasks at time of shutdown of the Redis thread"))(
_.isEmpty
)
.void
}
.map(es => fromEC(exitOnFatal(ExecutionContext.fromExecutorService(es))))
Dispatcher.parallel[F].flatMap { dispatcher =>
Resource
.make(Sync[F].delay(Executors.newFixedThreadPool(1, TxThreadFactory))) { ec =>
Sync[F]
.delay(ec.shutdownNow())
.ensure(new IllegalStateException("There were outstanding tasks at time of shutdown of the Redis thread"))(
_.isEmpty
)
.void
}
.map(es => fromEC(exitOnFatal(ExecutionContext.fromExecutorService(es)))(dispatcher))
}

private def exitOnFatal(ec: ExecutionContext): ExecutionContext = new ExecutionContext {
def execute(r: Runnable): Unit =
Expand All @@ -70,11 +74,12 @@ private[redis4cats] object TxExecutor {
ec.reportFailure(t)
}

private def fromEC[F[_]: Async](ec: ExecutionContext): TxExecutor[F] =
private def fromEC[F[_]: Async](ec: ExecutionContext)(dispatcher: Dispatcher[F]): TxExecutor[F] =
new TxExecutor[F] {
def delay[A](thunk: => A): F[A] = eval(Sync[F].delay(thunk))
def eval[A](fa: F[A]): F[A] = Async[F].evalOn(fa, ec)
def start[A](fa: F[A]): F[Fiber[F, Throwable, A]] = Async[F].startOn(fa, ec)
def liftK[G[_]: Async]: TxExecutor[G] = fromEC[G](ec)
def liftK[G[_]: Async: Dispatcher]: TxExecutor[G] = fromEC[G](ec)(implicitly)
def unsafeRun[A](fa: F[A]): A = dispatcher.unsafeRunSync(fa)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ package dev.profunktor.redis4cats.tx

import cats.effect.kernel._
import cats.effect.kernel.syntax.all._
import cats.effect.std.Dispatcher
import cats.syntax.all._

import dev.profunktor.redis4cats.effect.TxExecutor

private[redis4cats] trait TxRunner[F[_]] {
Expand All @@ -30,7 +30,7 @@ private[redis4cats] trait TxRunner[F[_]] {
)(
fs: TxStore[F, String, A] => List[F[Unit]]
): F[Map[String, A]]
def liftK[G[_]: Async]: TxRunner[G]
def liftK[G[_]: Async: Dispatcher]: TxRunner[G]
}

private[redis4cats] object TxRunner {
Expand Down Expand Up @@ -60,6 +60,6 @@ private[redis4cats] object TxRunner {
} *> store.get
}

def liftK[G[_]: Async]: TxRunner[G] = make[G](t.liftK[G])
def liftK[G[_]: Async: Dispatcher]: TxRunner[G] = make[G](t.liftK[G])
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package dev.profunktor.redis4cats

import algebra._
import cats.effect.kernel.Async
import cats.effect.std.Dispatcher
import dev.profunktor.redis4cats.effect.Log

trait RedisCommands[F[_], K, V]
Expand All @@ -39,7 +40,7 @@ trait RedisCommands[F[_], K, V]

object RedisCommands {
implicit class LiftKOps[F[_], K, V](val cmd: RedisCommands[F, K, V]) extends AnyVal {
def liftK[G[_]: Async: Log]: RedisCommands[G, K, V] =
def liftK[G[_]: Async: Dispatcher: Log]: RedisCommands[G, K, V] =
cmd.asInstanceOf[BaseRedis[F, K, V]].liftK[G]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dev.profunktor.redis4cats
import cats._
import cats.data.NonEmptyList
import cats.effect.kernel._
import cats.effect.std.Dispatcher
import cats.syntax.all._
import dev.profunktor.redis4cats.algebra.BitCommandOperation
import dev.profunktor.redis4cats.algebra.BitCommandOperation.Overflows
Expand Down Expand Up @@ -448,7 +449,7 @@ private[redis4cats] class BaseRedis[F[_]: FutureLift: MonadThrow: Log, K, V](
) extends RedisCommands[F, K, V]
with RedisConversionOps {

def liftK[G[_]: Async: Log]: RedisCommands[G, K, V] =
def liftK[G[_]: Async: Dispatcher: Log]: RedisCommands[G, K, V] =
new BaseRedis[G, K, V](conn.liftK[G], tx.liftK[G], cluster)

import dev.profunktor.redis4cats.JavaConversions._
Expand Down
1 change: 1 addition & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ object Dependencies {
def log4cats(artifact: String): ModuleID = "org.typelevel" %% s"log4cats-$artifact" % V.log4cats

val catsEffectKernel = "org.typelevel" %% "cats-effect-kernel" % V.catsEffect
val catsEffectStd = "org.typelevel" %% "cats-effect-std" % V.catsEffect
val fs2Core = "co.fs2" %% "fs2-core" % V.fs2
val keyPool = "org.typelevel" %% "keypool" % V.keyPool

Expand Down