Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
9dfa51d
feat(pubsub): implement core PubSub infrastructure and FFI integration
jbrinkman Oct 9, 2025
4cbd22a
feat(rust): add PubSub FFI functions for callback registration and me…
jbrinkman Oct 10, 2025
73a842c
feat(rust): implement proper PubSub callback storage and management
jbrinkman Oct 10, 2025
4697c40
feat: Add comprehensive integration tests for FFI PubSub callback flow
jbrinkman Oct 13, 2025
0a75eea
test(pubsub): Refactor PubSub FFI Callback Integration Tests
jbrinkman Oct 15, 2025
03dc000
refactor(pubsub): Implement instance-based PubSub callback architecture
jbrinkman Oct 17, 2025
0e84419
fix: resolve critical memory leak in PubSub FFI message processing
jbrinkman Oct 17, 2025
dd0c85d
feat(pubsub): add thread safety to PubSub handler access in BaseClient
jbrinkman Oct 17, 2025
426e1a9
feat(pubsub): replace Task.Run with channel-based message processing
jbrinkman Oct 17, 2025
069af2d
feat(pubsub): implement graceful shutdown coordination between Rust a…
jbrinkman Oct 18, 2025
da86a39
feat(pubsub): Add queue-based message retrieval and comprehensive int…
jbrinkman Oct 20, 2025
13ba6c4
refactor(pubsub): Remove unused PubSubConfigurationExtensions class
jbrinkman Oct 20, 2025
5868588
style: Apply code formatting to PubSub files
jbrinkman Oct 20, 2025
fa8bff7
chore(reports): Remove legacy reporting artifacts and unused files
jbrinkman Oct 20, 2025
600b48e
refactor(pubsub): Simplify synchronization primitives in PubSub messa…
jbrinkman Oct 20, 2025
d59edfd
fix: Address Lint configuration errors
jbrinkman Oct 20, 2025
60c2c30
fix: enable pattern subscriptions in cluster mode
jbrinkman Oct 20, 2025
d2eec0e
test: remove redundant and inaccurate PubSub tests
jbrinkman Oct 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,7 @@ $RECYCLE.BIN/
_NCrunch*

glide-logs/

# Test results and coverage reports
testresults/
reports/
170 changes: 170 additions & 0 deletions docs/configuration-architecture-analysis.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# Configuration Architecture Analysis

## Overview

This document analyzes the configuration architecture in the Valkey.Glide C# client, focusing on the relationship between `ConnectionConfiguration` and `ConfigurationOptions`, and how configuration changes can be made through the `ConnectionMultiplexer`.

## Configuration Classes Relationship

### ConfigurationOptions
- **Purpose**: External API configuration class that follows StackExchange.Redis compatibility patterns
- **Location**: `sources/Valkey.Glide/Abstract/ConfigurationOptions.cs`
- **Role**: User-facing configuration interface

### ConnectionConfiguration
- **Purpose**: Internal configuration classes that map to the underlying FFI layer
- **Location**: `sources/Valkey.Glide/ConnectionConfiguration.cs`
- **Role**: Internal configuration representation and builder pattern implementation

## Configuration Flow

```
ConfigurationOptions → ClientConfigurationBuilder → ConnectionConfig → FFI.ConnectionConfig
```

1. **User Input**: `ConfigurationOptions` (external API)
2. **Translation**: `ConnectionMultiplexer.CreateClientConfigBuilder<T>()` method
3. **Building**: `ClientConfigurationBuilder<T>` (internal)
4. **Internal Config**: `ConnectionConfig` record
5. **FFI Layer**: `FFI.ConnectionConfig`

## Key Components Analysis

### ConnectionMultiplexer Configuration Mapping

The `ConnectionMultiplexer.CreateClientConfigBuilder<T>()` method at line 174 performs the critical translation:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Probably best not to reference line number in here, as this is likely to change? Similar elsewhere in this file. (Know you probably didn't write this; should be easy to remove).


```csharp
internal static T CreateClientConfigBuilder<T>(ConfigurationOptions configuration)
where T : ClientConfigurationBuilder<T>, new()
{
T config = new();
foreach (EndPoint ep in configuration.EndPoints)
{
config.Addresses += Utils.SplitEndpoint(ep);
}
config.UseTls = configuration.Ssl;
// ... other mappings
_ = configuration.ReadFrom.HasValue ? config.ReadFrom = configuration.ReadFrom.Value : new();
return config;
}
```

### Configuration Builders

The builder pattern is implemented through:
- `StandaloneClientConfigurationBuilder` (line 525)
- `ClusterClientConfigurationBuilder` (line 550)

Both inherit from `ClientConfigurationBuilder<T>` which provides:
- Fluent API methods (`WithXxx()`)
- Property setters
- Internal `ConnectionConfig Build()` method

## Configuration Mutability Analysis

### Current State: Immutable After Connection

**Connection Creation**: Configuration is set once during `ConnectionMultiplexer.ConnectAsync()`:

```csharp
public static async Task<ConnectionMultiplexer> ConnectAsync(ConfigurationOptions configuration, TextWriter? log = null)
{
// Configuration is translated and used to create the client
StandaloneClientConfiguration standaloneConfig = CreateClientConfigBuilder<StandaloneClientConfigurationBuilder>(configuration).Build();
// ... connection establishment
return new(configuration, await Database.Create(config));
}
```

**Storage**: The original `ConfigurationOptions` is stored in `RawConfig` property (line 156):

```csharp
internal ConfigurationOptions RawConfig { private set; get; }
```

### Limitations for Runtime Configuration Changes

1. **No Reconfiguration API**: `ConnectionMultiplexer` doesn't expose methods to change configuration after connection
2. **Immutable Builder Chain**: Once built, the configuration flows to FFI layer and cannot be modified
3. **Connection Recreation Required**: Any configuration change requires creating a new `ConnectionMultiplexer` instance

## Potential Configuration Change Approaches

### 1. Connection Recreation (Current Pattern)
```csharp
// Current approach - requires new connection
var newConfig = oldConfig.Clone();
newConfig.ReadFrom = new ReadFrom(ReadFromStrategy.AzAffinity, "us-west-2");
var newMultiplexer = await ConnectionMultiplexer.ConnectAsync(newConfig);
```

### 2. Potential Runtime Reconfiguration (Not Currently Implemented)

To enable runtime configuration changes, the following would need to be implemented:

```csharp
// Hypothetical API
public async Task ReconfigureAsync(Action<ConfigurationOptions> configure)
{
var newConfig = RawConfig.Clone();
configure(newConfig);

// Would need to:
// 1. Validate configuration changes
// 2. Update underlying client configuration
// 3. Potentially recreate connections
// 4. Update RawConfig
}
```

### 3. Builder Pattern Extension

A potential approach could extend the builder pattern to support updates:

```csharp
// Hypothetical API
public async Task<bool> TryUpdateConfigurationAsync<T>(Action<T> configure)
where T : ClientConfigurationBuilder<T>, new()
{
// Create new builder from current configuration
// Apply changes
// Validate and apply if possible
}
```

## ReadFrom Configuration Specifics

### Current Implementation
- `ReadFrom` is a struct (line 74) with `ReadFromStrategy` enum and optional AZ string
- Mapped in `CreateClientConfigBuilder()` at line 199
- Flows through to FFI layer via `ConnectionConfig.ToFfi()` method

### ReadFrom Change Requirements
To change `ReadFrom` configuration at runtime would require:
1. **API Design**: Method to accept new `ReadFrom` configuration
2. **Validation**: Ensure new configuration is compatible with current connection type
3. **FFI Updates**: Update the underlying client configuration
4. **Connection Management**: Handle any required connection reestablishment

## Recommendations

### Short Term
1. **Document Current Limitations**: Clearly document that configuration changes require connection recreation
2. **Helper Methods**: Provide utility methods for common reconfiguration scenarios:
```csharp
public static async Task<ConnectionMultiplexer> RecreateWithReadFromAsync(
ConnectionMultiplexer current,
ReadFrom newReadFrom)
```

### Long Term
1. **Runtime Reconfiguration API**: Implement selective runtime configuration updates for non-disruptive changes
2. **Configuration Validation**: Add validation to determine which changes require reconnection vs. runtime updates
3. **Connection Pool Management**: Consider connection pooling to minimize disruption during reconfiguration

## Conclusion

Currently, the `ConnectionMultiplexer` does not support runtime configuration changes. The architecture is designed around immutable configuration set at connection time. Any configuration changes, including `ReadFrom` strategy modifications, require creating a new `ConnectionMultiplexer` instance.

The relationship between `ConfigurationOptions` and `ConnectionConfiguration` is a translation layer where the external API (`ConfigurationOptions`) is converted to internal configuration structures (`ConnectionConfiguration`) that interface with the FFI layer.
19 changes: 19 additions & 0 deletions monitor-valkey.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/zsh
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be moved to a scripts directory? We already have a utils directory that contains a script. Perhaps we could rename it to scripts and move this there?

# Streams all commands from Valkey and logs them to a file with timestamps in ./log directory.

LOGDIR="./log"
LOGFILE="$LOGDIR/valkey-monitor.log"

# Ensure directory exists
if [ ! -d "$LOGDIR" ]; then
mkdir -p "$LOGDIR"
fi
# Ensure log file exists
if [ ! -f "$LOGFILE" ]; then
touch "$LOGFILE"
fi

# Run MONITOR and prepend timestamps using date
valkey-cli MONITOR | while read -r line; do
echo "$(date '+[%Y-%m-%d %H:%M:%S]') $line"
done >> "$LOGFILE"
118 changes: 116 additions & 2 deletions rust/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,95 @@ pub struct ConnectionConfig {
pub protocol: redis::ProtocolVersion,
/// zero pointer is valid, means no client name is given (`None`)
pub client_name: *const c_char,
pub has_pubsub_config: bool,
pub pubsub_config: PubSubConfigInfo,
/*
TODO below
pub periodic_checks: Option<PeriodicCheck>,
pub pubsub_subscriptions: Option<redis::PubSubSubscriptionInfo>,
pub inflight_requests_limit: Option<u32>,
pub otel_endpoint: Option<String>,
pub otel_flush_interval_ms: Option<u64>,
*/
}

#[repr(C)]
#[derive(Debug, Clone, Copy)]
pub struct PubSubConfigInfo {
pub channels_ptr: *const *const c_char,
pub channel_count: u32,
pub patterns_ptr: *const *const c_char,
pub pattern_count: u32,
pub sharded_channels_ptr: *const *const c_char,
pub sharded_channel_count: u32,
}

/// Convert a C string array to a Vec of Vec<u8>
///
/// # Safety
///
/// * `ptr` must point to an array of `count` valid C string pointers
/// * Each C string pointer must be valid and null-terminated
unsafe fn convert_string_array(ptr: *const *const c_char, count: u32) -> Vec<Vec<u8>> {
if ptr.is_null() || count == 0 {
return Vec::new();
}

let slice = unsafe { std::slice::from_raw_parts(ptr, count as usize) };
slice
.iter()
.map(|&str_ptr| {
let c_str = unsafe { CStr::from_ptr(str_ptr) };
c_str.to_bytes().to_vec()
})
.collect()
}

/// Convert PubSubConfigInfo to the format expected by glide-core
///
/// # Safety
///
/// * All pointers in `config` must be valid or null
/// * String arrays must contain valid C strings
unsafe fn convert_pubsub_config(
config: &PubSubConfigInfo,
) -> std::collections::HashMap<redis::PubSubSubscriptionKind, std::collections::HashSet<Vec<u8>>> {
use redis::PubSubSubscriptionKind;
use std::collections::{HashMap, HashSet};

let mut subscriptions = HashMap::new();

// Convert exact channels
if config.channel_count > 0 {
let channels = unsafe { convert_string_array(config.channels_ptr, config.channel_count) };
subscriptions.insert(
PubSubSubscriptionKind::Exact,
channels.into_iter().collect::<HashSet<_>>(),
);
}

// Convert patterns
if config.pattern_count > 0 {
let patterns = unsafe { convert_string_array(config.patterns_ptr, config.pattern_count) };
subscriptions.insert(
PubSubSubscriptionKind::Pattern,
patterns.into_iter().collect::<HashSet<_>>(),
);
}

// Convert sharded channels
if config.sharded_channel_count > 0 {
let sharded = unsafe {
convert_string_array(config.sharded_channels_ptr, config.sharded_channel_count)
};
subscriptions.insert(
PubSubSubscriptionKind::Sharded,
sharded.into_iter().collect::<HashSet<_>>(),
);
}

subscriptions
}

/// Convert connection configuration to a corresponding object.
///
/// # Safety
Expand Down Expand Up @@ -147,9 +226,18 @@ pub(crate) unsafe fn create_connection_request(
} else {
None
},
pubsub_subscriptions: if config.has_pubsub_config {
let subscriptions = unsafe { convert_pubsub_config(&config.pubsub_config) };
if subscriptions.is_empty() {
None
} else {
Some(subscriptions)
}
} else {
None
},
// TODO below
periodic_checks: None,
pubsub_subscriptions: None,
inflight_requests_limit: None,
lazy_connect: false,
}
Expand Down Expand Up @@ -593,3 +681,29 @@ pub(crate) unsafe fn get_pipeline_options(
PipelineRetryStrategy::new(info.retry_server_error, info.retry_connection_error),
)
}

/// FFI callback function type for PubSub messages.
/// This callback is invoked by Rust when a PubSub message is received.
/// The callback signature matches the C# expectations for marshaling PubSub data.
///
/// # Parameters
/// * `push_kind` - The type of push notification (message, pmessage, smessage, etc.)
/// * `message_ptr` - Pointer to the raw message bytes
/// * `message_len` - Length of the message data in bytes
/// * `channel_ptr` - Pointer to the raw channel name bytes
/// * `channel_len` - Length of the channel name in bytes
/// * `pattern_ptr` - Pointer to the raw pattern bytes (null if no pattern)
/// * `pattern_len` - Length of the pattern in bytes (0 if no pattern)
pub type PubSubCallback = unsafe extern "C" fn(
push_kind: u32,
message_ptr: *const u8,
message_len: i64,
channel_ptr: *const u8,
channel_len: i64,
pattern_ptr: *const u8,
pattern_len: i64,
);

// PubSub callback functions removed - using instance-based callbacks instead.
// The pubsub_callback parameter in create_client will be used to configure glide-core's
// PubSub message handler when full integration is implemented.
Loading
Loading