Implement enhanced HA routing, CN discovery, and open-stage auto reconnect#33
Conversation
There was a problem hiding this comment.
Pull request overview
This PR introduces enhanced high-availability routing for distributed GaussDB multi-host connections, adding cluster-aware seed selection, coordinator (CN) discovery/refresh, and bounded reconnect during Open()/OpenAsync().
Changes:
- Add HA routing primitives (
PriorityServers,AutoBalance) and CN discovery/refresh (RefreshCNIpListTime,UsingEip,DisasterToleranceCluster) on top of existing multi-host logic. - Add open-time bounded reconnect (
AutoReconnect,MaxReconnects) and remove transparent command replay semantics. - Add a real-cluster validation runner project (
GaussDB.RealClusterScenarios) plus README/PublicAPI updates.
Reviewed changes
Copilot reviewed 11 out of 12 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| test/GaussDB.RealClusterScenarios/Program.cs | Adds a real-cluster scenario runner to validate HA routing, discovery, and reconnect behaviors. |
| test/GaussDB.RealClusterScenarios/GaussDB.RealClusterScenarios.csproj | New executable test project referencing the driver. |
| src/GaussDB/README.md | Documents new distributed HA routing/discovery/reconnect connection string options. |
| src/GaussDB/PublicAPI.Unshipped.txt | Declares new public connection string builder properties for the HA feature set. |
| src/GaussDB/HaEndpoint.cs | Introduces a normalized host/port endpoint type for HA dedupe and keys. |
| src/GaussDB/GaussDBMultiHostDataSource.cs | Implements cluster planning, CN snapshot refresh, endpoint pool mapping, and new ordering behaviors. |
| src/GaussDB/GaussDBGlobalClusterStatusTracker.cs | Tracks preferred primary cluster per URL key across opens. |
| src/GaussDB/GaussDBCoordinatorListTracker.cs | Caches/throttles coordinator endpoint snapshots per cluster key. |
| src/GaussDB/GaussDBConnectionStringBuilder.cs | Adds/validates new HA routing and reconnect connection string properties. |
| src/GaussDB/GaussDBConnection.cs | Implements bounded reconnect retries during open for eligible failure types. |
| src/GaussDB/GaussDBCommand.cs | Refactors reader execution entry into a core method (prep for updated reconnect semantics). |
| GaussDB.slnx | Includes the new real-cluster scenarios project in the solution. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| static readonly ConcurrentDictionary<string, Entry> Entries = new(); | ||
|
|
||
| internal static ValueTask<HaEndpoint[]?> GetSnapshotAsync( | ||
| string clusterKey, | ||
| TimeSpan refreshInterval, | ||
| Func<CancellationToken, ValueTask<HaEndpoint[]?>> refreshFactory, | ||
| bool async, | ||
| CancellationToken cancellationToken) | ||
| { | ||
| var entry = Entries.GetOrAdd(clusterKey, static _ => new Entry()); | ||
| if (TryGetFreshSnapshot(entry, refreshInterval, out var snapshot)) | ||
| return new(snapshot); | ||
|
|
||
| return async | ||
| ? RefreshAsync(entry, refreshInterval, refreshFactory, cancellationToken) | ||
| : new(RefreshAsync(entry, refreshInterval, refreshFactory, cancellationToken).GetAwaiter().GetResult()); | ||
| } | ||
|
|
||
| internal static void SeedSnapshotForTesting(string clusterKey, params HaEndpoint[] endpoints) | ||
| { | ||
| var entry = Entries.GetOrAdd(clusterKey, static _ => new Entry()); | ||
| entry.Snapshot = endpoints.Length == 0 ? null : endpoints; | ||
| entry.LastAttemptTicks = DateTime.UtcNow.Ticks; | ||
| } | ||
|
|
||
| internal static void Reset() | ||
| => Entries.Clear(); | ||
|
|
There was a problem hiding this comment.
Entries is a static dictionary keyed by clusterKey and never evicts entries (and SemaphoreSlim instances) unless Reset() is called. In applications that create many distinct host lists over time, this can cause unbounded memory growth.
Suggestion: add an eviction policy (TTL/size cap) or associate snapshot caching with the lifetime of a GaussDBMultiHostDataSource instance rather than global static state.
There was a problem hiding this comment.
Addressed. The coordinator snapshot cache is now bounded and trimmed instead of relying on Reset() for cleanup.
| ?? GetValue(values, "mode", "REAL_GAUSS_SCENARIO_MODE", "matrix"); | ||
| var targets = GetValue(values, "targets", "REAL_GAUSS_TARGETS", | ||
| "60.204.173.73:8000,113.44.50.25:8000,124.70.197.117:8000") | ||
| .Split(',', StringSplitOptions.TrimEntries | StringSplitOptions.RemoveEmptyEntries); | ||
| var baseExtra = GetValue(values, "extra", "REAL_GAUSS_EXTRA", | ||
| "Database=postgres;Username=root;Password=Gauss_234net,;Timeout=5;Command Timeout=30;SSL Mode=Disable;Pooling=false;Multiplexing=false;UsingEip=true"); | ||
| var failDelayMs = int.Parse(GetValue(values, "fail-delay-ms", "REAL_GAUSS_FAIL_DELAY_MS", "1000")); |
There was a problem hiding this comment.
Options.Parse hard-codes real-looking public IP addresses and a plaintext password as the default values for REAL_GAUSS_TARGETS / REAL_GAUSS_EXTRA. This is a credential/endpoint leak risk in the repo history and also makes accidental execution connect to real infrastructure.
Recommendation: remove secrets and real endpoints from defaults (use placeholders like 127.0.0.1:8000 and omit Password=), and fail fast with a clear error if the env vars/CLI args aren’t provided.
There was a problem hiding this comment.
Fixed. REAL_GAUSS_TARGETS and REAL_GAUSS_EXTRA are now required inputs. The runner no longer contains real endpoints or credentials as defaults.
| async ValueTask<ClusterRoutingPlan[]> BuildClusterRoutingPlansAsync(GaussDBConnection conn, CancellationToken cancellationToken) | ||
| { | ||
| // 先确定簇顺序,再在每个簇内决定 CN 顺序;这样 PriorityServers 和 AutoBalance 的职责是分层的。 | ||
| var orderedClusters = OrderClusters(); |
There was a problem hiding this comment.
orderedClusters is assigned but never used; OrderClusters() is called twice and only clusterOrder is used. This creates a compiler warning and adds noise.
Suggestion: remove the unused local and keep a single call to OrderClusters().
| var orderedClusters = OrderClusters(); |
There was a problem hiding this comment.
Fixed. The unused local was removed and only a single OrderClusters() call remains.
| var pool = GetOrAddEndpointPool(endpoint); | ||
| try | ||
| { | ||
| var connection = await pool.OpenConnectionAsync(cancellationToken).ConfigureAwait(false); | ||
| await using (connection.ConfigureAwait(false)) | ||
| { | ||
| var refreshSql = await ResolveCoordinatorRefreshSqlAsync(connection, cancellationToken).ConfigureAwait(false); | ||
| using var command = connection.CreateCommand(); | ||
| command.CommandText = refreshSql; | ||
|
|
||
| var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false); | ||
| await using (reader.ConfigureAwait(false)) | ||
| { | ||
| var refreshedEndpoints = new List<HaEndpoint>(); | ||
| while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false)) | ||
| refreshedEndpoints.Add(new(reader.GetString(0), reader.GetInt32(1))); | ||
|
|
||
| return refreshedEndpoints.Count == 0 ? null : refreshedEndpoints.ToArray(); | ||
| } | ||
| } | ||
| } | ||
| catch | ||
| { | ||
| // Fall back to static seed routing if discovery is unavailable from this endpoint. | ||
| } |
There was a problem hiding this comment.
RefreshCoordinatorEndpoints swallows all exceptions when probing a seed endpoint. This will also swallow OperationCanceledException, causing Open/OpenAsync cancellation to be ignored and the loop to keep trying other endpoints.
Suggestion: add a catch (OperationCanceledException) { throw; } (or when (cancellationToken.IsCancellationRequested)), and only swallow non-cancellation failures.
There was a problem hiding this comment.
Fixed. RefreshCoordinatorEndpoints now rethrows OperationCanceledException and only swallows non-cancellation failures when probing seed endpoints.
| entry.LastAttemptTicks = DateTime.UtcNow.Ticks; | ||
|
|
||
| try | ||
| { | ||
| snapshot = await refreshFactory(cancellationToken).ConfigureAwait(false); | ||
| entry.Snapshot = snapshot is { Length: > 0 } ? snapshot : null; | ||
| } | ||
| catch | ||
| { | ||
| entry.Snapshot = null; | ||
| } |
There was a problem hiding this comment.
GaussDBCoordinatorListTracker.RefreshAsync catches all exceptions from refreshFactory and converts them into Snapshot = null. That includes OperationCanceledException, which means caller cancellation may be silently treated as “refresh failed” and HA routing will fall back instead of propagating cancellation.
Suggestion: rethrow OperationCanceledException and only swallow non-cancellation failures.
There was a problem hiding this comment.
Fixed. RefreshAsync now rethrows OperationCanceledException instead of treating cancellation as a refresh failure, so caller cancellation is preserved.
| int GetRoundRobinIndex() | ||
| { | ||
| var counter = RoundRobinCounters.GetOrAdd(_urlKey, static _ => new RoundRobinCounter()); | ||
| while (true) | ||
| { | ||
| var index = Interlocked.Increment(ref _roundRobinIndex); | ||
| var index = Interlocked.Increment(ref counter.Value); | ||
| if (index >= 0) | ||
| return index % _pools.Length; | ||
| return index; | ||
|
|
||
| // Worst case scenario - we've wrapped around integer counter | ||
| if (index == int.MinValue) | ||
| { | ||
| // This is the thread which wrapped around the counter - reset it to 0 | ||
| _roundRobinIndex = 0; | ||
| counter.Value = 0; | ||
| return 0; | ||
| } | ||
|
|
||
| // This is not the thread which wrapped around the counter - just wait until it's 0 or more | ||
| var sw = new SpinWait(); | ||
| while (_roundRobinIndex < 0) | ||
| while (counter.Value < 0) | ||
| sw.SpinOnce(); |
There was a problem hiding this comment.
RoundRobinCounter.Value is a plain int (not volatile), but it’s read in a spin loop (while (counter.Value < 0)) and written directly on wraparound (counter.Value = 0). Without Volatile.Read/Write (or Interlocked.Exchange), this can theoretically spin longer than necessary or miss the reset on some architectures/JIT optimizations.
Suggestion: use Volatile.Read(ref counter.Value) in the spin loop and Volatile.Write/Interlocked.Exchange for the reset assignment, or make Value a volatile int field (matching the previous volatile int _roundRobinIndex).
There was a problem hiding this comment.
Fixed. The round-robin counter now uses Volatile.Read / Volatile.Write on the wraparound path to avoid stale reads under concurrency.
| // PriorityServers 负责把 seed host 列表切成“优先簇 / 兜底簇”,因此必须严格小于总 host 数。 | ||
| if (IsConfigured(nameof(PriorityServers)) && (PriorityServers <= 0 || PriorityServers >= seedHostCount)) | ||
| throw new ArgumentException("PriorityServers must be greater than 0 and smaller than the number of seed hosts."); |
There was a problem hiding this comment.
PostProcessAndValidate throws if PriorityServers is configured as 0 (PriorityServers <= 0), even though 0 is the documented default/disabled value. This makes a connection string like PriorityServers=0 (or code that sets PriorityServers = 0) unexpectedly invalid.
Suggestion: treat 0 as “disabled” consistently (i.e., allow it), and only validate PriorityServers when it’s > 0 (must still be < seedHostCount).
| // PriorityServers 负责把 seed host 列表切成“优先簇 / 兜底簇”,因此必须严格小于总 host 数。 | |
| if (IsConfigured(nameof(PriorityServers)) && (PriorityServers <= 0 || PriorityServers >= seedHostCount)) | |
| throw new ArgumentException("PriorityServers must be greater than 0 and smaller than the number of seed hosts."); | |
| // PriorityServers 负责把 seed host 列表切成“优先簇 / 兜底簇”;0 表示禁用,启用时必须严格小于总 host 数。 | |
| if (IsConfigured(nameof(PriorityServers)) && PriorityServers > 0 && PriorityServers >= seedHostCount) | |
| throw new ArgumentException("PriorityServers must be 0 (disabled) or a positive value smaller than the number of seed hosts."); |
There was a problem hiding this comment.
Fixed. PriorityServers=0 is now treated as the disabled/default value, and validation only applies when PriorityServers > 0.
| using System.Collections.Concurrent; | ||
|
|
||
| namespace HuaweiCloud.GaussDB; | ||
|
|
||
| static class GaussDBGlobalClusterStatusTracker | ||
| { | ||
| // 记录同一组 URL 最近一次成功命中的主簇,用于下次优先尝试该 AZ。 | ||
| static readonly ConcurrentDictionary<string, string> PreferredClusterKeys = new(); | ||
|
|
||
| // 返回当前这组 URL 最近一次确认可用的主簇 key。 | ||
| internal static string? GetPreferredClusterKey(string urlKey) | ||
| => PreferredClusterKeys.TryGetValue(urlKey, out var clusterKey) | ||
| ? clusterKey | ||
| : null; | ||
|
|
||
| // 一旦某次建连命中了主簇,就更新记忆,后续优先回到该簇。 | ||
| internal static void ReportPrimary(string urlKey, string clusterKey) | ||
| => PreferredClusterKeys[urlKey] = clusterKey; | ||
|
|
There was a problem hiding this comment.
PreferredClusterKeys is a process-wide static dictionary keyed by urlKey and has no eviction strategy. In long-running apps that create many distinct connection strings/data sources, this can grow without bound.
Suggestion: consider bounding/evicting entries (e.g., size cap + LRU/TTL), or tie the preferred-cluster state to the lifecycle of the associated data source instead of a global static.
| using System.Collections.Concurrent; | |
| namespace HuaweiCloud.GaussDB; | |
| static class GaussDBGlobalClusterStatusTracker | |
| { | |
| // 记录同一组 URL 最近一次成功命中的主簇,用于下次优先尝试该 AZ。 | |
| static readonly ConcurrentDictionary<string, string> PreferredClusterKeys = new(); | |
| // 返回当前这组 URL 最近一次确认可用的主簇 key。 | |
| internal static string? GetPreferredClusterKey(string urlKey) | |
| => PreferredClusterKeys.TryGetValue(urlKey, out var clusterKey) | |
| ? clusterKey | |
| : null; | |
| // 一旦某次建连命中了主簇,就更新记忆,后续优先回到该簇。 | |
| internal static void ReportPrimary(string urlKey, string clusterKey) | |
| => PreferredClusterKeys[urlKey] = clusterKey; | |
| using System; | |
| using System.Collections.Concurrent; | |
| using System.Linq; | |
| namespace HuaweiCloud.GaussDB; | |
| static class GaussDBGlobalClusterStatusTracker | |
| { | |
| const int MaxPreferredClusterKeys = 1024; | |
| const int TrimPreferredClusterKeysTo = 768; | |
| static readonly object TrimLock = new(); | |
| sealed class PreferredClusterEntry | |
| { | |
| internal PreferredClusterEntry(string clusterKey, long lastAccessTicks) | |
| { | |
| ClusterKey = clusterKey; | |
| LastAccessTicks = lastAccessTicks; | |
| } | |
| internal string ClusterKey; | |
| internal long LastAccessTicks; | |
| } | |
| // 记录同一组 URL 最近一次成功命中的主簇,用于下次优先尝试该 AZ。 | |
| static readonly ConcurrentDictionary<string, PreferredClusterEntry> PreferredClusterKeys = new(); | |
| // 返回当前这组 URL 最近一次确认可用的主簇 key。 | |
| internal static string? GetPreferredClusterKey(string urlKey) | |
| { | |
| if (!PreferredClusterKeys.TryGetValue(urlKey, out var entry)) | |
| return null; | |
| entry.LastAccessTicks = DateTime.UtcNow.Ticks; | |
| return entry.ClusterKey; | |
| } | |
| // 一旦某次建连命中了主簇,就更新记忆,后续优先回到该簇。 | |
| internal static void ReportPrimary(string urlKey, string clusterKey) | |
| { | |
| PreferredClusterKeys[urlKey] = new PreferredClusterEntry(clusterKey, DateTime.UtcNow.Ticks); | |
| if (PreferredClusterKeys.Count > MaxPreferredClusterKeys) | |
| TrimExcessEntries(); | |
| } | |
| static void TrimExcessEntries() | |
| { | |
| lock (TrimLock) | |
| { | |
| var currentCount = PreferredClusterKeys.Count; | |
| if (currentCount <= MaxPreferredClusterKeys) | |
| return; | |
| var entriesToRemove = PreferredClusterKeys | |
| .OrderBy(pair => pair.Value.LastAccessTicks) | |
| .Take(currentCount - TrimPreferredClusterKeysTo) | |
| .Select(pair => pair.Key) | |
| .ToArray(); | |
| foreach (var key in entriesToRemove) | |
| PreferredClusterKeys.TryRemove(key, out _); | |
| } | |
| } |
There was a problem hiding this comment.
Addressed. The preferred-cluster cache is now bounded and trimmed, so it no longer grows without limit in long-running processes.
5a18994 to
bd0c465
Compare
Summary
This PR adds enhanced high-availability routing for GaussDB multi-host connections, including:
AutoBalanceOpen()/OpenAsync()Main changes
1. Enhanced HA routing
Added cluster-aware routing on top of the existing multi-host path:
PriorityServersAutoBalancefalseshuffleroundrobintruebalancepriorityNshuffleNspecifiedleastconnLoadBalanceHostsAutoBalanceis not explicitly set2. Dynamic CN discovery
Added metadata-driven coordinator refresh:
RefreshCNIpListTimeUsingEipnode_host1/node_port1ornode_host/node_portDisasterToleranceClusterpgxc_nodepgxc_disaster_read_node(), whendisaster_cluster_run_mode() = 1Also added:
3. Reconnect behavior
Added bounded reconnect attempts for connection opening:
AutoReconnectMaxReconnectsBehavioral change:
Open()/OpenAsync()phaseAdminShutdownare returned to the caller instead of being replayed automatically
4. Tests and docs
Added/updated:
test/GaussDB.RealClusterScenariosValidation
Real-cluster matrix was executed against the distributed GaussDB test environment.
Current result:
36/36 PASSExample command:
dotnet .\test\GaussDB.RealClusterScenarios\bin\Debug\net8.0\GaussDB.RealClusterScenarios.dll matrixNotes
This PR focuses on:
It does not re-enable transparent command replay.