diff --git a/Sources/Valkey/Cluster/ValkeyClusterClient.swift b/Sources/Valkey/Cluster/ValkeyClusterClient.swift index 440b48e4..e5876364 100644 --- a/Sources/Valkey/Cluster/ValkeyClusterClient.swift +++ b/Sources/Valkey/Cluster/ValkeyClusterClient.swift @@ -208,18 +208,9 @@ public final class ValkeyClusterClient: Sendable { public func execute( _ commands: repeat each Command ) async -> sending (repeat Result<(each Command).Response, any Error>) { - func convert(_ result: Result, to: Response.Type) -> Result { - result.flatMap { - do { - return try .success(Response(fromRESP: $0)) - } catch { - return .failure(error) - } - } - } let results = await self.execute([any ValkeyCommand](commands: repeat each commands)) var index = AutoIncrementingInteger() - return (repeat convert(results[index.next()], to: (each Command).Response.self)) + return (repeat results[index.next()].convertFromRESP(to: (each Command).Response.self)) } /// Results from pipeline and index for each result @@ -303,6 +294,87 @@ public final class ValkeyClusterClient: Sendable { } } + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into an array of RESPToken Results, one for + /// each command. + /// + /// Transactions come only affect keys coming from the same HashSlot. + /// + /// - Parameter commands: Parameter pack of ValkeyCommands + /// - Returns: Parameter pack holding the responses of all the commands + /// - Throws: ValkeyTransactionError when EXEC aborts + @inlinable + public func transaction( + _ commands: repeat each Command + ) async throws -> sending (repeat Result<(each Command).Response, any Error>) { + let results = try await self.transaction([any ValkeyCommand](commands: repeat each commands)) + var index = AutoIncrementingInteger() + return (repeat results[index.next()].convertFromRESP(to: (each Command).Response.self)) + } + + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into an array of RESPToken Results, one for + /// each command. + /// + /// Transactions come only affect keys coming from the same HashSlot. + /// + /// This is an alternative version of the transaction function ``ValkeyClusterClient/transaction(_:)->(_,_)`` + /// that allows for a collection of ValkeyCommands. It provides more flexibility but the command + /// responses are returned as ``RESPToken`` instead of the response type for the command. + /// + /// - Parameter commands: Collection of ValkeyCommands + /// - Returns: Array holding the RESPToken responses of all the commands + /// - Throws: ValkeyTransactionError when EXEC aborts + @inlinable + public func transaction( + _ commands: Commands + ) async throws -> [Result] where Commands.Element == any ValkeyCommand { + let hashSlot = try self.hashSlot(for: commands.flatMap { $0.keysAffected }) + + var clientSelector: () async throws -> ValkeyNodeClient = { + try await self.nodeClient(for: hashSlot.map { [$0] } ?? []) + } + + var asking = false + var attempt = 0 + while !Task.isCancelled { + do { + let client = try await clientSelector() + if asking { + return try await client.transactionWithAsk(commands) + } else { + return try await client.transaction(commands) + } + } catch let error as ValkeyClusterError where error == .noNodeToTalkTo { + // TODO: Rerun node discovery! + } catch { + let retryAction = self.getTransactionRetryAction(from: error) + switch retryAction { + case .redirect(let redirectError): + clientSelector = { try await self.nodeClient(for: redirectError) } + asking = (redirectError.redirection == .ask) + case .tryAgain: + let wait = self.clientConfiguration.retryParameters.calculateWaitTime(retry: attempt) + try await Task.sleep(for: wait) + attempt += 1 + case .dontRetry: + throw error + } + } + } + throw ValkeyClusterError.clientRequestCancelled + } + struct Redirection { let node: ValkeyNodeClient let ask: Bool @@ -565,6 +637,35 @@ public final class ValkeyClusterClient: Sendable { } } + @usableFromInline + /* private */ func getTransactionRetryAction(from error: some Error) -> RetryAction { + switch error { + case let transactionError as ValkeyTransactionError: + switch transactionError { + case .transactionErrors(let results, let execError): + // check whether queued results include any errors that warrent a retry + for result in results { + if case .failure(let queuedError) = result { + let queuedAction = self.getRetryAction(from: queuedError) + guard case .dontRetry = queuedAction else { + return queuedAction + } + } + } + // check whether EXEC error warrents a retry + let execAction = self.getRetryAction(from: execError) + guard case .dontRetry = execAction else { + return execAction + } + return .dontRetry + case .transactionAborted: + return .dontRetry + } + default: + return .dontRetry + } + } + private func queueAction(_ action: RunAction) { self.actionStreamContinuation.yield(action) } diff --git a/Sources/Valkey/Connection/ValkeyConnection.swift b/Sources/Valkey/Connection/ValkeyConnection.swift index 5f9d31ef..8865e18d 100644 --- a/Sources/Valkey/Connection/ValkeyConnection.swift +++ b/Sources/Valkey/Connection/ValkeyConnection.swift @@ -386,41 +386,9 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { return try await _execute( buffer: encoder.buffer, promises: promises, - valkeyPromises: promises.map { .nio($0) } - ) { promises -> Result<[Result], any Error> in - let responses: EXEC.Response - do { - let execFutureResult = promises.last!.futureResult - responses = try await execFutureResult.get().decode(as: EXEC.Response.self) - } catch let error as ValkeyClientError where error.errorCode == .commandError { - // we received an error while running the EXEC command. Extract queuing - // results and throw error - var results: [Result] = .init() - results.reserveCapacity(promises.count - 2) - for promise in promises[1..<(promises.count - 1)] { - results.append(await promise.futureResult._result()) - } - return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error)) - } catch { - return .failure(error) - } - // If EXEC returned nil then transaction was aborted because a - // WATCHed variable changed - guard let responses else { - return .failure(ValkeyTransactionError.transactionAborted) - } - // We convert all the RESP errors in the response from EXEC to Result.failure - return .success( - responses.map { - switch $0.identifier { - case .simpleError, .bulkError: - .failure(ValkeyClientError(.commandError, message: $0.errorString.map { Swift.String(buffer: $0) })) - default: - .success($0) - } - } - ) - }.get() + valkeyPromises: promises.map { .nio($0) }, + processResults: self._processTransactionPromises + ).get() } /// Pipeline a series of commands to Valkey connection and precede each command with an ASKING @@ -468,6 +436,48 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } + /// Pipeline a series of commands as a transaction preceded with an ASKING command + /// + /// Once all the responses for the commands have been received the function returns + /// an array of RESPToken Results, one for each command. + /// + /// This is an internal function used by the cluster client + /// + /// - Parameter commands: Collection of ValkeyCommands + /// - Returns: Array holding the RESPToken responses of all the commands + @usableFromInline + func transactionWithAsk( + _ commands: some Collection + ) async throws -> [Result] { + self.logger.trace("transaction asking", metadata: ["commands": .string(Self.concatenateCommandNames(commands))]) + var promises: [EventLoopPromise] = [] + promises.reserveCapacity(commands.count) + var valkeyPromises: [ValkeyPromise] = [] + valkeyPromises.reserveCapacity(commands.count + 3) + var encoder = ValkeyCommandEncoder() + ASKING().encode(into: &encoder) + MULTI().encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + valkeyPromises.append(.forget) + valkeyPromises.append(.nio(promises.last!)) + + for command in commands { + command.encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + valkeyPromises.append(.nio(promises.last!)) + } + EXEC().encode(into: &encoder) + promises.append(channel.eventLoop.makePromise(of: RESPToken.self)) + valkeyPromises.append(.nio(promises.last!)) + + return try await _execute( + buffer: encoder.buffer, + promises: promises, + valkeyPromises: valkeyPromises, + processResults: self._processTransactionPromises + ).get() + } + /// Execute stream of commands written into buffer /// /// The function is provided with an array of EventLoopPromises for the responses of commands @@ -498,6 +508,44 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable { } } + @usableFromInline + func _processTransactionPromises( + _ promises: [EventLoopPromise] + ) async -> sending Result<[Result], any Error> { + let responses: EXEC.Response + do { + let execFutureResult = promises.last!.futureResult + responses = try await execFutureResult.get().decode(as: EXEC.Response.self) + } catch let error as ValkeyClientError where error.errorCode == .commandError { + // we received an error while running the EXEC command. Extract queuing + // results and throw error + var results: [Result] = .init() + results.reserveCapacity(promises.count - 2) + for promise in promises[1..<(promises.count - 1)] { + results.append(await promise.futureResult._result()) + } + return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error)) + } catch { + return .failure(error) + } + // If EXEC returned nil then transaction was aborted because a + // WATCHed variable changed + guard let responses else { + return .failure(ValkeyTransactionError.transactionAborted) + } + // We convert all the RESP errors in the response from EXEC to Result.failure + return .success( + responses.map { + switch $0.identifier { + case .simpleError, .bulkError: + .failure(ValkeyClientError(.commandError, message: $0.errorString.map { Swift.String(buffer: $0) })) + default: + .success($0) + } + } + ) + } + #if DistributedTracingSupport @usableFromInline func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) { diff --git a/Sources/Valkey/Node/ValkeyNodeClient.swift b/Sources/Valkey/Node/ValkeyNodeClient.swift index 6786f5d5..6051a0da 100644 --- a/Sources/Valkey/Node/ValkeyNodeClient.swift +++ b/Sources/Valkey/Node/ValkeyNodeClient.swift @@ -272,6 +272,17 @@ extension ValkeyNodeClient { return .init(repeating: .failure(error), count: commands.count) } } + + /// Internal command used by cluster client, that precedes each command with a ASKING + /// command + @usableFromInline + func transactionWithAsk( + _ commands: Commands + ) async throws -> [Result] where Commands.Element == any ValkeyCommand { + try await self.withConnection { connection in + try await connection.transactionWithAsk(commands) + } + } } /// Extension that makes ``ValkeyNode`` conform to ``ValkeyNodeConnectionPool``. diff --git a/Sources/Valkey/ValkeyClientProtocol.swift b/Sources/Valkey/ValkeyClientProtocol.swift index 1fb21c65..09e1587c 100644 --- a/Sources/Valkey/ValkeyClientProtocol.swift +++ b/Sources/Valkey/ValkeyClientProtocol.swift @@ -19,7 +19,7 @@ public protocol ValkeyClientProtocol: Sendable { /// Once all the responses for the commands have been received the function returns /// an array of RESPToken Results, one for each command. /// - /// This is an alternative version of the pipelining function ``ValkeyClient/execute(_:)->(_,_)`` + /// This is an alternative version of the pipelining function ``ValkeyConnection/execute(_:)->(_,_)`` /// that allows for a collection of ValkeyCommands. It provides more flexibility but /// is more expensive to run and the command responses are returned as ``RESPToken`` /// instead of the response type for the command. @@ -28,6 +28,24 @@ public protocol ValkeyClientProtocol: Sendable { /// - Returns: Array holding the RESPToken responses of all the commands func execute(_ commands: [any ValkeyCommand]) async -> [Result] + /// Pipeline a series of commands as a transaction to Valkey connection + /// + /// Another client will never be served in the middle of the execution of these + /// commands. See https://valkey.io/topics/transactions/ for more information. + /// + /// EXEC and MULTI commands are added to the pipelined commands and the output + /// of the EXEC command is transformed into an array of RESPToken Results, one for + /// each command. + /// + /// This is an alternative version of the transaction function ``ValkeyConnection/transaction(_:)->(_,_)`` + /// that allows for a collection of ValkeyCommands. It provides more flexibility but the command + /// responses are returned as ``RESPToken`` instead of the response type for the command. + /// + /// - Parameter commands: Collection of ValkeyCommands + /// - Returns: Array holding the RESPToken responses of all the commands + /// - Throws: ValkeyTransactionError when EXEC aborts + func transaction(_ commands: [any ValkeyCommand]) async throws -> [Result] + /// Execute subscribe command and run closure using related ``ValkeySubscription`` /// AsyncSequence /// diff --git a/Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift b/Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift index de6b188b..0adaf077 100644 --- a/Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift +++ b/Tests/ClusterIntegrationTests/ClusterIntegrationTests.swift @@ -462,7 +462,7 @@ struct ClusterIntegrationTests { @Test @available(valkeySwift 1.0, *) - func testNodeFailoverWithPipeline() async throws { + func testNodePipelineWithFailover() async throws { var logger = Logger(label: "ValkeyCluster") logger.logLevel = .trace let firstNodeHostname = clusterFirstNodeHostname! @@ -502,7 +502,7 @@ struct ClusterIntegrationTests { @available(valkeySwift 1.0, *) @Test - func testNodeHashSlotMigrationAndAskRedirectionWithPipeline() async throws { + func testNodePipelineWithHashSlotMigrationAndAskRedirection() async throws { var logger = Logger(label: "ValkeyCluster") logger.logLevel = .trace let firstNodeHostname = clusterFirstNodeHostname! @@ -537,7 +537,7 @@ struct ClusterIntegrationTests { @Test @available(valkeySwift 1.0, *) - func testNodeHashSlotMigrationAndTryAgainWithPipeline() async throws { + func testNodePipelineWithHashSlotMigrationAndTryAgain() async throws { var logger = Logger(label: "ValkeyCluster") logger.logLevel = .trace let firstNodeHostname = clusterFirstNodeHostname! @@ -673,6 +673,131 @@ struct ClusterIntegrationTests { #expect(response2 == "3") } } + + @Test + @available(valkeySwift 1.0, *) + func testClientTransaction() async throws { + var logger = Logger(label: "ValkeyCluster") + logger.logLevel = .trace + let firstNodeHostname = clusterFirstNodeHostname! + let firstNodePort = clusterFirstNodePort ?? 6379 + try await ClusterIntegrationTests.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort)], logger: logger) { + client in + try await ClusterIntegrationTests.withKey(connection: client, suffix: "{foo}") { key in + var commands: [any ValkeyCommand] = .init() + commands.append(SET(key, value: "cluster pipeline test")) + commands.append(GET(key)) + let results = try await client.transaction(commands) + let response = try results[1].get().decode(as: String.self) + #expect(response == "cluster pipeline test") + } + } + } + + @Test + @available(valkeySwift 1.0, *) + func testClientTransactionWithFailover() async throws { + var logger = Logger(label: "ValkeyCluster") + logger.logLevel = .trace + let firstNodeHostname = clusterFirstNodeHostname! + let firstNodePort = clusterFirstNodePort ?? 6379 + try await ClusterIntegrationTests.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort)], logger: logger) { + clusterClient in + try await ClusterIntegrationTests.withKey(connection: clusterClient) { key in + try await clusterClient.set(key, value: "bar") + let cluster = try await clusterClient.clusterShards() + let shard = try #require( + cluster.shards.first { shard in + let hashSlot = HashSlot(key: key) + return shard.slots.reduce(into: false) { $0 = ($0 || ($1.lowerBound <= hashSlot && $1.upperBound >= hashSlot)) } + } + ) + let replica = try #require(shard.nodes.first { $0.role == .replica }) + let port = try #require(replica.port) + // connect to replica and call CLUSTER FAILOVER + try await ClusterIntegrationTests.withValkeyClient(.hostname(replica.endpoint, port: port), logger: logger) { client in + try await client.clusterFailover() + } + // will receive a MOVED errors for SET, INCR and GET as the primary has moved to a replica + var commands: [any ValkeyCommand] = .init() + commands.append(SET(key, value: "100")) + commands.append(INCR(key)) + commands.append(ECHO(message: "Test non moved command")) + commands.append(GET(key)) + let results = try await clusterClient.transaction(commands) + let response2 = try results[2].get().decode(as: String.self) + #expect(response2 == "Test non moved command") + let response3 = try results[3].get().decode(as: String.self) + #expect(response3 == "101") + } + } + } + + @available(valkeySwift 1.0, *) + @Test + func testClientTransactionWithHashSlotMigrationAndAskRedirection() async throws { + var logger = Logger(label: "ValkeyCluster") + logger.logLevel = .trace + let firstNodeHostname = clusterFirstNodeHostname! + let firstNodePort = clusterFirstNodePort ?? 6379 + try await ClusterIntegrationTests.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort)], logger: logger) { + client in + let keySuffix = "{\(UUID().uuidString)}" + try await ClusterIntegrationTests.withKey(connection: client, suffix: keySuffix) { key in + let hashSlot = HashSlot(key: key) + try await client.set(key, value: "Testing before import") + + try await ClusterIntegrationTests.testMigratingHashSlot(hashSlot, client: client) { + // key still uses nodeA + let value = try await client.set(key, value: "Testing during import", get: true).map { String(buffer: $0) } + #expect(value == "Testing before import") + } afterMigrate: { + // key has been migrated to nodeB so will receive an ASK error + var commands: [any ValkeyCommand] = .init() + commands.append(SET(key, value: "After migrate", get: true)) + commands.append(GET(key)) + let results = try await client.transaction(commands) + #expect(try results[0].get().decode(as: String.self) == "Testing during import") + #expect(try results[1].get().decode(as: String.self) == "After migrate") + } finished: { + let value = try await client.set(key, value: "Testing after import", get: true).map { String(buffer: $0) } + #expect(value == "After migrate") + } + } + } + } + + @Test + @available(valkeySwift 1.0, *) + func testClientTransactionWithHashSlotMigrationAndTryAgain() async throws { + var logger = Logger(label: "ValkeyCluster") + logger.logLevel = .trace + let firstNodeHostname = clusterFirstNodeHostname! + let firstNodePort = clusterFirstNodePort ?? 6379 + try await ClusterIntegrationTests.withValkeyCluster([(host: firstNodeHostname, port: firstNodePort)], logger: logger) { + client in + let keySuffix = "{\(UUID().uuidString)}" + try await ClusterIntegrationTests.withKey(connection: client, suffix: keySuffix) { key in + try await ClusterIntegrationTests.withKey(connection: client, suffix: keySuffix) { key2 in + let hashSlot = HashSlot(key: key) + try await client.lpush(key, elements: ["testing1"]) + + try await ClusterIntegrationTests.testMigratingHashSlot(hashSlot, client: client) { + } duringMigrate: { + // LPUSH will succeed, as node is on + var commands: [any ValkeyCommand] = .init() + commands.append(LPUSH(key, elements: ["testing2"])) + commands.append(RPOPLPUSH(source: key, destination: key2)) + let results = try await client.transaction(commands) + let count = try results[0].get().decode(as: Int.self) + #expect(count == 2) + let value = try results[1].get().decode(as: String.self) + #expect(value == "testing1") + } + } + } + } + } } @available(valkeySwift 1.0, *)