Skip to content

Commit 1bf2d8c

Browse files
committed
Merge remote-tracking branch 'origin/fix-test_pubsub_combined_exact_pattern_and_sharded_one_client' into fix-pubsub-deterministic-channels
Merge valkey-io#4886 into valkey-io#4880 to combine flaky Pub/Sub test fixes Includes fixes from fix-test_pubsub_combined_exact_pattern_and_sharded_one_client into fix-pubsub-deterministic-channels as requested in issue valkey-io#4862. Stabilizes all related Pub/Sub tests with unique channel names and proper sync.#
2 parents 0b898d2 + b14f737 commit 1bf2d8c

File tree

23 files changed

+313
-16
lines changed

23 files changed

+313
-16
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
#### Changes
44

55
* Added in documentation to how to use Lua scripts with Glide
6+
* JAVA: Add refreshTopologyFromInitialNodes Configuration Option ([#4870](https://github.com/valkey-io/valkey-glide/pull/4870))
7+
* GO: add RefreshTopologyFromInitialNodes configuration option ([#4871](https://github.com/valkey-io/valkey-glide/pull/4871))
8+
* Node: add refreshTopologyFromInitialNodes configuration option ([#4872](https://github.com/valkey-io/valkey-glide/pull/4872))
69

710
#### Fixes
811

glide-core/redis-rs/redis/src/aio/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,9 +227,10 @@ where
227227

228228
// result is ignored, as per the command's instructions.
229229
// https://redis.io/commands/client-setinfo/
230-
let _: RedisResult<()> = crate::connection::client_set_info_pipeline()
231-
.query_async(con)
232-
.await;
230+
let _: RedisResult<()> =
231+
crate::connection::client_set_info_pipeline(connection_info.lib_name.as_deref())
232+
.query_async(con)
233+
.await;
233234

234235
// resubscribe
235236
if connection_info.protocol != ProtocolVersion::RESP3 {

glide-core/redis-rs/redis/src/cluster.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,7 @@ pub(crate) fn get_connection_info(
991991
password: cluster_params.password,
992992
username: cluster_params.username,
993993
client_name: cluster_params.client_name,
994+
lib_name: cluster_params.lib_name,
994995
protocol: cluster_params.protocol,
995996
db: cluster_params.database_id,
996997
pubsub_subscriptions: cluster_params.pubsub_subscriptions,

glide-core/redis-rs/redis/src/cluster_client.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ struct BuilderParams {
4040
#[cfg(feature = "cluster-async")]
4141
slots_refresh_rate_limit: SlotsRefreshRateLimit,
4242
client_name: Option<String>,
43+
lib_name: Option<String>,
4344
response_timeout: Option<Duration>,
4445
protocol: ProtocolVersion,
4546
pubsub_subscriptions: Option<PubSubSubscriptionInfo>,
@@ -141,6 +142,7 @@ pub struct ClusterParams {
141142
pub(crate) connections_validation_interval: Option<Duration>,
142143
pub(crate) tls_params: Option<TlsConnParams>,
143144
pub(crate) client_name: Option<String>,
145+
pub(crate) lib_name: Option<String>,
144146
pub(crate) connection_timeout: Duration,
145147
pub(crate) response_timeout: Duration,
146148
pub(crate) protocol: ProtocolVersion,
@@ -173,6 +175,7 @@ impl ClusterParams {
173175
connections_validation_interval: value.connections_validation_interval,
174176
tls_params,
175177
client_name: value.client_name,
178+
lib_name: value.lib_name,
176179
response_timeout: value.response_timeout.unwrap_or(Duration::MAX),
177180
protocol: value.protocol,
178181
pubsub_subscriptions: value.pubsub_subscriptions,
@@ -308,6 +311,12 @@ impl ClusterClientBuilder {
308311
self
309312
}
310313

314+
/// Sets library name for the new ClusterClient.
315+
pub fn lib_name(mut self, lib_name: String) -> ClusterClientBuilder {
316+
self.builder_params.lib_name = Some(lib_name);
317+
self
318+
}
319+
311320
/// Sets password for the new ClusterClient.
312321
pub fn password(mut self, password: String) -> ClusterClientBuilder {
313322
self.builder_params.password = Some(password);

glide-core/redis-rs/redis/src/connection.rs

Lines changed: 39 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,8 @@ pub struct RedisConnectionInfo {
232232
pub protocol: ProtocolVersion,
233233
/// Optionally a client name that should be used for connection
234234
pub client_name: Option<String>,
235+
/// Optionally a library name that should be used for connection
236+
pub lib_name: Option<String>,
235237
/// Optionally a pubsub subscriptions that should be used for connection
236238
pub pubsub_subscriptions: Option<PubSubSubscriptionInfo>,
237239
}
@@ -391,6 +393,7 @@ fn url_to_tcp_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
391393
_ => ProtocolVersion::RESP2,
392394
},
393395
client_name: None,
396+
lib_name: None,
394397
pubsub_subscriptions: None,
395398
},
396399
})
@@ -424,6 +427,7 @@ fn url_to_unix_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
424427
_ => ProtocolVersion::RESP2,
425428
},
426429
client_name: None,
430+
lib_name: None,
427431
pubsub_subscriptions: None,
428432
},
429433
})
@@ -925,13 +929,15 @@ pub fn connect(
925929
setup_connection(con, &connection_info.redis)
926930
}
927931

928-
pub(crate) fn client_set_info_pipeline() -> Pipeline {
932+
pub(crate) fn client_set_info_pipeline(lib_name: Option<&str>) -> Pipeline {
929933
let mut pipeline = crate::pipe();
934+
let lib_name_value = lib_name.unwrap_or("UnknownClient");
935+
let final_lib_name = option_env!("GLIDE_NAME").unwrap_or(lib_name_value);
930936
pipeline
931937
.cmd("CLIENT")
932938
.arg("SETINFO")
933939
.arg("LIB-NAME")
934-
.arg(std::env!("GLIDE_NAME"))
940+
.arg(final_lib_name)
935941
.ignore();
936942
pipeline
937943
.cmd("CLIENT")
@@ -993,7 +999,8 @@ fn setup_connection(
993999

9941000
// result is ignored, as per the command's instructions.
9951001
// https://redis.io/commands/client-setinfo/
996-
let _: RedisResult<()> = client_set_info_pipeline().query(&mut rv);
1002+
let _: RedisResult<()> =
1003+
client_set_info_pipeline(connection_info.lib_name.as_deref()).query(&mut rv);
9971004

9981005
Ok(rv)
9991006
}
@@ -1726,6 +1733,34 @@ pub fn get_resp3_hello_command_error(err: RedisError) -> RedisError {
17261733
mod tests {
17271734
use super::*;
17281735

1736+
#[test]
1737+
fn test_client_set_info_pipeline_default_lib_name() {
1738+
let pipeline = client_set_info_pipeline(None);
1739+
let packed_commands = pipeline.get_packed_pipeline();
1740+
let cmd_str = String::from_utf8_lossy(&packed_commands);
1741+
1742+
// Should contain CLIENT SETINFO LIB-NAME
1743+
assert!(cmd_str.contains("CLIENT"));
1744+
assert!(cmd_str.contains("SETINFO"));
1745+
assert!(cmd_str.contains("LIB-NAME"));
1746+
1747+
// When GLIDE_NAME is set, it should use that value
1748+
// When GLIDE_NAME is not set and lib_name is None, it should use "UnknownClient"
1749+
// Since we can't control GLIDE_NAME in this test, we just verify the structure
1750+
assert!(cmd_str.contains("Glide") || cmd_str.contains("UnknownClient"));
1751+
}
1752+
1753+
#[test]
1754+
fn test_client_set_info_pipeline_logic() {
1755+
// Test the logic directly by simulating what happens when GLIDE_NAME is not set
1756+
let lib_name_value = None.unwrap_or("UnknownClient");
1757+
assert_eq!(lib_name_value, "UnknownClient");
1758+
1759+
// Test with provided lib_name
1760+
let lib_name_value = Some("CustomClient").unwrap_or("UnknownClient");
1761+
assert_eq!(lib_name_value, "CustomClient");
1762+
}
1763+
17291764
#[test]
17301765
fn test_parse_redis_url() {
17311766
let cases = vec![
@@ -1840,6 +1875,7 @@ mod tests {
18401875
password: None,
18411876
protocol: ProtocolVersion::RESP2,
18421877
client_name: None,
1878+
lib_name: None,
18431879
pubsub_subscriptions: None,
18441880
},
18451881
},

glide-core/src/client/mod.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ pub async fn get_valkey_connection_info(
145145
let protocol = connection_request.protocol.unwrap_or_default();
146146
let db = connection_request.database_id;
147147
let client_name = connection_request.client_name.clone();
148+
let lib_name = connection_request.lib_name.clone();
148149
let pubsub_subscriptions = connection_request.pubsub_subscriptions.clone();
149150

150151
match &connection_request.authentication_info {
@@ -164,6 +165,7 @@ pub async fn get_valkey_connection_info(
164165
password: Some(token),
165166
protocol,
166167
client_name,
168+
lib_name,
167169
pubsub_subscriptions,
168170
}
169171
} else {
@@ -174,6 +176,7 @@ pub async fn get_valkey_connection_info(
174176
password: info.password.clone(),
175177
protocol,
176178
client_name,
179+
lib_name,
177180
pubsub_subscriptions,
178181
}
179182
}
@@ -182,6 +185,7 @@ pub async fn get_valkey_connection_info(
182185
db,
183186
protocol,
184187
client_name,
188+
lib_name,
185189
pubsub_subscriptions,
186190
..Default::default()
187191
},
@@ -1052,6 +1056,9 @@ async fn create_cluster_client(
10521056
if let Some(client_name) = valkey_connection_info.client_name {
10531057
builder = builder.client_name(client_name);
10541058
}
1059+
if let Some(lib_name) = valkey_connection_info.lib_name {
1060+
builder = builder.lib_name(lib_name);
1061+
}
10551062
if tls_mode != TlsMode::NoTls {
10561063
let tls = if tls_mode == TlsMode::SecureTls {
10571064
redis::cluster::TlsMode::Secure

glide-core/src/client/types.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use crate::iam::ServiceType;
1414
pub struct ConnectionRequest {
1515
pub read_from: Option<ReadFrom>,
1616
pub client_name: Option<String>,
17+
pub lib_name: Option<String>,
1718
pub authentication_info: Option<AuthenticationInfo>,
1819
pub database_id: i64,
1920
pub protocol: Option<redis::ProtocolVersion>,
@@ -164,6 +165,7 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
164165
});
165166

166167
let client_name = chars_to_string_option(&value.client_name);
168+
let lib_name = chars_to_string_option(&value.lib_name);
167169
let authentication_info = value.authentication_info.0.map(|authentication_info| {
168170
let password = chars_to_string_option(&authentication_info.password);
169171
let username = chars_to_string_option(&authentication_info.username);
@@ -278,6 +280,7 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
278280
ConnectionRequest {
279281
read_from,
280282
client_name,
283+
lib_name,
281284
authentication_info,
282285
database_id,
283286
protocol,

glide-core/src/protobuf/connection_request.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ message ConnectionRequest {
8888
uint32 connection_timeout = 16;
8989
bool lazy_connect = 17;
9090
bool refresh_topology_from_initial_nodes = 18;
91+
string lib_name = 19;
9192
}
9293

9394
message ConnectionRetryStrategy {

go/config/config.go

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -397,6 +397,7 @@ func (config *ClusterClientConfiguration) ToProtobuf() (*protobuf.ConnectionRequ
397397
if config.subscriptionConfig != nil && len(config.subscriptionConfig.subscriptions) > 0 {
398398
request.PubsubSubscriptions = config.subscriptionConfig.toProtobuf()
399399
}
400+
request.RefreshTopologyFromInitialNodes = config.AdvancedClusterClientConfiguration.refreshTopologyFromInitialNodes
400401
return request, nil
401402
}
402403

@@ -543,7 +544,8 @@ func (config *AdvancedClientConfiguration) WithConnectionTimeout(
543544
// Represents advanced configuration settings for a Cluster client used in
544545
// [ClusterClientConfiguration].
545546
type AdvancedClusterClientConfiguration struct {
546-
connectionTimeout time.Duration
547+
connectionTimeout time.Duration
548+
refreshTopologyFromInitialNodes bool
547549
}
548550

549551
// NewAdvancedClusterClientConfiguration returns a new [AdvancedClusterClientConfiguration] with default settings.
@@ -561,3 +563,17 @@ func (config *AdvancedClusterClientConfiguration) WithConnectionTimeout(
561563
config.connectionTimeout = connectionTimeout
562564
return config
563565
}
566+
567+
// WithRefreshTopologyFromInitialNodes enables refreshing the cluster topology using only the initial nodes.
568+
//
569+
// When this option is enabled, all topology updates (both the periodic checks and on-demand
570+
// refreshes triggered by topology changes) will query only the initial nodes provided when
571+
// creating the client, rather than using the internal cluster view.
572+
//
573+
// If not set, defaults to false (uses internal cluster view for topology refresh).
574+
func (config *AdvancedClusterClientConfiguration) WithRefreshTopologyFromInitialNodes(
575+
refreshTopologyFromInitialNodes bool,
576+
) *AdvancedClusterClientConfiguration {
577+
config.refreshTopologyFromInitialNodes = refreshTopologyFromInitialNodes
578+
return config
579+
}

go/config/config_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -391,3 +391,37 @@ func TestConfig_DatabaseId_BaseConfiguration(t *testing.T) {
391391
assert.Equal(t, uint32(3), clusterResult.DatabaseId)
392392
assert.True(t, clusterResult.ClusterModeEnabled)
393393
}
394+
395+
func TestClusterConfig_RefreshTopologyFromInitialNodes(t *testing.T) {
396+
// Test that refreshTopologyFromInitialNodes defaults to false
397+
defaultConfig := NewClusterClientConfiguration()
398+
defaultResult, err := defaultConfig.ToProtobuf()
399+
if err != nil {
400+
t.Fatalf("Failed to convert default cluster config to protobuf: %v", err)
401+
}
402+
assert.False(t, defaultResult.RefreshTopologyFromInitialNodes)
403+
404+
// Test that refreshTopologyFromInitialNodes can be set to true
405+
enabledConfig := NewClusterClientConfiguration().
406+
WithAdvancedConfiguration(
407+
NewAdvancedClusterClientConfiguration().
408+
WithRefreshTopologyFromInitialNodes(true),
409+
)
410+
enabledResult, err := enabledConfig.ToProtobuf()
411+
if err != nil {
412+
t.Fatalf("Failed to convert enabled cluster config to protobuf: %v", err)
413+
}
414+
assert.True(t, enabledResult.RefreshTopologyFromInitialNodes)
415+
416+
// Test that refreshTopologyFromInitialNodes can be explicitly set to false
417+
disabledConfig := NewClusterClientConfiguration().
418+
WithAdvancedConfiguration(
419+
NewAdvancedClusterClientConfiguration().
420+
WithRefreshTopologyFromInitialNodes(false),
421+
)
422+
disabledResult, err := disabledConfig.ToProtobuf()
423+
if err != nil {
424+
t.Fatalf("Failed to convert disabled cluster config to protobuf: %v", err)
425+
}
426+
assert.False(t, disabledResult.RefreshTopologyFromInitialNodes)
427+
}

0 commit comments

Comments
 (0)