Skip to content

Commit 7ee57f9

Browse files
authored
Support adding services while ServiceGroup is running (#199)
somewhat straight-forward implementation draft of #185 choices were made, what do we think?
1 parent e2b442c commit 7ee57f9

File tree

4 files changed

+620
-92
lines changed

4 files changed

+620
-92
lines changed

Sources/ServiceLifecycle/ServiceGroup.swift

+138-25
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
import Logging
1616
import UnixSignals
17+
import AsyncAlgorithms
1718

1819
/// A ``ServiceGroup`` is responsible for running a number of services, setting up signal handling and signalling graceful shutdown to the services.
1920
public actor ServiceGroup: Sendable, Service {
@@ -23,7 +24,8 @@ public actor ServiceGroup: Sendable, Service {
2324
case initial(services: [ServiceGroupConfiguration.ServiceConfiguration])
2425
/// The state once ``ServiceGroup/run()`` has been called.
2526
case running(
26-
gracefulShutdownStreamContinuation: AsyncStream<Void>.Continuation
27+
gracefulShutdownStreamContinuation: AsyncStream<Void>.Continuation,
28+
addedServiceChannel: AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>
2729
)
2830
/// The state once ``ServiceGroup/run()`` has finished.
2931
case finished
@@ -106,6 +108,38 @@ public actor ServiceGroup: Sendable, Service {
106108
self.maximumCancellationDuration = configuration._maximumCancellationDuration
107109
}
108110

111+
/// Adds a new service to the group.
112+
///
113+
/// If the group is currently running, the added service will be started immediately.
114+
/// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started.
115+
/// - Parameters:
116+
/// - serviceConfiguration: The service configuration to add.
117+
public func addServiceUnlessShutdown(_ serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration) async {
118+
switch self.state {
119+
case var .initial(services: services):
120+
self.state = .initial(services: [])
121+
services.append(serviceConfiguration)
122+
self.state = .initial(services: services)
123+
124+
case .running(_, let addedServiceChannel):
125+
await addedServiceChannel.send(serviceConfiguration)
126+
127+
case .finished:
128+
// Since this is a best effort operation we don't have to do anything here
129+
return
130+
}
131+
}
132+
133+
/// Adds a new service to the group.
134+
///
135+
/// If the group is currently running, the added service will be started immediately.
136+
/// If the group is gracefully shutting down, cancelling, or already finished, the added service will not be started.
137+
/// - Parameters:
138+
/// - service: The service to add.
139+
public func addServiceUnlessShutdown(_ service: any Service) async {
140+
await self.addServiceUnlessShutdown(ServiceGroupConfiguration.ServiceConfiguration(service: service))
141+
}
142+
109143
/// Runs all the services by spinning up a child task per service.
110144
/// Furthermore, this method sets up the correct signal handlers
111145
/// for graceful shutdown.
@@ -128,16 +162,19 @@ public actor ServiceGroup: Sendable, Service {
128162
}
129163

130164
let (gracefulShutdownStream, gracefulShutdownContinuation) = AsyncStream.makeStream(of: Void.self)
165+
let addedServiceChannel = AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>()
131166

132167
self.state = .running(
133-
gracefulShutdownStreamContinuation: gracefulShutdownContinuation
168+
gracefulShutdownStreamContinuation: gracefulShutdownContinuation,
169+
addedServiceChannel: addedServiceChannel
134170
)
135171

136172
var potentialError: Error?
137173
do {
138174
try await self._run(
139175
services: &services,
140-
gracefulShutdownStream: gracefulShutdownStream
176+
gracefulShutdownStream: gracefulShutdownStream,
177+
addedServiceChannel: addedServiceChannel
141178
)
142179
} catch {
143180
potentialError = error
@@ -173,7 +210,7 @@ public actor ServiceGroup: Sendable, Service {
173210
self.state = .finished
174211
return
175212

176-
case .running(let gracefulShutdownStreamContinuation):
213+
case .running(let gracefulShutdownStreamContinuation, _):
177214
// We cannot transition to shuttingDown here since we are signalling over to the task
178215
// that runs `run`. This task is responsible for transitioning to shuttingDown since
179216
// there might be multiple signals racing to trigger it
@@ -198,11 +235,13 @@ public actor ServiceGroup: Sendable, Service {
198235
case gracefulShutdownFinished
199236
case gracefulShutdownTimedOut
200237
case cancellationCaught
238+
case newServiceAdded(ServiceGroupConfiguration.ServiceConfiguration)
201239
}
202240

203241
private func _run(
204242
services: inout [ServiceGroupConfiguration.ServiceConfiguration],
205-
gracefulShutdownStream: AsyncStream<Void>
243+
gracefulShutdownStream: AsyncStream<Void>,
244+
addedServiceChannel: AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>
206245
) async throws {
207246
self.logger.debug(
208247
"Starting service lifecycle",
@@ -280,25 +319,12 @@ public actor ServiceGroup: Sendable, Service {
280319
let gracefulShutdownManager = GracefulShutdownManager()
281320
gracefulShutdownManagers.append(gracefulShutdownManager)
282321

283-
// This must be addTask and not addTaskUnlessCancelled
284-
// because we must run all the services for the below logic to work.
285-
group.addTask {
286-
return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) {
287-
do {
288-
try await serviceConfiguration.service.run()
289-
return .serviceFinished(service: serviceConfiguration, index: index)
290-
} catch {
291-
return .serviceThrew(service: serviceConfiguration, index: index, error: error)
292-
}
293-
}
294-
}
295-
}
296-
297-
group.addTask {
298-
// This child task is waiting forever until the group gets cancelled.
299-
let (stream, _) = AsyncStream.makeStream(of: Void.self)
300-
await stream.first { _ in true }
301-
return .cancellationCaught
322+
self.addServiceTask(
323+
group: &group,
324+
service: serviceConfiguration,
325+
gracefulShutdownManager: gracefulShutdownManager,
326+
index: index
327+
)
302328
}
303329

304330
// We are storing the services in an optional array now. When a slot in the array is
@@ -310,12 +336,52 @@ public actor ServiceGroup: Sendable, Service {
310336
"We did not create a graceful shutdown manager per service"
311337
)
312338

339+
group.addTask {
340+
// This child task is waiting forever until the group gets cancelled.
341+
let (stream, _) = AsyncStream.makeStream(of: Void.self)
342+
await stream.first { _ in true }
343+
return .cancellationCaught
344+
}
345+
346+
// Adds a task that listens to added services and funnels them into the task group
347+
self.addAddedServiceListenerTask(group: &group, channel: addedServiceChannel)
348+
313349
// We are going to wait for any of the services to finish or
314350
// the signal sequence to throw an error.
315351
while !group.isEmpty {
316352
let result: ChildTaskResult? = try await group.next()
317353

318354
switch result {
355+
case .newServiceAdded(let serviceConfiguration):
356+
self.logger.debug(
357+
"Starting added service",
358+
metadata: [
359+
self.loggingConfiguration.keys.serviceKey: "\(serviceConfiguration.service)"
360+
]
361+
)
362+
363+
let gracefulShutdownManager = GracefulShutdownManager()
364+
gracefulShutdownManagers.append(gracefulShutdownManager)
365+
services.append(serviceConfiguration)
366+
367+
precondition(
368+
services.count == gracefulShutdownManagers.count,
369+
"Mismatch between services and graceful shutdown managers"
370+
)
371+
372+
self.addServiceTask(
373+
group: &group,
374+
service: serviceConfiguration,
375+
gracefulShutdownManager: gracefulShutdownManager,
376+
index: services.count - 1
377+
)
378+
379+
// Each listener task can only handle a single added service, so we must add a new listener
380+
self.addAddedServiceListenerTask(
381+
group: &group,
382+
channel: addedServiceChannel
383+
)
384+
319385
case .serviceFinished(let service, let index):
320386
if group.isCancelled {
321387
// The group is cancelled and we expect all services to finish
@@ -530,10 +596,13 @@ public actor ServiceGroup: Sendable, Service {
530596
group: inout ThrowingTaskGroup<ChildTaskResult, Error>,
531597
gracefulShutdownManagers: [GracefulShutdownManager]
532598
) async throws {
533-
guard case .running = self.state else {
599+
guard case let .running(_, addedServiceChannel) = self.state else {
534600
fatalError("Unexpected state")
535601
}
536602

603+
// Signal to stop adding new services (it is important that no new services are added after this point)
604+
addedServiceChannel.finish()
605+
537606
if #available(macOS 13.0, iOS 16.0, watchOS 9.0, tvOS 16.0, *),
538607
let maximumGracefulShutdownDuration = self.maximumGracefulShutdownDuration
539608
{
@@ -717,6 +786,10 @@ public actor ServiceGroup: Sendable, Service {
717786
// We are going to continue the result loop since we have to wait for our service
718787
// to finish.
719788
break
789+
790+
case .newServiceAdded:
791+
// Since adding services is best effort, we simply ignore this
792+
break
720793
}
721794
}
722795
}
@@ -777,6 +850,46 @@ public actor ServiceGroup: Sendable, Service {
777850
cancellationTimeoutTask = nil
778851
}
779852
}
853+
854+
private func addServiceTask(
855+
group: inout ThrowingTaskGroup<ChildTaskResult, Error>,
856+
service serviceConfiguration: ServiceGroupConfiguration.ServiceConfiguration,
857+
gracefulShutdownManager: GracefulShutdownManager,
858+
index: Int
859+
) {
860+
// This must be addTask and not addTaskUnlessCancelled
861+
// because we must run all the services for the shutdown logic to work.
862+
group.addTask {
863+
return await TaskLocals.$gracefulShutdownManager.withValue(gracefulShutdownManager) {
864+
do {
865+
try await serviceConfiguration.service.run()
866+
return .serviceFinished(service: serviceConfiguration, index: index)
867+
} catch {
868+
return .serviceThrew(service: serviceConfiguration, index: index, error: error)
869+
}
870+
}
871+
}
872+
}
873+
874+
private func addAddedServiceListenerTask(
875+
group: inout ThrowingTaskGroup<ChildTaskResult, Error>,
876+
channel: AsyncChannel<ServiceGroupConfiguration.ServiceConfiguration>
877+
) {
878+
group.addTask {
879+
return await withTaskCancellationHandler {
880+
var iterator = channel.makeAsyncIterator()
881+
if let addedService = await iterator.next() {
882+
return .newServiceAdded(addedService)
883+
}
884+
885+
return .gracefulShutdownFinished
886+
} onCancel: {
887+
// Once the group is cancelled we will no longer read from the channel.
888+
// This will resume any suspended producer in `addServiceUnlessShutdown`.
889+
channel.finish()
890+
}
891+
}
892+
}
780893
}
781894

782895
// This should be removed once we support Swift 5.9+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftServiceLifecycle open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the SwiftServiceLifecycle project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftServiceLifecycle project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import ServiceLifecycle
16+
17+
actor MockService: Service, CustomStringConvertible {
18+
enum Event {
19+
case run
20+
case runPing
21+
case runCancelled
22+
case shutdownGracefully
23+
}
24+
25+
let events: AsyncStream<Event>
26+
private(set) var hasRun: Bool = false
27+
28+
private let eventsContinuation: AsyncStream<Event>.Continuation
29+
30+
private var runContinuation: CheckedContinuation<Void, Error>?
31+
32+
nonisolated let description: String
33+
34+
private let pings: AsyncStream<Void>
35+
private nonisolated let pingContinuation: AsyncStream<Void>.Continuation
36+
37+
init(
38+
description: String
39+
) {
40+
var eventsContinuation: AsyncStream<Event>.Continuation!
41+
events = AsyncStream<Event> { eventsContinuation = $0 }
42+
self.eventsContinuation = eventsContinuation!
43+
44+
var pingContinuation: AsyncStream<Void>.Continuation!
45+
pings = AsyncStream<Void> { pingContinuation = $0 }
46+
self.pingContinuation = pingContinuation!
47+
48+
self.description = description
49+
}
50+
51+
func run() async throws {
52+
hasRun = true
53+
54+
try await withTaskCancellationHandler {
55+
try await withGracefulShutdownHandler {
56+
try await withThrowingTaskGroup(of: Void.self) { group in
57+
group.addTask {
58+
self.eventsContinuation.yield(.run)
59+
for await _ in self.pings {
60+
self.eventsContinuation.yield(.runPing)
61+
}
62+
}
63+
64+
try await withCheckedThrowingContinuation {
65+
self.runContinuation = $0
66+
}
67+
68+
group.cancelAll()
69+
}
70+
} onGracefulShutdown: {
71+
self.eventsContinuation.yield(.shutdownGracefully)
72+
}
73+
} onCancel: {
74+
self.eventsContinuation.yield(.runCancelled)
75+
}
76+
}
77+
78+
func resumeRunContinuation(with result: Result<Void, Error>) {
79+
runContinuation?.resume(with: result)
80+
}
81+
82+
nonisolated func sendPing() {
83+
pingContinuation.yield()
84+
}
85+
}

0 commit comments

Comments
 (0)