@@ -199,24 +199,27 @@ extension MultiProvider {
199199 _ body: ( ConfigUpdatesAsyncSequence < MultiSnapshot , Never > ) async throws -> Return
200200 ) async throws -> Return {
201201 let providers = storage. providers
202- let sources :
203- [ @Sendable (
204- ( ConfigUpdatesAsyncSequence < any ConfigSnapshotProtocol , Never > ) async throws -> Void
205- ) async throws -> Void ] = providers. map { $0. watchSnapshot }
206- return try await combineLatestOneOrMore (
207- elementType: ( any ConfigSnapshotProtocol ) . self,
208- sources: sources,
209- updatesHandler: { updateArrays in
210- try await body (
211- ConfigUpdatesAsyncSequence (
212- updateArrays
213- . map { array in
214- MultiSnapshot ( snapshots: array)
215- }
216- )
202+ typealias UpdatesSequence = any ( AsyncSequence < any ConfigSnapshotProtocol , Never > & Sendable )
203+ var updateSequences : [ UpdatesSequence ] = [ ]
204+ updateSequences. reserveCapacity ( providers. count)
205+ return try await withProvidersWatchingSnapshot (
206+ providers: ArraySlice ( providers) ,
207+ updateSequences: & updateSequences,
208+ ) { providerUpdateSequences in
209+ let updateArrays = combineLatestMany (
210+ elementType: ( any ConfigSnapshotProtocol ) . self,
211+ failureType: Never . self,
212+ providerUpdateSequences
213+ )
214+ return try await body (
215+ ConfigUpdatesAsyncSequence (
216+ updateArrays
217+ . map { array in
218+ MultiSnapshot ( snapshots: array)
219+ }
217220 )
218- }
219- )
221+ )
222+ }
220223 }
221224
222225 /// Asynchronously resolves a configuration value from nested providers.
@@ -290,43 +293,86 @@ extension MultiProvider {
290293 ) async throws -> Return {
291294 let providers = storage. providers
292295 let providerNames = providers. map ( \. providerName)
293- let sources :
294- [ @Sendable (
295- (
296- ConfigUpdatesAsyncSequence < Result < LookupResult , any Error > , Never >
297- ) async throws -> Void
298- ) async throws -> Void ] = providers. map { provider in
299- { handler in
300- _ = try await provider. watchValue ( forKey: key, type: type, updatesHandler: handler)
301- }
302- }
303- return try await combineLatestOneOrMore (
304- elementType: Result < LookupResult , any Error > . self,
305- sources: sources,
306- updatesHandler: { updateArrays in
307- try await updatesHandler (
308- ConfigUpdatesAsyncSequence (
309- updateArrays
310- . map { array in
311- var results : [ AccessEvent . ProviderResult ] = [ ]
312- for (providerIndex, lookupResult) in array. enumerated ( ) {
313- let providerName = providerNames [ providerIndex]
314- results. append ( . init( providerName: providerName, result: lookupResult) )
315- switch lookupResult {
316- case . success( let value) where value. value == nil :
317- // Got a success + nil from a nested provider, keep iterating.
318- continue
319- default :
320- // Got a success + non-nil or an error from a nested provider, propagate that up.
321- return ( results, lookupResult. map { $0. value } )
322- }
296+ typealias UpdatesSequence = any ( AsyncSequence < Result < LookupResult , any Error > , Never > & Sendable )
297+ var updateSequences : [ UpdatesSequence ] = [ ]
298+ updateSequences. reserveCapacity ( providers. count)
299+ return try await withProvidersWatchingValue (
300+ providers: ArraySlice ( providers) ,
301+ updateSequences: & updateSequences,
302+ key: key,
303+ configType: type,
304+ ) { providerUpdateSequences in
305+ let updateArrays = combineLatestMany (
306+ elementType: Result < LookupResult , any Error > . self,
307+ failureType: Never . self,
308+ providerUpdateSequences
309+ )
310+ return try await updatesHandler (
311+ ConfigUpdatesAsyncSequence (
312+ updateArrays
313+ . map { array in
314+ var results : [ AccessEvent . ProviderResult ] = [ ]
315+ for (providerIndex, lookupResult) in array. enumerated ( ) {
316+ let providerName = providerNames [ providerIndex]
317+ results. append ( . init( providerName: providerName, result: lookupResult) )
318+ switch lookupResult {
319+ case . success( let value) where value. value == nil :
320+ // Got a success + nil from a nested provider, keep iterating.
321+ continue
322+ default :
323+ // Got a success + non-nil or an error from a nested provider, propagate that up.
324+ return ( results, lookupResult. map { $0. value } )
323325 }
324- // If all nested results were success + nil, return the same.
325- return ( results, . success( nil ) )
326326 }
327- )
327+ // If all nested results were success + nil, return the same.
328+ return ( results, . success( nil ) )
329+ }
328330 )
329- }
331+ )
332+ }
333+ }
334+ }
335+
336+ @available ( Configuration 1 . 0 , * )
337+ nonisolated ( nonsending) private func withProvidersWatchingValue< ReturnInner> (
338+ providers: ArraySlice < any ConfigProvider > ,
339+ updateSequences: inout [ any ( AsyncSequence < Result < LookupResult , any Error > , Never > & Sendable ) ] ,
340+ key: AbsoluteConfigKey ,
341+ configType: ConfigType ,
342+ body: ( [ any ( AsyncSequence < Result < LookupResult , any Error > , Never > & Sendable ) ] ) async throws -> ReturnInner
343+ ) async throws -> ReturnInner {
344+ guard let provider = providers. first else {
345+ // Recursion termination, once we've collected all update sequences, execute the body.
346+ return try await body ( updateSequences)
347+ }
348+ return try await provider. watchValue ( forKey: key, type: configType) { updates in
349+ updateSequences. append ( updates)
350+ return try await withProvidersWatchingValue (
351+ providers: providers. dropFirst ( ) ,
352+ updateSequences: & updateSequences,
353+ key: key,
354+ configType: configType,
355+ body: body
356+ )
357+ }
358+ }
359+
360+ @available ( Configuration 1 . 0 , * )
361+ nonisolated ( nonsending) private func withProvidersWatchingSnapshot< ReturnInner> (
362+ providers: ArraySlice < any ConfigProvider > ,
363+ updateSequences: inout [ any ( AsyncSequence < any ConfigSnapshotProtocol , Never > & Sendable ) ] ,
364+ body: ( [ any ( AsyncSequence < any ConfigSnapshotProtocol , Never > & Sendable ) ] ) async throws -> ReturnInner
365+ ) async throws -> ReturnInner {
366+ guard let provider = providers. first else {
367+ // Recursion termination, once we've collected all update sequences, execute the body.
368+ return try await body ( updateSequences)
369+ }
370+ return try await provider. watchSnapshot { updates in
371+ updateSequences. append ( updates)
372+ return try await withProvidersWatchingSnapshot (
373+ providers: providers. dropFirst ( ) ,
374+ updateSequences: & updateSequences,
375+ body: body
330376 )
331377 }
332378}
0 commit comments