
Make PubSubState a trait #972

Open
wants to merge 4 commits into base: series/1.x

Conversation

@peterneyens commented Feb 26, 2025

An attempt to make it easier to change the PubSubState implementation or add multiple implementations. This is somewhat of a follow-up to #966.

For some context: I am exploring the use of keyspace notifications, and that led me to look at the pubsub code. My use case would have a lot of short-lived subscriptions, which is probably not the most common pubsub usage. Without any real tests or digging deeper into the lettuce code, I was somewhat worried about contention on a single AtomicCell holding all subscriptions (but it could be completely unfounded).

I hope that the change in this PR makes it easier to make the PubSubState more configurable. I added a sharded PubSubState implementation that is currently unused and only shows how we could support different implementations (but it seems a useful implementation to have).
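For illustration, here is a minimal sketch of what a sharded state could look like (the trait shape and names are hypothetical, not the actual code in this PR): subscriptions are spread over several AtomicCells keyed by a hash of the channel, so operations on different channels usually contend on different cells.

// Hypothetical sketch only; the trait shape and method names are illustrative,
// not the PR's actual PubSubState API.
import cats.effect.Async
import cats.effect.std.AtomicCell
import cats.syntax.all._

trait ShardedState[F[_], K] {
  // Atomically update the shard that owns `channel` and return a result.
  def modify[A](channel: K)(f: Map[K, Long] => (Map[K, Long], A)): F[A]
}

object ShardedState {
  def make[F[_]: Async, K](shards: Int): F[ShardedState[F, K]] =
    Vector.fill(shards)(AtomicCell[F].of(Map.empty[K, Long])).sequence.map { cells =>
      new ShardedState[F, K] {
        // Pick a shard by hashing the channel; floorMod keeps the index non-negative.
        private def cellFor(channel: K): AtomicCell[F, Map[K, Long]] =
          cells(math.floorMod(channel.hashCode, shards))

        def modify[A](channel: K)(f: Map[K, Long] => (Map[K, Long], A)): F[A] =
          cellFor(channel).modify(f)
      }
    }
}

Two concurrent modifications then only race when their channels happen to hash to the same shard.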

  • This PR does move a lot of the Subscriber code into PubSubState. The former now mostly contains the logic to create a subscription, while the latter now maintains (and hides) the subscription lifecycle.
  • I did leave out some of the logs added in Fix race conditions in pubsub #966; I can bring them back if desired, but I wanted to get some feedback first.
  • The unsubscribe and subscription cleanup effects aren't guarded by the mutex anymore. It didn't look necessary, but we should probably make sure this is OK.

@peterneyens (Author)

@arturaz Since this mostly refactors code that you recently wrote, I think it is only fair to ask whether you think this change is an improvement or not. And if you do think it is an improvement in general, do you have any feedback?

@peterneyens peterneyens marked this pull request as ready for review February 26, 2025 11:49
@arturaz (Collaborator) commented Feb 26, 2025

> I was somewhat worried about contention on a single AtomicCell holding all subscriptions (but it could be completely unfounded).

Yeah, this needs to be benchmarked. We could be complicating the code with no real benefit.

> The unsubscribe and subscription cleanup effects aren't guarded by the mutex anymore. It didn't look necessary, but we should probably make sure this is OK.

What about this scenario? The last subscriber is unsubscribing. It is removed from the AtomicCell and then proceeds to run the cleanup. However, the Cats Effect scheduler decides to yield to another fiber between the modify and the cleanup; that fiber looks into the AtomicCell, takes the lock, sees that there are no subscribers, and runs a subscription against the Redis client. Then the previous fiber resumes, performs the cleanup, and nukes that new subscription?
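To make that interleaving concrete, here is a small self-contained sketch (purely illustrative; a plain Ref and a Boolean flag stand in for the real subscriber state and the server-side subscription):

// Illustrative only: not the library's API, just the shape of the race described above.
import cats.effect.{IO, IOApp, Ref}
import cats.syntax.all._

object CleanupRaceSketch extends IOApp.Simple {
  def run: IO[Unit] =
    for {
      subscribers <- Ref.of[IO, Int](1)        // one remaining subscriber
      redisSub    <- Ref.of[IO, Boolean](true) // SUBSCRIBE is currently active on the server

      // Fiber A: the last unsubscribe. The state update and the cleanup are two
      // separate effects, so the runtime may schedule another fiber in between them.
      unsubscribe = subscribers.update(_ - 1) *>
                      IO.cede *>          // simulate the yield point between modify and cleanup
                      redisSub.set(false) // cleanup: UNSUBSCRIBE on the server

      // Fiber B: a new subscribe. It sees zero subscribers and (re)issues SUBSCRIBE.
      subscribe = subscribers.getAndUpdate(_ + 1).flatMap { previous =>
                    redisSub.set(true).whenA(previous == 0)
                  }

      _      <- (unsubscribe, subscribe).parTupled
      active <- redisSub.get
      count  <- subscribers.get
      // If A's cleanup ran after B's SUBSCRIBE, `active` is false even though
      // `count` is 1: B's fresh subscription has been nuked.
      _ <- IO.println(s"subscribers = $count, subscription active = $active")
    } yield ()
}

In other words, unless the state change and the cleanup are serialised per channel (by the mutex or otherwise), the cleanup can tear down a subscription that a later subscribe just created.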

@arturaz (Collaborator) commented Feb 26, 2025

I also think the debug logs should be left there, they are useful when, well, debugging :)

@peterneyens (Author)

Thanks for taking a look, @arturaz!

> Yeah, this needs to be benchmarked. We could be complicating the code with no real benefit.

I agree; I should have done some testing beforehand.

A simple test that opens and closes some subscriptions in parallel already shows the contention. I'll see if I can spend some more time creating better benchmarks.

test("subscribe and unsubscribe ") {
  import cats.syntax.all._
  withRedisPubSub { pubSub =>
    val channels              = List.range(1, 100).map(_ % 25).map(n => RedisChannel(n.toString))
    val expectedSubscriptions = channels.length.toLong

    val checkSubscriptionCount: IO[Long] = pubSub.internalChannelSubscriptions.map(_.values.sum)
    val wait = checkSubscriptionCount
      .delayBy(50.millis)
      .iterateUntil(_ == expectedSubscriptions)

    val subscribe = channels.parTraverse { channel =>
      pubSub.subscribe(channel).compile.drain
    }
    val unsubscribe = channels.parTraverse(pubSub.unsubscribe)

    IO.both(subscribe, wait >> unsubscribe).timed.flatMap { case (d, _) => IO.println(d) }
  }
} 

Switching to PubSubState.make[F, K, V](shards = Some(2)) in mkPubSubConnection seems to be more than 30% faster.
It would be interesting to compare that against 1.7.2 (even though that may also run into the bugs you fixed).

> I also think the debug logs should be left there, they are useful when, well, debugging :)

I can bring those back. I may try to see if we can add them back while keeping them outside of evalUpdate and evalModify.

@arturaz (Collaborator) commented Feb 27, 2025

Is there any downside in only having the sharded implementation?

Commit: Represent start up and clean up as state changes
@peterneyens (Author) commented Feb 27, 2025

> Is there any downside in only having the sharded implementation?

Probably not.

Thinking about this more, I think we can still make a Ref-based implementation work by representing the subscription start-up and clean-up as states in our map. We could then make operations for the same channel/pattern wait until the subscription is ready or cleaned up. I pushed a draft version of such an implementation, but it needs some more work on the effectful state changes from Starting to Active and from ShuttingDown to removal (to handle failure, invalid states in the second atomic state change, ...). This implementation seems to be around three times faster than the single-AtomicCell implementation for the simple test case I shared above.
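For context, here is a rough sketch of that idea (names and shapes are hypothetical and error handling is simplified, mirroring the caveat above; this is not the code actually pushed to the PR): the per-channel value is a small state machine, and operations on the same channel wait on a Deferred until an in-flight start-up or shutdown has finished.

// Rough sketch only: illustrative names, simplified error handling.
import cats.effect.{Async, Deferred, Ref}
import cats.syntax.all._

object RefBasedStateSketch {

  sealed trait SubState[F[_]]
  // Subscription is being set up; waiters block on `ready`.
  final case class Starting[F[_]](ready: Deferred[F, Either[Throwable, Unit]]) extends SubState[F]
  // Subscription is live with `subscribers` interested parties.
  final case class Active[F[_]](subscribers: Long) extends SubState[F]
  // Last subscriber left; tear-down is in progress, waiters block on `done`.
  final case class ShuttingDown[F[_]](done: Deferred[F, Unit]) extends SubState[F]

  // Register interest in `channel`; only the first subscriber runs `startSubscription`.
  def acquire[F[_]: Async, K](
      state: Ref[F, Map[K, SubState[F]]],
      channel: K
  )(startSubscription: F[Unit]): F[Unit] =
    Deferred[F, Either[Throwable, Unit]].flatMap { ready =>
      state.modify[F[Unit]] { m =>
        m.get(channel) match {
          case None =>
            // First subscriber: record Starting, run the effectful start-up outside
            // the Ref update, then flip the entry to Active (or drop it on failure).
            val run = startSubscription.attempt.flatMap { res =>
              state.update { cur =>
                res.fold(_ => cur - channel, _ => cur.updated(channel, Active[F](1)))
              } *> ready.complete(res).void *> res.liftTo[F]
            }
            (m.updated(channel, Starting(ready)), run)

          case Some(Starting(other)) =>
            // Someone else is starting it: wait for them, then bump the count.
            val waitAndBump = other.get.flatMap(_.liftTo[F]) *>
              state.update(_.updatedWith(channel) {
                case Some(Active(n)) => Some(Active[F](n + 1))
                case otherState      => otherState
              })
            (m, waitAndBump)

          case Some(Active(n)) =>
            (m.updated(channel, Active[F](n + 1)), Async[F].unit)

          case Some(ShuttingDown(done)) =>
            // Wait for the previous tear-down to finish, then try again.
            (m, done.get *> acquire(state, channel)(startSubscription))
        }
      }.flatten
    }
}

The important property is that the effectful SUBSCRIBE/UNSUBSCRIBE calls run outside the Ref update, while operations on the same channel are serialised via the Deferreds.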

@arturaz (Collaborator) commented Feb 27, 2025

> I pushed a draft version of such an implementation, but it needs some more work on the effectful state changes from Starting to Active and from ShuttingDown to removal (to handle failure, invalid states in the second atomic state change, ...).

So I should delay my review until you finish, right?

@peterneyens (Author)

I opened #984 to replace this PR (but I'll leave this one open for now).
