Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 111 additions & 10 deletions Sources/Valkey/Cluster/ValkeyClusterClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -208,18 +208,9 @@ public final class ValkeyClusterClient: Sendable {
public func execute<each Command: ValkeyCommand>(
_ commands: repeat each Command
) async -> sending (repeat Result<(each Command).Response, any Error>) {
func convert<Response: RESPTokenDecodable>(_ result: Result<RESPToken, any Error>, to: Response.Type) -> Result<Response, any Error> {
result.flatMap {
do {
return try .success(Response(fromRESP: $0))
} catch {
return .failure(error)
}
}
}
let results = await self.execute([any ValkeyCommand](commands: repeat each commands))
var index = AutoIncrementingInteger()
return (repeat convert(results[index.next()], to: (each Command).Response.self))
return (repeat results[index.next()].convertFromRESP(to: (each Command).Response.self))
}

/// Results from pipeline and index for each result
Expand Down Expand Up @@ -303,6 +294,87 @@ public final class ValkeyClusterClient: Sendable {
}
}

/// Pipeline a series of commands as a transaction to Valkey connection
///
/// Another client will never be served in the middle of the execution of these
/// commands. See https://valkey.io/topics/transactions/ for more information.
///
/// EXEC and MULTI commands are added to the pipelined commands and the output
/// of the EXEC command is transformed into an array of RESPToken Results, one for
/// each command.
///
/// Transactions come only affect keys coming from the same HashSlot.
///
/// - Parameter commands: Parameter pack of ValkeyCommands
/// - Returns: Parameter pack holding the responses of all the commands
/// - Throws: ValkeyTransactionError when EXEC aborts
@inlinable
public func transaction<each Command: ValkeyCommand>(
_ commands: repeat each Command
) async throws -> sending (repeat Result<(each Command).Response, any Error>) {
let results = try await self.transaction([any ValkeyCommand](commands: repeat each commands))
var index = AutoIncrementingInteger()
return (repeat results[index.next()].convertFromRESP(to: (each Command).Response.self))
}

/// Pipeline a series of commands as a transaction to Valkey connection
///
/// Another client will never be served in the middle of the execution of these
/// commands. See https://valkey.io/topics/transactions/ for more information.
///
/// EXEC and MULTI commands are added to the pipelined commands and the output
/// of the EXEC command is transformed into an array of RESPToken Results, one for
/// each command.
///
/// Transactions come only affect keys coming from the same HashSlot.
///
/// This is an alternative version of the transaction function ``ValkeyClusterClient/transaction(_:)->(_,_)``
/// that allows for a collection of ValkeyCommands. It provides more flexibility but the command
/// responses are returned as ``RESPToken`` instead of the response type for the command.
///
/// - Parameter commands: Collection of ValkeyCommands
/// - Returns: Array holding the RESPToken responses of all the commands
/// - Throws: ValkeyTransactionError when EXEC aborts
@inlinable
public func transaction<Commands: Collection & Sendable>(
_ commands: Commands
) async throws -> [Result<RESPToken, Error>] where Commands.Element == any ValkeyCommand {
let hashSlot = try self.hashSlot(for: commands.flatMap { $0.keysAffected })

var clientSelector: () async throws -> ValkeyNodeClient = {
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
}

var asking = false
var attempt = 0
while !Task.isCancelled {
do {
let client = try await clientSelector()
if asking {
return try await client.transactionWithAsk(commands)
} else {
return try await client.transaction(commands)
}
} catch let error as ValkeyClusterError where error == .noNodeToTalkTo {
// TODO: Rerun node discovery!
} catch {
let retryAction = self.getTransactionRetryAction(from: error)
switch retryAction {
case .redirect(let redirectError):
clientSelector = { try await self.nodeClient(for: redirectError) }
asking = (redirectError.redirection == .ask)
case .tryAgain:
let wait = self.clientConfiguration.retryParameters.calculateWaitTime(retry: attempt)
try await Task.sleep(for: wait)
attempt += 1
case .dontRetry:
throw error
}
}
}
throw ValkeyClusterError.clientRequestCancelled
}

struct Redirection {
let node: ValkeyNodeClient
let ask: Bool
Expand Down Expand Up @@ -565,6 +637,35 @@ public final class ValkeyClusterClient: Sendable {
}
}

@usableFromInline
/* private */ func getTransactionRetryAction(from error: some Error) -> RetryAction {
switch error {
case let transactionError as ValkeyTransactionError:
switch transactionError {
case .transactionErrors(let results, let execError):
// check whether queued results include any errors that warrent a retry
for result in results {
if case .failure(let queuedError) = result {
let queuedAction = self.getRetryAction(from: queuedError)
guard case .dontRetry = queuedAction else {
return queuedAction
}
}
}
// check whether EXEC error warrents a retry
let execAction = self.getRetryAction(from: execError)
guard case .dontRetry = execAction else {
return execAction
}
return .dontRetry
case .transactionAborted:
return .dontRetry
}
default:
return .dontRetry
}
}

private func queueAction(_ action: RunAction) {
self.actionStreamContinuation.yield(action)
}
Expand Down
118 changes: 83 additions & 35 deletions Sources/Valkey/Connection/ValkeyConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -386,41 +386,9 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
return try await _execute(
buffer: encoder.buffer,
promises: promises,
valkeyPromises: promises.map { .nio($0) }
) { promises -> Result<[Result<RESPToken, Error>], any Error> in
let responses: EXEC.Response
do {
let execFutureResult = promises.last!.futureResult
responses = try await execFutureResult.get().decode(as: EXEC.Response.self)
} catch let error as ValkeyClientError where error.errorCode == .commandError {
// we received an error while running the EXEC command. Extract queuing
// results and throw error
var results: [Result<RESPToken, Error>] = .init()
results.reserveCapacity(promises.count - 2)
for promise in promises[1..<(promises.count - 1)] {
results.append(await promise.futureResult._result())
}
return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error))
} catch {
return .failure(error)
}
// If EXEC returned nil then transaction was aborted because a
// WATCHed variable changed
guard let responses else {
return .failure(ValkeyTransactionError.transactionAborted)
}
// We convert all the RESP errors in the response from EXEC to Result.failure
return .success(
responses.map {
switch $0.identifier {
case .simpleError, .bulkError:
.failure(ValkeyClientError(.commandError, message: $0.errorString.map { Swift.String(buffer: $0) }))
default:
.success($0)
}
}
)
}.get()
valkeyPromises: promises.map { .nio($0) },
processResults: self._processTransactionPromises
).get()
}

/// Pipeline a series of commands to Valkey connection and precede each command with an ASKING
Expand Down Expand Up @@ -468,6 +436,48 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
}
}

/// Pipeline a series of commands as a transaction preceded with an ASKING command
///
/// Once all the responses for the commands have been received the function returns
/// an array of RESPToken Results, one for each command.
///
/// This is an internal function used by the cluster client
///
/// - Parameter commands: Collection of ValkeyCommands
/// - Returns: Array holding the RESPToken responses of all the commands
@usableFromInline
func transactionWithAsk(
_ commands: some Collection<any ValkeyCommand>
) async throws -> [Result<RESPToken, any Error>] {
self.logger.trace("transaction asking", metadata: ["commands": .string(Self.concatenateCommandNames(commands))])
var promises: [EventLoopPromise<RESPToken>] = []
promises.reserveCapacity(commands.count)
var valkeyPromises: [ValkeyPromise<RESPToken>] = []
valkeyPromises.reserveCapacity(commands.count + 3)
var encoder = ValkeyCommandEncoder()
ASKING().encode(into: &encoder)
MULTI().encode(into: &encoder)
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
valkeyPromises.append(.forget)
valkeyPromises.append(.nio(promises.last!))

for command in commands {
command.encode(into: &encoder)
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
valkeyPromises.append(.nio(promises.last!))
}
EXEC().encode(into: &encoder)
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
valkeyPromises.append(.nio(promises.last!))

return try await _execute(
buffer: encoder.buffer,
promises: promises,
valkeyPromises: valkeyPromises,
processResults: self._processTransactionPromises
).get()
}

/// Execute stream of commands written into buffer
///
/// The function is provided with an array of EventLoopPromises for the responses of commands
Expand Down Expand Up @@ -498,6 +508,44 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
}
}

@usableFromInline
func _processTransactionPromises(
_ promises: [EventLoopPromise<RESPToken>]
) async -> sending Result<[Result<RESPToken, Error>], any Error> {
let responses: EXEC.Response
do {
let execFutureResult = promises.last!.futureResult
responses = try await execFutureResult.get().decode(as: EXEC.Response.self)
} catch let error as ValkeyClientError where error.errorCode == .commandError {
// we received an error while running the EXEC command. Extract queuing
// results and throw error
var results: [Result<RESPToken, Error>] = .init()
results.reserveCapacity(promises.count - 2)
for promise in promises[1..<(promises.count - 1)] {
results.append(await promise.futureResult._result())
}
return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error))
} catch {
return .failure(error)
}
// If EXEC returned nil then transaction was aborted because a
// WATCHed variable changed
guard let responses else {
return .failure(ValkeyTransactionError.transactionAborted)
}
// We convert all the RESP errors in the response from EXEC to Result.failure
return .success(
responses.map {
switch $0.identifier {
case .simpleError, .bulkError:
.failure(ValkeyClientError(.commandError, message: $0.errorString.map { Swift.String(buffer: $0) }))
default:
.success($0)
}
}
)
}

#if DistributedTracingSupport
@usableFromInline
func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) {
Expand Down
11 changes: 11 additions & 0 deletions Sources/Valkey/Node/ValkeyNodeClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,17 @@ extension ValkeyNodeClient {
return .init(repeating: .failure(error), count: commands.count)
}
}

/// Internal command used by cluster client, that precedes each command with a ASKING
/// command
@usableFromInline
func transactionWithAsk<Commands: Collection & Sendable>(
_ commands: Commands
) async throws -> [Result<RESPToken, any Error>] where Commands.Element == any ValkeyCommand {
try await self.withConnection { connection in
try await connection.transactionWithAsk(commands)
}
}
}

/// Extension that makes ``ValkeyNode`` conform to ``ValkeyNodeConnectionPool``.
Expand Down
20 changes: 19 additions & 1 deletion Sources/Valkey/ValkeyClientProtocol.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public protocol ValkeyClientProtocol: Sendable {
/// Once all the responses for the commands have been received the function returns
/// an array of RESPToken Results, one for each command.
///
/// This is an alternative version of the pipelining function ``ValkeyClient/execute(_:)->(_,_)``
/// This is an alternative version of the pipelining function ``ValkeyConnection/execute(_:)->(_,_)``
/// that allows for a collection of ValkeyCommands. It provides more flexibility but
/// is more expensive to run and the command responses are returned as ``RESPToken``
/// instead of the response type for the command.
Expand All @@ -28,6 +28,24 @@ public protocol ValkeyClientProtocol: Sendable {
/// - Returns: Array holding the RESPToken responses of all the commands
func execute(_ commands: [any ValkeyCommand]) async -> [Result<RESPToken, any Error>]

/// Pipeline a series of commands as a transaction to Valkey connection
///
/// Another client will never be served in the middle of the execution of these
/// commands. See https://valkey.io/topics/transactions/ for more information.
///
/// EXEC and MULTI commands are added to the pipelined commands and the output
/// of the EXEC command is transformed into an array of RESPToken Results, one for
/// each command.
///
/// This is an alternative version of the transaction function ``ValkeyConnection/transaction(_:)->(_,_)``
/// that allows for a collection of ValkeyCommands. It provides more flexibility but the command
/// responses are returned as ``RESPToken`` instead of the response type for the command.
///
/// - Parameter commands: Collection of ValkeyCommands
/// - Returns: Array holding the RESPToken responses of all the commands
/// - Throws: ValkeyTransactionError when EXEC aborts
func transaction(_ commands: [any ValkeyCommand]) async throws -> [Result<RESPToken, any Error>]

/// Execute subscribe command and run closure using related ``ValkeySubscription``
/// AsyncSequence
///
Expand Down
Loading
Loading