Skip to content

Conversation

@atakavci
Copy link
Collaborator

@atakavci atakavci commented Jan 2, 2026

Add Asynchronous Connection API for Multi-Database Client

Overview

This PR introduces a new asynchronous connection API (connectAsync) for the MultiDbClient, enabling non-blocking connection establishment to multiple Redis databases with parallel health checking and intelligent database selection.

Motivation

The existing connect() method is synchronous and blocks until all database connections are established and health checks complete. This can cause significant delays in applications that need to establish connections to multiple Redis databases, especially in scenarios where:

  • Some databases may be slow to respond
  • Network latency varies across databases
  • Applications need to remain responsive during connection establishment
  • Microservices need fast startup times

Changes

1. New Asynchronous Connection API

Added MultiDbClient.connectAsync(RedisCodec<K, V> codec)

  • Returns MultiDbConnectionFuture<K, V> (extends BaseConnectionFuture)
  • Non-blocking connection establishment
  • Parallel connection attempts to all configured databases
  • Asynchronous health check waiting
  • Partial success support (succeeds if at least one database is healthy)
  • Deadlock prevention: All callbacks execute on a separate thread pool, not on Netty event loop threads

2. New MultiDbAsyncConnectionBuilder Class

Created a new package-private class that encapsulates all async connection orchestration logic:

Key Methods:

  • connectAsync() - Main entry point for async connection creation
  • createRedisDatabaseAsync() - Creates individual database connections asynchronously
  • handleDatabaseFutures() - Orchestrates collection and health checking
  • collectDatabasesWithEstablishedConnections() - Implements partial success pattern
  • collectHealthStatuses() - Waits asynchronously for all health statuses
  • filterHealthyDatabases() - Filters by health status AND connection state

Features:

  • ✅ Parallel connection establishment to all databases
  • ✅ Asynchronous health check waiting (no blocking)
  • ✅ Partial success pattern (at least one healthy database required)
  • ✅ Proper resource cleanup on failures
  • ✅ Weight-based database selection
  • ✅ Comprehensive error handling with suppressed exceptions

3. Enhanced StatusTracker with Async Support

Added waitForHealthStatusAsync(RedisURI endpoint)

  • Returns CompletableFuture<HealthStatus>
  • Event-driven approach using HealthStatusListener
  • Automatic timeout handling using ClientResources.eventExecutorGroup()
  • Proper listener cleanup with AtomicBoolean to prevent race conditions
  • Race condition protection with double-check pattern
  • Scheduler cleanup on future completion

Updated Constructor:

  • Now accepts ClientResources parameter for accessing the event executor
  • Updated instantiation in MultiDbClientImpl to pass ClientResources

4. Refactored MultiDbClientImpl

Changes:

  • Extracted multi-db connection creation to createMultiDbConnection() method
  • Updated StatusTracker instantiation to include ClientResources in both connect() and connectPubSub() methods
  • Implemented connectAsync() using MultiDbAsyncConnectionBuilder
  • Maintained backward compatibility with existing connect() method
  • Added comprehensive JavaDoc for the synchronous connect() method explaining blocking behavior

5. New BaseConnectionFuture Class

Created a new abstract base class for connection futures that provides protection against event loop deadlocks:

Purpose:

  • Ensures all callbacks (thenApply, thenAccept, etc.) execute on a separate thread pool
  • Prevents deadlocks when users call blocking sync operations inside callbacks
  • Implements both CompletionStage<T> and Future<T> interfaces

Key Features:

  • All non-async methods (thenApply, thenAccept, etc.) are forced to execute asynchronously
  • Uses ForkJoinPool.commonPool() as default executor
  • Allows custom executor via constructor
  • Abstract wrap() method for subclasses to wrap new futures

Example Problem Solved:

// DANGEROUS with plain CompletableFuture - can deadlock!
future.thenApply(conn -> conn.sync().ping());

// SAFE with BaseConnectionFuture - always runs on separate thread
future.thenApply(conn -> conn.sync().ping());

6. New MultiDbConnectionFuture Class

Created a specialized connection future for multi-database connections:

Extends: BaseConnectionFuture<StatefulRedisMultiDbConnection<K, V>>

Key Methods:

  • from(CompletableFuture<...> future) - Static factory method
  • from(CompletableFuture<...> future, Executor executor) - Static factory with custom executor
  • wrap(CompletableFuture<U> future) - Implementation of abstract method from base class

Features:

  • Type-safe wrapper for multi-db connection futures
  • Inherits all deadlock prevention from BaseConnectionFuture
  • Used as return type for MultiDbClient.connectAsync()

7. Minor Visibility Change

RedisDatabaseImpl.getConnection():

  • Changed visibility from public to package-private
  • Only used internally within the failover package

8. Comprehensive Test Coverage

Unit Tests (MultiDbAsyncConnectionBuilderUnitTests):

  • 13 unit tests covering all package-private methods
  • Uses Mockito for mocking (no Redis required)
  • Tests partial success, edge cases, and error scenarios
  • ✅ All tests passing

Integration Tests:

  • Enhanced existing integration test files with proper test tag constants
  • Tests real Redis connections
  • Covers partial failures, null codec, multiple connections, different codecs
  • Tests future composition and exception handling

9. Updated Test Tags

Standardized test tags across all test files to use TestTags.UNIT_TEST and TestTags.INTEGRATION_TEST constants instead of string literals:

  • CircuitBreakerMetricsIntegrationTests
  • DatabaseCommandTrackerUnitTests
  • DatabaseEndpointCallbackTests
  • DatabasePubSubEndpointTrackerTests
  • HealthCheckIntegrationTests
  • MultiDbAsyncConnectionBuilderUnitTests (new)

Technical Details

Connection Flow

connectAsync(codec)
    ↓
Create HealthStatusManager
    ↓
Create MultiDbAsyncConnectionBuilder
    ↓
For each database config:
    - createRedisDatabaseAsync() [parallel]
    ↓
Wait for all connections (CompletableFuture.allOf)
    ↓
collectDatabasesWithEstablishedConnections()
    - Collect successful connections
    - Log failures
    - Throw if ALL failed
    ↓
collectHealthStatuses() [async]
    - Wait for health status of each database
    - Use StatusTracker.waitForHealthStatusAsync()
    - Select most weighted healthy database
    ↓
filterHealthyDatabases()
    - Filter by health status AND isOpen()
    - Close excluded connections
    ↓
createMultiDbConnection()
    - Create StatefulRedisMultiDbConnectionImpl
    ↓
Wrap in MultiDbConnectionFuture
    - Provides deadlock prevention
    - Uses eventExecutorGroup as default executor
    ↓
Return MultiDbConnectionFuture

Partial Success Pattern

The implementation supports partial success:

  • If all databases fail to connect → throws RedisConnectionException with all failures as suppressed exceptions
  • If at least one database connects successfully → proceeds with health checks
  • If all databases are unhealthy → throws RedisConnectionException
  • If at least one database is healthy → succeeds with healthy databases only

Async Health Check Implementation

The StatusTracker.waitForHealthStatusAsync() method uses an event-driven approach:

  1. Checks if status is already determined (fast path)
  2. Registers a temporary HealthStatusListener for the endpoint
  3. Double-checks status after registration (race condition protection)
  4. Uses ClientResources.eventExecutorGroup() to schedule timeout
  5. Cleans up listener using AtomicBoolean to prevent double removal
  6. Ensures scheduler shutdown on timeout

Deadlock Prevention

The MultiDbConnectionFuture (via BaseConnectionFuture) solves a critical problem with async connection handling:

Problem: When using plain CompletableFuture, callbacks can execute on Netty event loop threads. If a callback calls a blocking sync operation (like conn.sync().ping()), it blocks the event loop thread, causing a deadlock.

Solution: BaseConnectionFuture forces all callbacks to execute on a separate thread pool (ForkJoinPool.commonPool() by default, or ClientResources.eventExecutorGroup() for MultiDbConnectionFuture). This ensures:

  • Event loop threads are never blocked by user code
  • Sync operations in callbacks are safe
  • No deadlocks even with blocking operations

Implementation Details:

  • All non-async methods (thenApply, thenAccept, etc.) delegate to their async variants
  • Custom executor can be provided for fine-grained control
  • The exceptionally() method is safe as-is (only runs on exception, doesn't block)

API Usage

// Asynchronous connection
MultiDbClient client = MultiDbClient.create(configs);
MultiDbConnectionFuture<String, String> future =
    client.connectAsync(StringCodec.UTF8);

// Non-blocking - can do other work here

// Wait for connection when needed
StatefulRedisMultiDbConnection<String, String> connection = future.get();

// SAFE: Callbacks run on separate thread pool, not event loop
future.thenApply(conn -> {
    // This is safe even though sync() blocks!
    return conn.sync().ping();
}).thenAccept(result -> {
    System.out.println("PING result: " + result);
}).exceptionally(ex -> {
    // Handle error
    ex.printStackTrace();
    return null;
});

// Can also convert to CompletableFuture if needed
CompletableFuture<StatefulRedisMultiDbConnection<String, String>> cf = future.toCompletableFuture();

Breaking Changes

None. This is a purely additive change that maintains full backward compatibility.

Documentation

  • Added comprehensive JavaDoc for all new methods and classes
  • Updated method documentation in MultiDbClient interface
  • Enhanced JavaDoc for synchronous connect() method to clarify blocking behavior
  • Documented deadlock prevention mechanism in BaseConnectionFuture and MultiDbConnectionFuture
  • Added code examples showing safe usage of sync operations in callbacks

Files Changed

New Files

  • src/main/java/io/lettuce/core/BaseConnectionFuture.java (311 lines)
  • src/main/java/io/lettuce/core/failover/MultiDbConnectionFuture.java (85 lines)
  • src/main/java/io/lettuce/core/failover/MultiDbAsyncConnectionBuilder.java (432 lines)
  • src/test/java/io/lettuce/core/failover/MultiDbAsyncConnectionBuilderUnitTests.java (392 lines)

Modified Files

  • src/main/java/io/lettuce/core/failover/MultiDbClient.java
  • src/main/java/io/lettuce/core/failover/MultiDbClientImpl.java
  • src/main/java/io/lettuce/core/failover/RedisDatabaseImpl.java
  • src/main/java/io/lettuce/core/failover/StatusTracker.java
  • src/test/java/io/lettuce/core/failover/CircuitBreakerMetricsIntegrationTests.java
  • src/test/java/io/lettuce/core/failover/DatabaseCommandTrackerUnitTests.java
  • src/test/java/io/lettuce/core/failover/DatabaseEndpointCallbackTests.java
  • src/test/java/io/lettuce/core/failover/DatabasePubSubEndpointTrackerTests.java
  • src/test/java/io/lettuce/core/failover/HealthCheckIntegrationTests.java

Testing

All tests pass successfully:

  • ✅ 13 new unit tests for MultiDbAsyncConnectionBuilder
  • ✅ Enhanced integration tests with standardized test tags
  • ✅ All existing tests continue to pass
  • ✅ No regressions

@atakavci atakavci requested review from ggivo and uglide January 2, 2026 15:01
@atakavci atakavci self-assigned this Jan 2, 2026
@atakavci atakavci added the type: feature A new feature label Jan 2, 2026
@atakavci atakavci marked this pull request as draft January 5, 2026 12:34
@atakavci atakavci marked this pull request as ready for review January 6, 2026 09:42
@atakavci atakavci force-pushed the failover/asyncConnect branch from 355db5b to c47d840 Compare January 6, 2026 12:03
@atakavci atakavci force-pushed the failover/asyncConnect branch from 68d5b36 to 78a1f99 Compare January 7, 2026 00:20
Copy link

@jit-ci jit-ci bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❌ The following Jit checks failed to run:

  • static-code-analysis-semgrep-pro

#jit_bypass_commit in this PR to bypass, Jit Admin privileges required.

More info in the Jit platform.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

type: feature A new feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant