Skip to content

Commit cf72fbb

Browse files
committed
Cluster client transaction support
Signed-off-by: Adam Fowler <[email protected]>
1 parent a789f2c commit cf72fbb

File tree

4 files changed

+305
-38
lines changed

4 files changed

+305
-38
lines changed

Sources/Valkey/Cluster/ValkeyClusterClient.swift

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,62 @@ public final class ValkeyClusterClient: Sendable {
303303
}
304304
}
305305

306+
/// Pipeline a series of commands as a transaction to Valkey connection
307+
///
308+
/// Another client will never be served in the middle of the execution of these
309+
/// commands. See https://valkey.io/topics/transactions/ for more information.
310+
///
311+
/// EXEC and MULTI commands are added to the pipelined commands and the output
312+
/// of the EXEC command is transformed into an array of RESPToken Results, one for
313+
/// each command.
314+
///
315+
/// This is an alternative version of the transaction function ``ValkeyConnection/transaction(_:)->(_,_)``
316+
/// that allows for a collection of ValkeyCommands. It provides more flexibility but the command
317+
/// responses are returned as ``RESPToken`` instead of the response type for the command.
318+
///
319+
/// - Parameter commands: Collection of ValkeyCommands
320+
/// - Returns: Array holding the RESPToken responses of all the commands
321+
/// - Throws: ValkeyTransactionError when EXEC aborts
322+
@inlinable
323+
public func transaction<Commands: Collection & Sendable>(
324+
_ commands: Commands
325+
) async throws -> [Result<RESPToken, Error>] where Commands.Element == any ValkeyCommand {
326+
let hashSlot = try self.hashSlot(for: commands.flatMap { $0.keysAffected })
327+
328+
var clientSelector: () async throws -> ValkeyNodeClient = {
329+
try await self.nodeClient(for: hashSlot.map { [$0] } ?? [])
330+
}
331+
332+
var asking = false
333+
var attempt = 0
334+
while !Task.isCancelled {
335+
do {
336+
let client = try await clientSelector()
337+
if asking {
338+
return try await client.transactionWithAsk(commands)
339+
} else {
340+
return try await client.transaction(commands)
341+
}
342+
} catch let error as ValkeyClusterError where error == .noNodeToTalkTo {
343+
// TODO: Rerun node discovery!
344+
} catch {
345+
let retryAction = self.getTransactionRetryAction(from: error)
346+
switch retryAction {
347+
case .redirect(let redirectError):
348+
clientSelector = { try await self.nodeClient(for: redirectError) }
349+
asking = (redirectError.redirection == .ask)
350+
case .tryAgain:
351+
let wait = self.clientConfiguration.retryParameters.calculateWaitTime(retry: attempt)
352+
try await Task.sleep(for: wait)
353+
attempt += 1
354+
case .dontRetry:
355+
throw error
356+
}
357+
}
358+
}
359+
throw ValkeyClusterError.clientRequestCancelled
360+
}
361+
306362
struct Redirection {
307363
let node: ValkeyNodeClient
308364
let ask: Bool
@@ -565,6 +621,33 @@ public final class ValkeyClusterClient: Sendable {
565621
}
566622
}
567623

624+
@usableFromInline
625+
/* private */ func getTransactionRetryAction(from error: some Error) -> RetryAction {
626+
switch error {
627+
case let transactionError as ValkeyTransactionError:
628+
switch transactionError {
629+
case .transactionErrors(let results, let execError):
630+
for result in results {
631+
if case .failure(let queuedError) = result {
632+
let queuedAction = self.getRetryAction(from: queuedError)
633+
guard case .dontRetry = queuedAction else {
634+
return queuedAction
635+
}
636+
}
637+
}
638+
let execAction = self.getRetryAction(from: execError)
639+
guard case .dontRetry = execAction else {
640+
return execAction
641+
}
642+
return .dontRetry
643+
case .transactionAborted:
644+
return .dontRetry
645+
}
646+
default:
647+
return .dontRetry
648+
}
649+
}
650+
568651
private func queueAction(_ action: RunAction) {
569652
self.actionStreamContinuation.yield(action)
570653
}

Sources/Valkey/Connection/ValkeyConnection.swift

Lines changed: 83 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -386,41 +386,9 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
386386
return try await _execute(
387387
buffer: encoder.buffer,
388388
promises: promises,
389-
valkeyPromises: promises.map { .nio($0) }
390-
) { promises -> Result<[Result<RESPToken, Error>], any Error> in
391-
let responses: EXEC.Response
392-
do {
393-
let execFutureResult = promises.last!.futureResult
394-
responses = try await execFutureResult.get().decode(as: EXEC.Response.self)
395-
} catch let error as ValkeyClientError where error.errorCode == .commandError {
396-
// we received an error while running the EXEC command. Extract queuing
397-
// results and throw error
398-
var results: [Result<RESPToken, Error>] = .init()
399-
results.reserveCapacity(promises.count - 2)
400-
for promise in promises[1..<(promises.count - 1)] {
401-
results.append(await promise.futureResult._result())
402-
}
403-
return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error))
404-
} catch {
405-
return .failure(error)
406-
}
407-
// If EXEC returned nil then transaction was aborted because a
408-
// WATCHed variable changed
409-
guard let responses else {
410-
return .failure(ValkeyTransactionError.transactionAborted)
411-
}
412-
// We convert all the RESP errors in the response from EXEC to Result.failure
413-
return .success(
414-
responses.map {
415-
switch $0.identifier {
416-
case .simpleError, .bulkError:
417-
.failure(ValkeyClientError(.commandError, message: $0.errorString.map { Swift.String(buffer: $0) }))
418-
default:
419-
.success($0)
420-
}
421-
}
422-
)
423-
}.get()
389+
valkeyPromises: promises.map { .nio($0) },
390+
processResults: self._processTransactionPromises
391+
).get()
424392
}
425393

426394
/// Pipeline a series of commands to Valkey connection and precede each command with an ASKING
@@ -468,6 +436,48 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
468436
}
469437
}
470438

439+
/// Pipeline a series of commands as a transaction preceded with an ASKING command
440+
///
441+
/// Once all the responses for the commands have been received the function returns
442+
/// an array of RESPToken Results, one for each command.
443+
///
444+
/// This is an internal function used by the cluster client
445+
///
446+
/// - Parameter commands: Collection of ValkeyCommands
447+
/// - Returns: Array holding the RESPToken responses of all the commands
448+
@usableFromInline
449+
func transactionWithAsk(
450+
_ commands: some Collection<any ValkeyCommand>
451+
) async throws -> [Result<RESPToken, any Error>] {
452+
// this currently allocates a promise for every command. We could collapse this down to one promise
453+
var promises: [EventLoopPromise<RESPToken>] = []
454+
promises.reserveCapacity(commands.count)
455+
var valkeyPromises: [ValkeyPromise<RESPToken>] = []
456+
valkeyPromises.reserveCapacity(commands.count + 3)
457+
var encoder = ValkeyCommandEncoder()
458+
ASKING().encode(into: &encoder)
459+
MULTI().encode(into: &encoder)
460+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
461+
valkeyPromises.append(.forget)
462+
valkeyPromises.append(.nio(promises.last!))
463+
464+
for command in commands {
465+
command.encode(into: &encoder)
466+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
467+
valkeyPromises.append(.nio(promises.last!))
468+
}
469+
EXEC().encode(into: &encoder)
470+
promises.append(channel.eventLoop.makePromise(of: RESPToken.self))
471+
valkeyPromises.append(.nio(promises.last!))
472+
473+
return try await _execute(
474+
buffer: encoder.buffer,
475+
promises: promises,
476+
valkeyPromises: valkeyPromises,
477+
processResults: self._processTransactionPromises
478+
).get()
479+
}
480+
471481
/// Execute stream of commands written into buffer
472482
///
473483
/// The function is provided with an array of EventLoopPromises for the responses of commands
@@ -498,6 +508,44 @@ public final actor ValkeyConnection: ValkeyClientProtocol, Sendable {
498508
}
499509
}
500510

511+
@usableFromInline
512+
func _processTransactionPromises(
513+
_ promises: [EventLoopPromise<RESPToken>]
514+
) async -> sending Result<[Result<RESPToken, Error>], any Error> {
515+
let responses: EXEC.Response
516+
do {
517+
let execFutureResult = promises.last!.futureResult
518+
responses = try await execFutureResult.get().decode(as: EXEC.Response.self)
519+
} catch let error as ValkeyClientError where error.errorCode == .commandError {
520+
// we received an error while running the EXEC command. Extract queuing
521+
// results and throw error
522+
var results: [Result<RESPToken, Error>] = .init()
523+
results.reserveCapacity(promises.count - 2)
524+
for promise in promises[1..<(promises.count - 1)] {
525+
results.append(await promise.futureResult._result())
526+
}
527+
return .failure(ValkeyTransactionError.transactionErrors(queuedResults: results, execError: error))
528+
} catch {
529+
return .failure(error)
530+
}
531+
// If EXEC returned nil then transaction was aborted because a
532+
// WATCHed variable changed
533+
guard let responses else {
534+
return .failure(ValkeyTransactionError.transactionAborted)
535+
}
536+
// We convert all the RESP errors in the response from EXEC to Result.failure
537+
return .success(
538+
responses.map {
539+
switch $0.identifier {
540+
case .simpleError, .bulkError:
541+
.failure(ValkeyClientError(.commandError, message: $0.errorString.map { Swift.String(buffer: $0) }))
542+
default:
543+
.success($0)
544+
}
545+
}
546+
)
547+
}
548+
501549
#if DistributedTracingSupport
502550
@usableFromInline
503551
func applyCommonAttributes(to attributes: inout SpanAttributes, commandName: String) {

Sources/Valkey/Node/ValkeyNodeClient.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,17 @@ extension ValkeyNodeClient {
272272
return .init(repeating: .failure(error), count: commands.count)
273273
}
274274
}
275+
276+
/// Internal command used by cluster client, that precedes each command with a ASKING
277+
/// command
278+
@usableFromInline
279+
func transactionWithAsk<Commands: Collection & Sendable>(
280+
_ commands: Commands
281+
) async throws -> [Result<RESPToken, any Error>] where Commands.Element == any ValkeyCommand {
282+
try await self.withConnection { connection in
283+
try await connection.transactionWithAsk(commands)
284+
}
285+
}
275286
}
276287

277288
/// Extension that makes ``ValkeyNode`` conform to ``ValkeyNodeConnectionPool``.

0 commit comments

Comments
 (0)