-
Notifications
You must be signed in to change notification settings - Fork 9
Description
Hi valkey-java SMEs
I've started using the valkey-java client software recently with the valkey server to determine suitability. I have deployed valkey in a cluster mode of 6 pods in k8s environment. The image I am using is "docker.io/bitnami/valkey-cluster:8.0.1-debian-12-r3".
I have configured valkey server for keyspace notifications, Ex.
In my application I subscribe to the key-space notifications using psubscribe. Expecting to be notified when a key ttl has expired.
The problem is that for some of the key space events I do not receive a notification in the JAVA quarkus application where i have registered for the subscription. However there are also some key space notifications that I do receive in my application. I cannot explain why this is happening.
I can see all the ttl events being fired when I log on to each valkey node and using cli, manually psubscribe for these i.e. "$ valkey-cli -h localhost -p 6379 -a password --csv psubscribe 'key*:*' .
I know that key space events are node local and therefore I need to connect to each node to ensure I get all the events.
Is there anything you think i need to change or configure? Thanks in advance.
Below is a code snippet that I have simplified but I hope it shows what Im doing.
io.valkey.ConnectionPoolConfig poolConfig = new io.valkey.ConnectionPoolConfig(); poolConfig.setMaxTotal(8); poolConfig.setMaxIdle(8); poolConfig.setMinIdle(4); poolConfig.setMaxWait(Duration.ofSeconds(10)); Set<HostAndPort> jedisClusterNode = new HashSet<HostAndPort>(); jedisClusterNode.add(new HostAndPort( "valkey-cluster-0.valkey-cluster-headless.optiva-database.svc.cluster.local", 6379)); jc = new io.valkey.JedisCluster(jedisClusterNode, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_REDIRECTIONS, cacheConfiguration.password(), "valkey-cluster-0", poolConfig); Set<HostAndPort> jedisClusterNode1 = new HashSet<HostAndPort>(); jedisClusterNode1.add(new HostAndPort( "valkey-cluster-1.valkey-cluster-headless.optiva-database.svc.cluster.local", 6379)); jc1 = new io.valkey.JedisCluster(jedisClusterNode1, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_REDIRECTIONS, cacheConfiguration.password(), "valkey-cluster-1", poolConfig); Set<HostAndPort> jedisClusterNode2 = new HashSet<HostAndPort>(); jedisClusterNode2.add(new HostAndPort( "valkey-cluster-2.valkey-cluster-headless.optiva-database.svc.cluster.local", 6379)); jc2 = new io.valkey.JedisCluster(jedisClusterNode2, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_REDIRECTIONS, cacheConfiguration.password(), "valkey-cluster-2", poolConfig); Set<HostAndPort> jedisClusterNode3 = new HashSet<HostAndPort>(); jedisClusterNode3.add(new HostAndPort( "valkey-cluster-3.valkey-cluster-headless.optiva-database.svc.cluster.local", 6379)); jc3 = new io.valkey.JedisCluster(jedisClusterNode3, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_REDIRECTIONS, cacheConfiguration.password(), "valkey-cluster-3", poolConfig); Set<HostAndPort> jedisClusterNode4 = new HashSet<HostAndPort>(); jedisClusterNode4.add(new HostAndPort( "valkey-cluster-4.valkey-cluster-headless.optiva-database.svc.cluster.local", 6379)); jc4 = new io.valkey.JedisCluster(jedisClusterNode4, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_REDIRECTIONS, cacheConfiguration.password(), "valkey-cluster-4", poolConfig); Set<HostAndPort> jedisClusterNode5 = new HashSet<HostAndPort>(); jedisClusterNode5.add(new HostAndPort( "valkey-cluster-5.valkey-cluster-headless.optiva-database.svc.cluster.local", 6379)); jc5 = new io.valkey.JedisCluster(jedisClusterNode5, DEFAULT_TIMEOUT, DEFAULT_TIMEOUT, DEFAULT_REDIRECTIONS, cacheConfiguration.password(), "valkey-cluster-5", poolConfig); executorService.submit(() -> { try { logger.infof("Registering the listener for events on the Valkey host %s:%d. Password %s.", "valkey-cluster-0", 6379, cacheConfiguration.password()); jc.psubscribe(new ValkeyExpiredMessages(logger, "valkey-cluster-0"), pushConfig.keyPattern()); } catch (Exception e) { logger.errorf("Error during subscription to pattern %s on host %s", pushConfig.keyPattern(), "valkey-cluster-0", e); } }); executorService.submit(() -> { try { logger.infof("Registering the listener for events on the Valkey host %s:%d. Password %s.", "valkey-cluster-1", 6379, cacheConfiguration.password()); jc1.psubscribe(new ValkeyExpiredMessages(logger, "valkey-cluster-1"), pushConfig.keyPattern()); } catch (Exception e) { logger.errorf("Error during subscription to pattern %s on host %s", pushConfig.keyPattern(), "valkey-cluster-1", e); } }); executorService.submit(() -> { try { logger.infof("Registering the listener for events on the Valkey host %s:%d. Password %s.", "valkey-cluster-2", 6379, cacheConfiguration.password()); jc2.psubscribe(new ValkeyExpiredMessages(logger, "valkey-cluster-2"), pushConfig.keyPattern()); } catch (Exception e) { logger.errorf("Error during subscription to pattern %s on host %s", pushConfig.keyPattern(), "valkey-cluster-2", e); } }); executorService.submit(() -> { try { logger.infof("Registering the listener for events on the Valkey host %s:%d. Password %s.", "valkey-cluster-3", 6379, cacheConfiguration.password()); jc3.psubscribe(new ValkeyExpiredMessages(logger, "valkey-cluster-3"), pushConfig.keyPattern()); } catch (Exception e) { logger.errorf("Error during subscription to pattern %s on host %s", pushConfig.keyPattern(), "valkey-cluster-3", e); } }); executorService.submit(() -> { try { logger.infof("Registering the listener for events on the Valkey host %s:%d. Password %s.", "valkey-cluster-4", 6379, cacheConfiguration.password()); jc4.psubscribe(new ValkeyExpiredMessages(logger, "valkey-cluster-4"), pushConfig.keyPattern()); } catch (Exception e) { logger.errorf("Error during subscription to pattern %s on host %s", pushConfig.keyPattern(), "valkey-cluster-4", e); } }); executorService.submit(() -> { try { logger.infof("Registering the listener for events on the Valkey host %s:%d. Password %s.", "valkey-cluster-5", 6379, cacheConfiguration.password()); jc5.psubscribe(new ValkeyExpiredMessages(logger, "valkey-cluster-5"), pushConfig.keyPattern()); } catch (Exception e) { logger.errorf("Error during subscription to pattern %s on host %s", pushConfig.keyPattern(), "valkey-cluster-5", e); } });