From 7324ac4ab578d1e5a07b232f32bf26745f34b759 Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Thu, 8 May 2025 10:07:45 -0700 Subject: [PATCH 01/18] Allow callers to run a subprocess and provide low and high water marks when using SequenceOutput to emit standard output and standard error as soon as it arrives. Resolves #39 --- Sources/Subprocess/AsyncBufferSequence.swift | 84 ++++++++++++++++--- Sources/Subprocess/Execution.swift | 26 +++++- Sources/Subprocess/IO/Output.swift | 14 +++- .../SubprocessTests+Unix.swift | 38 +++++++++ 4 files changed, 147 insertions(+), 15 deletions(-) diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 163e595..c6d8ecc 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -15,6 +15,8 @@ @preconcurrency import SystemPackage #endif +internal import Dispatch + #if SubprocessSpan @available(SubprocessSpan, *) #endif @@ -27,38 +29,100 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { public typealias Element = SequenceOutput.Buffer private let diskIO: TrackedPlatformDiskIO + private let bufferSize: Int private var buffer: [UInt8] private var currentPosition: Int private var finished: Bool + private var streamIterator: AsyncThrowingStream.AsyncIterator - internal init(diskIO: TrackedPlatformDiskIO) { + internal init(diskIO: TrackedPlatformDiskIO, bufferSize: Int) { self.diskIO = diskIO + self.bufferSize = bufferSize self.buffer = [] self.currentPosition = 0 self.finished = false + self.streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator() } - public func next() async throws -> SequenceOutput.Buffer? { - let data = try await self.diskIO.readChunk( - upToLength: readBufferSize - ) - if data == nil { - // We finished reading. Close the file descriptor now + public mutating func next() async throws -> SequenceOutput.Buffer? { + if let status = try await streamIterator.next() { + switch status { + case .data(let data): + return data + + case .endOfStream(let data): + streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator() + return data + + case .endOfFile: + try self.diskIO.safelyClose() + return nil + } + } else { try self.diskIO.safelyClose() return nil } - return data + } + + private enum StreamStatus { + case data(SequenceOutput.Buffer) + case endOfStream(SequenceOutput.Buffer) + case endOfFile + } + + private static func createDataStream(with dispatchIO: DispatchIO, bufferSize: Int) -> AsyncThrowingStream { + return AsyncThrowingStream { continuation in + dispatchIO.read( + offset: 0, + length: bufferSize, + queue: .global() + ) { done, data, error in + if error != 0 { + continuation.finish(throwing: SubprocessError( + code: .init(.failedToReadFromSubprocess), + underlyingError: .init(rawValue: error) + )) + return + } + + // Treat empty data and nil as the same + let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil + let status: StreamStatus + + switch (buffer, done) { + case (.some(let data), false): + status = .data(SequenceOutput.Buffer(data: data)) + + case (.some(let data), true): + status = .endOfStream(SequenceOutput.Buffer(data: data)) + + case (nil, false): + return + + case (nil, true): + status = .endOfFile + } + + continuation.yield(status) + + if done { + continuation.finish() + } + } + } } } private let diskIO: TrackedPlatformDiskIO + private let bufferSize: Int - internal init(diskIO: TrackedPlatformDiskIO) { + internal init(diskIO: TrackedPlatformDiskIO, bufferSize: Int) { self.diskIO = diskIO + self.bufferSize = bufferSize } public func makeAsyncIterator() -> Iterator { - return Iterator(diskIO: self.diskIO) + return Iterator(diskIO: self.diskIO, bufferSize: bufferSize) } } diff --git a/Sources/Subprocess/Execution.swift b/Sources/Subprocess/Execution.swift index 3c1f763..02aa0ff 100644 --- a/Sources/Subprocess/Execution.swift +++ b/Sources/Subprocess/Execution.swift @@ -27,6 +27,8 @@ import Musl import WinSDK #endif +internal import Dispatch + /// An object that repersents a subprocess that has been /// executed. You can use this object to send signals to the /// child process as well as stream its output and error. @@ -107,7 +109,16 @@ extension Execution where Output == SequenceOutput { else { fatalError("The standard output has already been consumed") } - return AsyncBufferSequence(diskIO: readFd) + + if let lowWater = output.lowWater { + readFd.dispatchIO.setLimit(lowWater: lowWater) + } + + if let highWater = output.highWater { + readFd.dispatchIO.setLimit(highWater: highWater) + } + + return AsyncBufferSequence(diskIO: readFd, bufferSize: output.bufferSize) } } @@ -122,7 +133,7 @@ extension Execution where Error == SequenceOutput { /// via pipe under the hood and each pipe can only be consumed once. public var standardError: AsyncBufferSequence { let consumptionState = self.outputConsumptionState.bitwiseXor( - OutputConsumptionState.standardOutputConsumed + OutputConsumptionState.standardErrorConsumed ) guard consumptionState.contains(.standardErrorConsumed), @@ -130,7 +141,16 @@ extension Execution where Error == SequenceOutput { else { fatalError("The standard output has already been consumed") } - return AsyncBufferSequence(diskIO: readFd) + + if let lowWater = error.lowWater { + readFd.dispatchIO.setLimit(lowWater: lowWater) + } + + if let highWater = error.highWater { + readFd.dispatchIO.setLimit(highWater: highWater) + } + + return AsyncBufferSequence(diskIO: readFd, bufferSize: error.bufferSize) } } diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index e33292d..d9d6dad 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -210,8 +210,15 @@ public struct BytesOutput: OutputProtocol { #endif public struct SequenceOutput: OutputProtocol { public typealias OutputType = Void - - internal init() {} + internal let lowWater: Int? + internal let highWater: Int? + internal let bufferSize: Int + + internal init(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int = readBufferSize) { + self.lowWater = lowWater + self.highWater = highWater + self.bufferSize = bufferSize + } } #if SubprocessSpan @@ -284,6 +291,9 @@ extension OutputProtocol where Self == SequenceOutput { /// to the `.standardOutput` (or `.standardError`) property /// of `Execution` as `AsyncSequence`. public static var sequence: Self { .init() } + public static func sequence(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int? = nil) -> Self { + .init(lowWater: lowWater, highWater: highWater, bufferSize: bufferSize ?? readBufferSize) + } } // MARK: - Span Default Implementations diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index 3326807..1c064b5 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -665,6 +665,44 @@ extension SubprocessUnixTests { #expect(catResult.terminationStatus.isSuccess) #expect(catResult.standardError == expected) } + + @Test func testSlowDripRedirectedOutputRedirectToSequence() async throws { + let threshold: Double = 0.5 + + let script = """ + echo "DONE" + sleep \(threshold) + """ + + let start = ContinuousClock().now + + let catResult = try await Subprocess.run( + .path("/bin/bash"), + arguments: ["-c", script], + output: .sequence(lowWater: 0), + error: .discarded, + body: { (execution, _) in + for try await chunk in execution.standardOutput { + let string = chunk.withUnsafeBytes { String(decoding: $0, as: UTF8.self) } + + if string.hasPrefix("DONE") { + let end = ContinuousClock().now + + if (end - start) > .seconds(threshold) { + return "Failure" + + } else { + return "Success" + } + } + } + + return "Failure" + } + ) + #expect(catResult.terminationStatus.isSuccess) + #expect(catResult.value == "Success") + } } // MARK: - PlatformOption Tests From 7b6899c5b61b46c1dc05d2eb1d6e4632da3526c6 Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Fri, 9 May 2025 17:01:33 -0700 Subject: [PATCH 02/18] Move stream creation and configure to platform specific files --- Sources/Subprocess/AsyncBufferSequence.swift | 72 ++++--------------- Sources/Subprocess/Execution.swift | 22 ++---- Sources/Subprocess/IO/Output.swift | 14 +--- .../Platforms/Subprocess+Darwin.swift | 18 ++++- .../Platforms/Subprocess+Linux.swift | 18 ++++- .../Platforms/Subprocess+Unix.swift | 54 +++++++++++++- .../Platforms/Subprocess+Windows.swift | 64 +++++++++++++++++ .../SubprocessTests+Unix.swift | 6 +- 8 files changed, 174 insertions(+), 94 deletions(-) diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index c6d8ecc..5ff4265 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -29,19 +29,17 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { public typealias Element = SequenceOutput.Buffer private let diskIO: TrackedPlatformDiskIO - private let bufferSize: Int private var buffer: [UInt8] private var currentPosition: Int private var finished: Bool - private var streamIterator: AsyncThrowingStream.AsyncIterator + private var streamIterator: AsyncThrowingStream.AsyncIterator - internal init(diskIO: TrackedPlatformDiskIO, bufferSize: Int) { + internal init(diskIO: TrackedPlatformDiskIO) { self.diskIO = diskIO - self.bufferSize = bufferSize self.buffer = [] self.currentPosition = 0 self.finished = false - self.streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator() + self.streamIterator = diskIO.readDataStream(upToLength: readBufferSize).makeAsyncIterator() } public mutating func next() async throws -> SequenceOutput.Buffer? { @@ -51,7 +49,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { return data case .endOfStream(let data): - streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator() + streamIterator = diskIO.readDataStream(upToLength: readBufferSize).makeAsyncIterator() return data case .endOfFile: @@ -63,66 +61,24 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { return nil } } - - private enum StreamStatus { - case data(SequenceOutput.Buffer) - case endOfStream(SequenceOutput.Buffer) - case endOfFile - } - - private static func createDataStream(with dispatchIO: DispatchIO, bufferSize: Int) -> AsyncThrowingStream { - return AsyncThrowingStream { continuation in - dispatchIO.read( - offset: 0, - length: bufferSize, - queue: .global() - ) { done, data, error in - if error != 0 { - continuation.finish(throwing: SubprocessError( - code: .init(.failedToReadFromSubprocess), - underlyingError: .init(rawValue: error) - )) - return - } - - // Treat empty data and nil as the same - let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil - let status: StreamStatus - - switch (buffer, done) { - case (.some(let data), false): - status = .data(SequenceOutput.Buffer(data: data)) - - case (.some(let data), true): - status = .endOfStream(SequenceOutput.Buffer(data: data)) - - case (nil, false): - return - - case (nil, true): - status = .endOfFile - } - - continuation.yield(status) - - if done { - continuation.finish() - } - } - } - } } private let diskIO: TrackedPlatformDiskIO - private let bufferSize: Int - internal init(diskIO: TrackedPlatformDiskIO, bufferSize: Int) { + internal init(diskIO: TrackedPlatformDiskIO) { self.diskIO = diskIO - self.bufferSize = bufferSize } public func makeAsyncIterator() -> Iterator { - return Iterator(diskIO: self.diskIO, bufferSize: bufferSize) + return Iterator(diskIO: self.diskIO) + } +} + +extension TrackedPlatformDiskIO { + internal enum StreamStatus { + case data(SequenceOutput.Buffer) + case endOfStream(SequenceOutput.Buffer) + case endOfFile } } diff --git a/Sources/Subprocess/Execution.swift b/Sources/Subprocess/Execution.swift index 02aa0ff..127bddd 100644 --- a/Sources/Subprocess/Execution.swift +++ b/Sources/Subprocess/Execution.swift @@ -110,15 +110,8 @@ extension Execution where Output == SequenceOutput { fatalError("The standard output has already been consumed") } - if let lowWater = output.lowWater { - readFd.dispatchIO.setLimit(lowWater: lowWater) - } - - if let highWater = output.highWater { - readFd.dispatchIO.setLimit(highWater: highWater) - } - - return AsyncBufferSequence(diskIO: readFd, bufferSize: output.bufferSize) + // TODO: Make buffer size an option + return AsyncBufferSequence(diskIO: readFd) } } @@ -142,15 +135,8 @@ extension Execution where Error == SequenceOutput { fatalError("The standard output has already been consumed") } - if let lowWater = error.lowWater { - readFd.dispatchIO.setLimit(lowWater: lowWater) - } - - if let highWater = error.highWater { - readFd.dispatchIO.setLimit(highWater: highWater) - } - - return AsyncBufferSequence(diskIO: readFd, bufferSize: error.bufferSize) + // TODO: Make buffer size and option + return AsyncBufferSequence(diskIO: readFd) } } diff --git a/Sources/Subprocess/IO/Output.swift b/Sources/Subprocess/IO/Output.swift index d9d6dad..e33292d 100644 --- a/Sources/Subprocess/IO/Output.swift +++ b/Sources/Subprocess/IO/Output.swift @@ -210,15 +210,8 @@ public struct BytesOutput: OutputProtocol { #endif public struct SequenceOutput: OutputProtocol { public typealias OutputType = Void - internal let lowWater: Int? - internal let highWater: Int? - internal let bufferSize: Int - - internal init(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int = readBufferSize) { - self.lowWater = lowWater - self.highWater = highWater - self.bufferSize = bufferSize - } + + internal init() {} } #if SubprocessSpan @@ -291,9 +284,6 @@ extension OutputProtocol where Self == SequenceOutput { /// to the `.standardOutput` (or `.standardError`) property /// of `Execution` as `AsyncSequence`. public static var sequence: Self { .init() } - public static func sequence(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int? = nil) -> Self { - .init(lowWater: lowWater, highWater: highWater, bufferSize: bufferSize ?? readBufferSize) - } } // MARK: - Span Default Implementations diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index 384091b..f26063c 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -57,6 +57,8 @@ public struct PlatformOptions: Sendable { /// Creates a session and sets the process group ID /// i.e. Detach from the terminal. public var createSession: Bool = false + public var outputOptions: StreamOptions = .init() + public var errorOptions: StreamOptions = .init() /// An ordered list of steps in order to tear down the child /// process in case the parent task is cancelled before /// the child proces terminates. @@ -126,6 +128,18 @@ extension PlatformOptions { #endif } +extension PlatformOptions { + public struct StreamOptions: Sendable { + let lowWater: Int? + let highWater: Int? + + init(lowWater: Int? = nil, highWater: Int? = nil) { + self.lowWater = lowWater + self.highWater = highWater + } + } +} + extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible { internal func description(withIndent indent: Int) -> String { let indent = String(repeating: " ", count: indent * 4) @@ -355,8 +369,8 @@ extension Configuration { output: output, error: error, inputPipe: inputPipe.createInputPipe(), - outputPipe: outputPipe.createOutputPipe(), - errorPipe: errorPipe.createOutputPipe() + outputPipe: outputPipe.createOutputPipe(with: platformOptions.outputOptions), + errorPipe: errorPipe.createOutputPipe(with: platformOptions.errorOptions) ) } diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index fa4b965..4239b80 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -127,8 +127,8 @@ extension Configuration { output: output, error: error, inputPipe: inputPipe.createInputPipe(), - outputPipe: outputPipe.createOutputPipe(), - errorPipe: errorPipe.createOutputPipe() + outputPipe: outputPipe.createOutputPipe(with: platformOptions.outputOptions), + errorPipe: errorPipe.createOutputPipe(with: platformOptions.errorOptions) ) } @@ -176,6 +176,8 @@ public struct PlatformOptions: Sendable { // Creates a session and sets the process group ID // i.e. Detach from the terminal. public var createSession: Bool = false + public var outputOptions: StreamOptions = .init() + public var errorOptions: StreamOptions = .init() /// An ordered list of steps in order to tear down the child /// process in case the parent task is cancelled before /// the child proces terminates. @@ -197,6 +199,18 @@ public struct PlatformOptions: Sendable { public init() {} } +extension PlatformOptions { + public struct StreamOptions: Sendable { + let lowWater: Int? + let highWater: Int? + + init(lowWater: Int? = nil, highWater: Int? = nil) { + self.lowWater = lowWater + self.highWater = highWater + } + } +} + extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible { internal func description(withIndent indent: Int) -> String { let indent = String(repeating: " ", count: indent * 4) diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 19461db..39ef815 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -412,6 +412,7 @@ extension CreatedPipe { } } ) + writeEnd = .init( dispatchIO, closeWhenDone: writeFileDescriptor.closeWhenDone @@ -423,7 +424,7 @@ extension CreatedPipe { ) } - internal func createOutputPipe() -> OutputPipe { + internal func createOutputPipe(with options: PlatformOptions.StreamOptions) -> OutputPipe { var readEnd: TrackedPlatformDiskIO? = nil if let readFileDescriptor = self.readFileDescriptor { let dispatchIO: DispatchIO = DispatchIO( @@ -437,6 +438,15 @@ extension CreatedPipe { } } ) + + if let lowWater = options.lowWater { + dispatchIO.setLimit(lowWater: lowWater) + } + + if let highWater = options.highWater { + dispatchIO.setLimit(highWater: highWater) + } + readEnd = .init( dispatchIO, closeWhenDone: readFileDescriptor.closeWhenDone @@ -451,6 +461,48 @@ extension CreatedPipe { // MARK: - TrackedDispatchIO extensions extension TrackedDispatchIO { + internal func readDataStream(upToLength maxLength: Int) -> AsyncThrowingStream { + return AsyncThrowingStream { continuation in + self.dispatchIO.read( + offset: 0, + length: maxLength, + queue: .global() + ) { done, data, error in + if error != 0 { + continuation.finish(throwing: SubprocessError( + code: .init(.failedToReadFromSubprocess), + underlyingError: .init(rawValue: error) + )) + return + } + + // Treat empty data and nil as the same + let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil + let status: StreamStatus + + switch (buffer, done) { + case (.some(let data), false): + status = .data(SequenceOutput.Buffer(data: data)) + + case (.some(let data), true): + status = .endOfStream(SequenceOutput.Buffer(data: data)) + + case (nil, false): + return + + case (nil, true): + status = .endOfFile + } + + continuation.yield(status) + + if done { + continuation.finish() + } + } + } + } + #if SubprocessSpan @available(SubprocessSpan, *) #endif diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 627cdaa..a29a602 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -1043,6 +1043,70 @@ extension CreatedPipe { } extension TrackedFileDescriptor { + internal func readDataStream(upToLength maxLength: Int) -> AsyncThrowingStream { + return AsyncThrowingStream { continuation in + do { + var totalBytesRead: Int = 0 + + while totalBytesRead < maxLength { + let values = try [UInt8]( + unsafeUninitializedCapacity: maxLength + ) { buffer, initializedCount in + guard let baseAddress = buffer.baseAddress else { + initializedCount = 0 + return + } + + var bytesRead: DWORD = 0 + let readSucceed = ReadFile( + self.fileDescriptor.platformDescriptor, + UnsafeMutableRawPointer(mutating: baseAddress), + DWORD(maxLength - totalBytesRead), + &bytesRead, + nil + ) + + if !readSucceed { + // Windows throws ERROR_BROKEN_PIPE when the pipe is closed + let error = GetLastError() + if error == ERROR_BROKEN_PIPE { + // We are done reading + initializedCount = 0 + } else { + initializedCount = 0 + throw SubprocessError( + code: .init(.failedToReadFromSubprocess), + underlyingError: .init(rawValue: error) + ) + } + } else { + // We successfully read the current round + initializedCount += Int(bytesRead) + } + } + + if values.count > 0 { + totalBytesRead += values.count + + if totalBytesRead >= maxLength { + continuation.yield(.endOfStream(SequenceOutput.Buffer(data: values))) + continuation.finish() + return + } else { + continuation.yield(.data(SequenceOutput.Buffer(data: values))) + } + } else { + continuation.yield(.endOfFile) + continuation.finish() + return + } + } + } catch { + continuation.finish(throwing: error) + } + } + } + internal func readChunk(upToLength maxLength: Int) async throws -> SequenceOutput.Buffer? { return try await withCheckedThrowingContinuation { continuation in self.readUntilEOF( diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index 1c064b5..dde9b7e 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -674,12 +674,16 @@ extension SubprocessUnixTests { sleep \(threshold) """ + var platformOptions = PlatformOptions() + platformOptions.outputOptions = .init(lowWater: 0) + let start = ContinuousClock().now let catResult = try await Subprocess.run( .path("/bin/bash"), arguments: ["-c", script], - output: .sequence(lowWater: 0), + platformOptions: platformOptions, + output: .sequence, error: .discarded, body: { (execution, _) in for try await chunk in execution.standardOutput { From 9b173ab770bbc0322a5b79734315a474157dab22 Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 14 May 2025 08:49:32 -0700 Subject: [PATCH 03/18] Mark APIs that reference SequenceOutput.Buffer with @available(SubprocessSpan, *) so they compile with Swift 6.2 --- Sources/Subprocess/AsyncBufferSequence.swift | 3 +++ Sources/Subprocess/Platforms/Subprocess+Unix.swift | 3 +++ Sources/Subprocess/Platforms/Subprocess+Windows.swift | 3 +++ 3 files changed, 9 insertions(+) diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 5ff4265..34a8031 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -75,6 +75,9 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { } extension TrackedPlatformDiskIO { +#if SubprocessSpan +@available(SubprocessSpan, *) +#endif internal enum StreamStatus { case data(SequenceOutput.Buffer) case endOfStream(SequenceOutput.Buffer) diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 39ef815..f62fc53 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -461,6 +461,9 @@ extension CreatedPipe { // MARK: - TrackedDispatchIO extensions extension TrackedDispatchIO { +#if SubprocessSpan +@available(SubprocessSpan, *) +#endif internal func readDataStream(upToLength maxLength: Int) -> AsyncThrowingStream { return AsyncThrowingStream { continuation in self.dispatchIO.read( diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index a29a602..32d25b3 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -1043,6 +1043,9 @@ extension CreatedPipe { } extension TrackedFileDescriptor { +#if SubprocessSpan +@available(SubprocessSpan, *) +#endif internal func readDataStream(upToLength maxLength: Int) -> AsyncThrowingStream { return AsyncThrowingStream { continuation in do { From ab43ae4108ae85baba0b7ff4fac8f88d21d03598 Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 14 May 2025 09:06:04 -0700 Subject: [PATCH 04/18] Simplify AsyncBufferSequence to only create one AsyncThrowingStream to read all then data rather than one per chunk --- Sources/Subprocess/AsyncBufferSequence.swift | 22 +++- .../Platforms/Subprocess+Unix.swift | 98 +++++---------- .../Platforms/Subprocess+Windows.swift | 112 +++++++----------- 3 files changed, 92 insertions(+), 140 deletions(-) diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 34a8031..ff73de6 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -27,29 +27,41 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { @_nonSendable public struct Iterator: AsyncIteratorProtocol { public typealias Element = SequenceOutput.Buffer + internal typealias Stream = AsyncThrowingStream private let diskIO: TrackedPlatformDiskIO private var buffer: [UInt8] private var currentPosition: Int private var finished: Bool - private var streamIterator: AsyncThrowingStream.AsyncIterator + private var streamIterator: Stream.AsyncIterator + private let continuation: Stream.Continuation + private var needsNextChunk: Bool internal init(diskIO: TrackedPlatformDiskIO) { self.diskIO = diskIO self.buffer = [] self.currentPosition = 0 self.finished = false - self.streamIterator = diskIO.readDataStream(upToLength: readBufferSize).makeAsyncIterator() + let (stream, continuation) = AsyncThrowingStream.makeStream() + self.streamIterator = stream.makeAsyncIterator() + self.continuation = continuation + self.needsNextChunk = true } public mutating func next() async throws -> SequenceOutput.Buffer? { + + if needsNextChunk { + diskIO.readChunk(upToLength: readBufferSize, continuation: continuation) + needsNextChunk = false + } + if let status = try await streamIterator.next() { switch status { case .data(let data): return data - case .endOfStream(let data): - streamIterator = diskIO.readDataStream(upToLength: readBufferSize).makeAsyncIterator() + case .endOfChunk(let data): + needsNextChunk = true return data case .endOfFile: @@ -80,7 +92,7 @@ extension TrackedPlatformDiskIO { #endif internal enum StreamStatus { case data(SequenceOutput.Buffer) - case endOfStream(SequenceOutput.Buffer) + case endOfChunk(SequenceOutput.Buffer) case endOfFile } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index f62fc53..c7f6d08 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -464,82 +464,42 @@ extension TrackedDispatchIO { #if SubprocessSpan @available(SubprocessSpan, *) #endif - internal func readDataStream(upToLength maxLength: Int) -> AsyncThrowingStream { - return AsyncThrowingStream { continuation in - self.dispatchIO.read( - offset: 0, - length: maxLength, - queue: .global() - ) { done, data, error in - if error != 0 { - continuation.finish(throwing: SubprocessError( - code: .init(.failedToReadFromSubprocess), - underlyingError: .init(rawValue: error) - )) - return - } - - // Treat empty data and nil as the same - let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil - let status: StreamStatus - - switch (buffer, done) { - case (.some(let data), false): - status = .data(SequenceOutput.Buffer(data: data)) + internal func readChunk(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) { + self.dispatchIO.read( + offset: 0, + length: maxLength, + queue: .global() + ) { done, data, error in + if error != 0 { + continuation.finish(throwing: SubprocessError( + code: .init(.failedToReadFromSubprocess), + underlyingError: .init(rawValue: error) + )) + return + } - case (.some(let data), true): - status = .endOfStream(SequenceOutput.Buffer(data: data)) + // Treat empty data and nil as the same + let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil + let status: StreamStatus - case (nil, false): - return + switch (buffer, done) { + case (.some(let data), false): + status = .data(SequenceOutput.Buffer(data: data)) - case (nil, true): - status = .endOfFile - } + case (.some(let data), true): + status = .endOfChunk(SequenceOutput.Buffer(data: data)) - continuation.yield(status) + case (nil, false): + return - if done { - continuation.finish() - } + case (nil, true): + status = .endOfFile } - } - } -#if SubprocessSpan - @available(SubprocessSpan, *) -#endif - package func readChunk(upToLength maxLength: Int) async throws -> SequenceOutput.Buffer? { - return try await withCheckedThrowingContinuation { continuation in - var buffer: DispatchData = .empty - self.dispatchIO.read( - offset: 0, - length: maxLength, - queue: .global() - ) { done, data, error in - if error != 0 { - continuation.resume( - throwing: SubprocessError( - code: .init(.failedToReadFromSubprocess), - underlyingError: .init(rawValue: error) - ) - ) - return - } - if let data = data { - if buffer.isEmpty { - buffer = data - } else { - buffer.append(data) - } - } - if done { - if !buffer.isEmpty { - continuation.resume(returning: SequenceOutput.Buffer(data: buffer)) - } else { - continuation.resume(returning: nil) - } - } + continuation.yield(status) + + if done { + continuation.finish() } } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 32d25b3..42f4968 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -1043,85 +1043,65 @@ extension CreatedPipe { } extension TrackedFileDescriptor { -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif - internal func readDataStream(upToLength maxLength: Int) -> AsyncThrowingStream { - return AsyncThrowingStream { continuation in - do { - var totalBytesRead: Int = 0 + internal func readChunk(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) { + do { + var totalBytesRead: Int = 0 - while totalBytesRead < maxLength { - let values = try [UInt8]( - unsafeUninitializedCapacity: maxLength - ) { buffer, initializedCount in - guard let baseAddress = buffer.baseAddress else { - initializedCount = 0 - return - } + while totalBytesRead < maxLength { + let values = try [UInt8]( + unsafeUninitializedCapacity: maxLength - totalBytesRead + ) { buffer, initializedCount in + guard let baseAddress = buffer.baseAddress else { + initializedCount = 0 + return + } - var bytesRead: DWORD = 0 - let readSucceed = ReadFile( - self.fileDescriptor.platformDescriptor, - UnsafeMutableRawPointer(mutating: baseAddress), - DWORD(maxLength - totalBytesRead), - &bytesRead, - nil - ) + var bytesRead: DWORD = 0 + let readSucceed = ReadFile( + self.fileDescriptor.platformDescriptor, + UnsafeMutableRawPointer(mutating: baseAddress), + DWORD(maxLength - totalBytesRead), + &bytesRead, + nil + ) - if !readSucceed { - // Windows throws ERROR_BROKEN_PIPE when the pipe is closed - let error = GetLastError() - if error == ERROR_BROKEN_PIPE { - // We are done reading - initializedCount = 0 - } else { - initializedCount = 0 - throw SubprocessError( - code: .init(.failedToReadFromSubprocess), - underlyingError: .init(rawValue: error) - ) - } + if !readSucceed { + // Windows throws ERROR_BROKEN_PIPE when the pipe is closed + let error = GetLastError() + if error == ERROR_BROKEN_PIPE { + // We are done reading + initializedCount = 0 } else { - // We successfully read the current round - initializedCount += Int(bytesRead) + initializedCount = 0 + throw SubprocessError( + code: .init(.failedToReadFromSubprocess), + underlyingError: .init(rawValue: error) + ) } + } else { + // We successfully read the current round + initializedCount += Int(bytesRead) } + } - if values.count > 0 { - totalBytesRead += values.count + if values.count > 0 { + totalBytesRead += values.count - if totalBytesRead >= maxLength { - continuation.yield(.endOfStream(SequenceOutput.Buffer(data: values))) - continuation.finish() - return - } else { - continuation.yield(.data(SequenceOutput.Buffer(data: values))) - } - } else { - continuation.yield(.endOfFile) + if totalBytesRead >= maxLength { + continuation.yield(.endOfStream(SequenceOutput.Buffer(data: values))) continuation.finish() return + } else { + continuation.yield(.data(SequenceOutput.Buffer(data: values))) } - } - } catch { - continuation.finish(throwing: error) - } - } - } - - internal func readChunk(upToLength maxLength: Int) async throws -> SequenceOutput.Buffer? { - return try await withCheckedThrowingContinuation { continuation in - self.readUntilEOF( - upToLength: maxLength - ) { result in - switch result { - case .failure(let error): - continuation.resume(throwing: error) - case .success(let bytes): - continuation.resume(returning: SequenceOutput.Buffer(data: bytes)) + } else { + continuation.yield(.endOfFile) + continuation.finish() + return } } + } catch { + continuation.finish(throwing: error) } } From 10aa5230ec1d0c3b8d6db6035ab6fade7646235f Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 14 May 2025 10:15:18 -0700 Subject: [PATCH 05/18] Add fatalError for unexpected result from DispatchIO.read --- Sources/Subprocess/Platforms/Subprocess+Unix.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index c7f6d08..780f6fd 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -490,7 +490,7 @@ extension TrackedDispatchIO { status = .endOfChunk(SequenceOutput.Buffer(data: data)) case (nil, false): - return + fatalError("Unexpectedly received no data from DispatchIO with it indicating it is not done.") case (nil, true): status = .endOfFile From aa903ad06b682b430764b1e8acca19b00a3e0102 Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 14 May 2025 10:23:27 -0700 Subject: [PATCH 06/18] Fix build on Windows --- Sources/Subprocess/Platforms/Subprocess+Windows.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 42f4968..dbd4e80 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -1088,7 +1088,7 @@ extension TrackedFileDescriptor { totalBytesRead += values.count if totalBytesRead >= maxLength { - continuation.yield(.endOfStream(SequenceOutput.Buffer(data: values))) + continuation.yield(.endOfChunk(SequenceOutput.Buffer(data: values))) continuation.finish() return } else { From bb57bf22c2c8bb9e9e211d81bf89955487767f7c Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 14 May 2025 11:31:50 -0700 Subject: [PATCH 07/18] Update PlatformOptions to have preferredStreamBufferSizeRange and use RangeExpression to express the various ways to configure the low and high watermark --- .../Platforms/Subprocess+Darwin.swift | 33 +++++++++---------- .../Platforms/Subprocess+Linux.swift | 23 +++++++------ .../Platforms/Subprocess+Unix.swift | 24 ++++++++++---- .../SubprocessTests+Unix.swift | 2 +- 4 files changed, 45 insertions(+), 37 deletions(-) diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index f26063c..49cf60c 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -57,8 +57,9 @@ public struct PlatformOptions: Sendable { /// Creates a session and sets the process group ID /// i.e. Detach from the terminal. public var createSession: Bool = false - public var outputOptions: StreamOptions = .init() - public var errorOptions: StreamOptions = .init() + + private(set) var preferredStreamBufferSizeRange: (any RangeExpression & Sendable)? = nil + /// An ordered list of steps in order to tear down the child /// process in case the parent task is cancelled before /// the child proces terminates. @@ -84,7 +85,17 @@ public struct PlatformOptions: Sendable { ) throws -> Void )? = nil - public init() {} + public init() { + self.preferredStreamBufferSizeRange = nil + } + + public init(preferredStreamBufferSizeRange: R?) where R: RangeExpression & Sendable, R.Bound == Int { + self.preferredStreamBufferSizeRange = preferredStreamBufferSizeRange + } + + mutating func setPreferredStreamBufferSizeRange(_ range: R?) where R: RangeExpression & Sendable, R.Bound == Int { + self.preferredStreamBufferSizeRange = range + } } extension PlatformOptions { @@ -128,18 +139,6 @@ extension PlatformOptions { #endif } -extension PlatformOptions { - public struct StreamOptions: Sendable { - let lowWater: Int? - let highWater: Int? - - init(lowWater: Int? = nil, highWater: Int? = nil) { - self.lowWater = lowWater - self.highWater = highWater - } - } -} - extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible { internal func description(withIndent indent: Int) -> String { let indent = String(repeating: " ", count: indent * 4) @@ -369,8 +368,8 @@ extension Configuration { output: output, error: error, inputPipe: inputPipe.createInputPipe(), - outputPipe: outputPipe.createOutputPipe(with: platformOptions.outputOptions), - errorPipe: errorPipe.createOutputPipe(with: platformOptions.errorOptions) + outputPipe: outputPipe.createOutputPipe(with: platformOptions), + errorPipe: errorPipe.createOutputPipe(with: platformOptions) ) } diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index 4239b80..63aa1eb 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -176,8 +176,9 @@ public struct PlatformOptions: Sendable { // Creates a session and sets the process group ID // i.e. Detach from the terminal. public var createSession: Bool = false - public var outputOptions: StreamOptions = .init() - public var errorOptions: StreamOptions = .init() + + private(set) var preferredStreamBufferSizeRange: (any RangeExpression & Sendable)? = nil + /// An ordered list of steps in order to tear down the child /// process in case the parent task is cancelled before /// the child proces terminates. @@ -196,18 +197,16 @@ public struct PlatformOptions: Sendable { /// call any necessary process setup functions. public var preSpawnProcessConfigurator: (@convention(c) @Sendable () -> Void)? = nil - public init() {} -} + public init() { + self.preferredStreamBufferSizeRange = nil + } -extension PlatformOptions { - public struct StreamOptions: Sendable { - let lowWater: Int? - let highWater: Int? + public init(preferredStreamBufferSizeRange: R?) where R: RangeExpression & Sendable, R.Bound == Int { + self.preferredStreamBufferSizeRange = preferredStreamBufferSizeRange + } - init(lowWater: Int? = nil, highWater: Int? = nil) { - self.lowWater = lowWater - self.highWater = highWater - } + mutating func setPreferredStreamBufferSizeRange(_ range: R?) where R: RangeExpression & Sendable, R.Bound == Int { + self.preferredStreamBufferSizeRange = range } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 780f6fd..8995e74 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -424,7 +424,7 @@ extension CreatedPipe { ) } - internal func createOutputPipe(with options: PlatformOptions.StreamOptions) -> OutputPipe { + internal func createOutputPipe(with options: PlatformOptions) -> OutputPipe { var readEnd: TrackedPlatformDiskIO? = nil if let readFileDescriptor = self.readFileDescriptor { let dispatchIO: DispatchIO = DispatchIO( @@ -439,12 +439,22 @@ extension CreatedPipe { } ) - if let lowWater = options.lowWater { - dispatchIO.setLimit(lowWater: lowWater) - } - - if let highWater = options.highWater { - dispatchIO.setLimit(highWater: highWater) + if let preferredStreamBufferSizeRange = options.preferredStreamBufferSizeRange { + if let range = preferredStreamBufferSizeRange as? Range { + dispatchIO.setLimit(lowWater: range.lowerBound) + dispatchIO.setLimit(highWater: range.upperBound) + } else if let range = preferredStreamBufferSizeRange as? ClosedRange { + dispatchIO.setLimit(lowWater: range.lowerBound) + dispatchIO.setLimit(highWater: range.upperBound) + } else if let range = preferredStreamBufferSizeRange as? PartialRangeFrom { + dispatchIO.setLimit(lowWater: range.lowerBound) + } else if let range = preferredStreamBufferSizeRange as? PartialRangeUpTo { + dispatchIO.setLimit(highWater: range.upperBound - 1) + } else if let range = preferredStreamBufferSizeRange as? PartialRangeThrough { + dispatchIO.setLimit(highWater: range.upperBound) + } else { + fatalError("Unsupported preferredStreamBufferSizeRange: \(preferredStreamBufferSizeRange)") + } } readEnd = .init( diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index dde9b7e..0776c75 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -675,7 +675,7 @@ extension SubprocessUnixTests { """ var platformOptions = PlatformOptions() - platformOptions.outputOptions = .init(lowWater: 0) + platformOptions.setPreferredStreamBufferSizeRange(0...) let start = ContinuousClock().now From 955e9c9249d1f103df72e967cbcf0ad69bf38e64 Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 14 May 2025 11:45:46 -0700 Subject: [PATCH 08/18] Fix build on Linux --- Sources/Subprocess/Platforms/Subprocess+Linux.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index 63aa1eb..f65ec47 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -127,8 +127,8 @@ extension Configuration { output: output, error: error, inputPipe: inputPipe.createInputPipe(), - outputPipe: outputPipe.createOutputPipe(with: platformOptions.outputOptions), - errorPipe: errorPipe.createOutputPipe(with: platformOptions.errorOptions) + outputPipe: outputPipe.createOutputPipe(with: platformOptions), + errorPipe: errorPipe.createOutputPipe(with: platformOptions) ) } From 5f2df5f630f62aae2249f4693388d82761939e76 Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 14 May 2025 13:28:42 -0700 Subject: [PATCH 09/18] Fix Swift 6.2 build --- .../SubprocessTests+Unix.swift | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index 0776c75..0b59ca4 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -666,7 +666,22 @@ extension SubprocessUnixTests { #expect(catResult.standardError == expected) } - @Test func testSlowDripRedirectedOutputRedirectToSequence() async throws { + @Test( + .enabled( + if: { + if #available(SubprocessSpan , *) { + true + } else { + false + } + }(), + "This test requires SubprocessSpan" + ) + ) + func testSlowDripRedirectedOutputRedirectToSequence() async throws { + guard #available(SubprocessSpan , *) else { + return + } let threshold: Double = 0.5 let script = """ @@ -687,7 +702,7 @@ extension SubprocessUnixTests { error: .discarded, body: { (execution, _) in for try await chunk in execution.standardOutput { - let string = chunk.withUnsafeBytes { String(decoding: $0, as: UTF8.self) } + let string = chunk._withUnsafeBytes { String(decoding: $0, as: UTF8.self) } if string.hasPrefix("DONE") { let end = ContinuousClock().now From 4bac06ab803bb05de380e6bdbbdd2a19747a1262 Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 14 May 2025 13:46:27 -0700 Subject: [PATCH 10/18] Update how we disable tests that require SubprocessSpan --- Tests/SubprocessTests/SubprocessTests+Unix.swift | 16 +--------------- Tests/SubprocessTests/TestSupport.swift | 8 ++++++++ 2 files changed, 9 insertions(+), 15 deletions(-) diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index 0b59ca4..fe42bb9 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -666,22 +666,8 @@ extension SubprocessUnixTests { #expect(catResult.standardError == expected) } - @Test( - .enabled( - if: { - if #available(SubprocessSpan , *) { - true - } else { - false - } - }(), - "This test requires SubprocessSpan" - ) - ) + @Test(.enabled(if: hasSubprocessSpan, "This test requires SubprocessSpan")) func testSlowDripRedirectedOutputRedirectToSequence() async throws { - guard #available(SubprocessSpan , *) else { - return - } let threshold: Double = 0.5 let script = """ diff --git a/Tests/SubprocessTests/TestSupport.swift b/Tests/SubprocessTests/TestSupport.swift index 8939ac2..31e4acc 100644 --- a/Tests/SubprocessTests/TestSupport.swift +++ b/Tests/SubprocessTests/TestSupport.swift @@ -42,3 +42,11 @@ internal func directory(_ lhs: String, isSameAs rhs: String) -> Bool { return canonicalLhs == canonicalRhs } + +internal var hasSubprocessSpan: Bool { + if #available(SubprocessSpan , *) { + true + } else { + false + } +} From ae0de6101ffe7e71a941de91040ec2601f675464 Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 21 May 2025 09:06:01 -0700 Subject: [PATCH 11/18] Return to configuring the minimum and maximum buffer size through StreamOptions --- .../Platforms/Subprocess+Darwin.swift | 20 +++++++++-------- .../Platforms/Subprocess+Linux.swift | 20 +++++++++-------- .../Platforms/Subprocess+Unix.swift | 22 +++++-------------- .../SubprocessTests+Unix.swift | 2 +- 4 files changed, 29 insertions(+), 35 deletions(-) diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index 49cf60c..8eb80d3 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -58,7 +58,7 @@ public struct PlatformOptions: Sendable { /// i.e. Detach from the terminal. public var createSession: Bool = false - private(set) var preferredStreamBufferSizeRange: (any RangeExpression & Sendable)? = nil + public var streamOptions: StreamOptions = .init() /// An ordered list of steps in order to tear down the child /// process in case the parent task is cancelled before @@ -85,16 +85,18 @@ public struct PlatformOptions: Sendable { ) throws -> Void )? = nil - public init() { - self.preferredStreamBufferSizeRange = nil - } + public init() {} +} - public init(preferredStreamBufferSizeRange: R?) where R: RangeExpression & Sendable, R.Bound == Int { - self.preferredStreamBufferSizeRange = preferredStreamBufferSizeRange - } +extension PlatformOptions { + public struct StreamOptions: Sendable { + let minimumBufferSize: Int? + let maximumBufferSize: Int? - mutating func setPreferredStreamBufferSizeRange(_ range: R?) where R: RangeExpression & Sendable, R.Bound == Int { - self.preferredStreamBufferSizeRange = range + public init(minimumBufferSize: Int? = nil, maximumBufferSize: Int? = nil) { + self.minimumBufferSize = minimumBufferSize + self.maximumBufferSize = maximumBufferSize + } } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index f65ec47..9f945ef 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -177,7 +177,7 @@ public struct PlatformOptions: Sendable { // i.e. Detach from the terminal. public var createSession: Bool = false - private(set) var preferredStreamBufferSizeRange: (any RangeExpression & Sendable)? = nil + public var streamOptions: StreamOptions = .init() /// An ordered list of steps in order to tear down the child /// process in case the parent task is cancelled before @@ -197,16 +197,18 @@ public struct PlatformOptions: Sendable { /// call any necessary process setup functions. public var preSpawnProcessConfigurator: (@convention(c) @Sendable () -> Void)? = nil - public init() { - self.preferredStreamBufferSizeRange = nil - } + public init() {} +} - public init(preferredStreamBufferSizeRange: R?) where R: RangeExpression & Sendable, R.Bound == Int { - self.preferredStreamBufferSizeRange = preferredStreamBufferSizeRange - } +extension PlatformOptions { + public struct StreamOptions: Sendable { + public let minimumBufferSize: Int? + public let maximumBufferSize: Int? - mutating func setPreferredStreamBufferSizeRange(_ range: R?) where R: RangeExpression & Sendable, R.Bound == Int { - self.preferredStreamBufferSizeRange = range + public init(minimumBufferSize: Int? = nil, maximumBufferSize: Int? = nil) { + self.minimumBufferSize = minimumBufferSize + self.maximumBufferSize = maximumBufferSize + } } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 8995e74..02dbc09 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -439,22 +439,12 @@ extension CreatedPipe { } ) - if let preferredStreamBufferSizeRange = options.preferredStreamBufferSizeRange { - if let range = preferredStreamBufferSizeRange as? Range { - dispatchIO.setLimit(lowWater: range.lowerBound) - dispatchIO.setLimit(highWater: range.upperBound) - } else if let range = preferredStreamBufferSizeRange as? ClosedRange { - dispatchIO.setLimit(lowWater: range.lowerBound) - dispatchIO.setLimit(highWater: range.upperBound) - } else if let range = preferredStreamBufferSizeRange as? PartialRangeFrom { - dispatchIO.setLimit(lowWater: range.lowerBound) - } else if let range = preferredStreamBufferSizeRange as? PartialRangeUpTo { - dispatchIO.setLimit(highWater: range.upperBound - 1) - } else if let range = preferredStreamBufferSizeRange as? PartialRangeThrough { - dispatchIO.setLimit(highWater: range.upperBound) - } else { - fatalError("Unsupported preferredStreamBufferSizeRange: \(preferredStreamBufferSizeRange)") - } + if let minimumBufferSize = options.streamOptions.minimumBufferSize { + dispatchIO.setLimit(lowWater: minimumBufferSize) + } + + if let maximumBufferSize = options.streamOptions.maximumBufferSize { + dispatchIO.setLimit(lowWater: maximumBufferSize) } readEnd = .init( diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index fe42bb9..ca8ad7e 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -676,7 +676,7 @@ extension SubprocessUnixTests { """ var platformOptions = PlatformOptions() - platformOptions.setPreferredStreamBufferSizeRange(0...) + platformOptions.streamOptions = .init(minimumBufferSize: 0) let start = ContinuousClock().now From d65f2ce40eaefa8fd2b7b7fd219121539aeb7e3f Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 21 May 2025 09:12:21 -0700 Subject: [PATCH 12/18] Change testSlowDripRedirectedOutputRedirectToSequence to guarding against the availability of SubprocessSpan like the other tests. --- Tests/SubprocessTests/SubprocessTests+Unix.swift | 6 ++++-- Tests/SubprocessTests/TestSupport.swift | 8 -------- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index ca8ad7e..12ac540 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -666,8 +666,10 @@ extension SubprocessUnixTests { #expect(catResult.standardError == expected) } - @Test(.enabled(if: hasSubprocessSpan, "This test requires SubprocessSpan")) - func testSlowDripRedirectedOutputRedirectToSequence() async throws { + @Test func testSlowDripRedirectedOutputRedirectToSequence() async throws { + guard #available(SubprocessSpan , *) else { + return + } let threshold: Double = 0.5 let script = """ diff --git a/Tests/SubprocessTests/TestSupport.swift b/Tests/SubprocessTests/TestSupport.swift index 31e4acc..8939ac2 100644 --- a/Tests/SubprocessTests/TestSupport.swift +++ b/Tests/SubprocessTests/TestSupport.swift @@ -42,11 +42,3 @@ internal func directory(_ lhs: String, isSameAs rhs: String) -> Bool { return canonicalLhs == canonicalRhs } - -internal var hasSubprocessSpan: Bool { - if #available(SubprocessSpan , *) { - true - } else { - false - } -} From c8fe778ea52179334773ec0340a72544d8ce4e6a Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 21 May 2025 09:15:15 -0700 Subject: [PATCH 13/18] Remove unneeded import of Dispatch --- Sources/Subprocess/AsyncBufferSequence.swift | 2 -- 1 file changed, 2 deletions(-) diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index ff73de6..6ca6c84 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -15,8 +15,6 @@ @preconcurrency import SystemPackage #endif -internal import Dispatch - #if SubprocessSpan @available(SubprocessSpan, *) #endif From 30876f562fb51476f66e942369b79f022adba07d Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Wed, 21 May 2025 10:29:04 -0700 Subject: [PATCH 14/18] Fix build on Windows --- Sources/Subprocess/Platforms/Subprocess+Windows.swift | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 66b1e59..95f3176 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -1093,7 +1093,7 @@ extension FileDescriptor { var bytesRead: DWORD = 0 let readSucceed = ReadFile( - self.fileDescriptor.platformDescriptor, + self.platformDescriptor, UnsafeMutableRawPointer(mutating: baseAddress), DWORD(maxLength - totalBytesRead), &bytesRead, @@ -1123,11 +1123,11 @@ extension FileDescriptor { totalBytesRead += values.count if totalBytesRead >= maxLength { - continuation.yield(.endOfChunk(SequenceOutput.Buffer(data: values))) + continuation.yield(.endOfChunk(AsyncBufferSequence.Buffer(data: values))) continuation.finish() return } else { - continuation.yield(.data(SequenceOutput.Buffer(data: values))) + continuation.yield(.data(AsyncBufferSequence.Buffer(data: values))) } } else { continuation.yield(.endOfFile) From ff7ae125ac67f99b403a7bd05009a59b41d8d22c Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Thu, 22 May 2025 23:22:08 -0700 Subject: [PATCH 15/18] Simplify interaction between AsyncBufferSequence.Iterator and its AsyncStream --- Sources/Subprocess/AsyncBufferSequence.swift | 45 +++++-------------- .../Platforms/Subprocess+Unix.swift | 25 ++--------- .../Platforms/Subprocess+Windows.swift | 12 +---- 3 files changed, 16 insertions(+), 66 deletions(-) diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index bf42cd0..f30d741 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -35,7 +35,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { @_nonSendable public struct Iterator: AsyncIteratorProtocol { public typealias Element = Buffer - internal typealias Stream = AsyncThrowingStream + internal typealias Stream = AsyncThrowingStream private let diskIO: DiskIO private var buffer: [UInt8] @@ -43,17 +43,17 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { private var finished: Bool private var streamIterator: Stream.AsyncIterator private let continuation: Stream.Continuation - private var needsNextChunk: Bool + private var bytesRemaining: Int internal init(diskIO: DiskIO, streamOptions: PlatformOptions.StreamOptions) { self.diskIO = diskIO self.buffer = [] self.currentPosition = 0 self.finished = false - let (stream, continuation) = AsyncThrowingStream.makeStream() + let (stream, continuation) = AsyncThrowingStream.makeStream() self.streamIterator = stream.makeAsyncIterator() self.continuation = continuation - self.needsNextChunk = true + self.bytesRemaining = 0 #if !os(Windows) if let minimumBufferSize = streamOptions.minimumBufferSize { @@ -68,28 +68,14 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { public mutating func next() async throws -> Buffer? { - if needsNextChunk { - diskIO.readChunk(upToLength: readBufferSize, continuation: continuation) - needsNextChunk = false + if bytesRemaining <= 0 { + bytesRemaining = readBufferSize + diskIO.stream(upToLength: readBufferSize, continuation: continuation) } - if let status = try await streamIterator.next() { - switch status { - case .data(let data): - return data - - case .endOfChunk(let data): - needsNextChunk = true - return data - - case .endOfFile: - #if os(Windows) - try self.diskIO.close() - #else - self.diskIO.close() - #endif - return nil - } + if let buffer = try await streamIterator.next() { + bytesRemaining -= buffer.count + return buffer } else { #if os(Windows) try self.diskIO.close() @@ -114,17 +100,6 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { } } -extension AsyncBufferSequence { -#if SubprocessSpan -@available(SubprocessSpan, *) -#endif - internal enum StreamStatus { - case data(AsyncBufferSequence.Buffer) - case endOfChunk(AsyncBufferSequence.Buffer) - case endOfFile - } -} - // MARK: - Page Size import _SubprocessCShims diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 780cb87..fef085a 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -415,7 +415,7 @@ extension DispatchIO { #if SubprocessSpan @available(SubprocessSpan, *) #endif - internal func readChunk(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) { + internal func stream(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) { self.read( offset: 0, length: maxLength, @@ -430,26 +430,9 @@ extension DispatchIO { } // Treat empty data and nil as the same - let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil - let status: AsyncBufferSequence.StreamStatus - - switch (buffer, done) { - case (.some(let data), false): - status = .data(AsyncBufferSequence.Buffer(data: data)) - - case (.some(let data), true): - status = .endOfChunk(AsyncBufferSequence.Buffer(data: data)) - - case (nil, false): - fatalError("Unexpectedly received no data from DispatchIO with it indicating it is not done.") - - case (nil, true): - status = .endOfFile - } - - continuation.yield(status) - - if done { + if let data = data.map({ $0.isEmpty ? nil : $0 }) ?? nil { + continuation.yield(AsyncBufferSequence.Buffer(data: data)) + } else if done { continuation.finish() } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 95f3176..e6377e3 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -1078,7 +1078,7 @@ extension FileDescriptor { } extension FileDescriptor { - internal func readChunk(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) { + internal func stream(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) { do { var totalBytesRead: Int = 0 @@ -1121,16 +1121,8 @@ extension FileDescriptor { if values.count > 0 { totalBytesRead += values.count - - if totalBytesRead >= maxLength { - continuation.yield(.endOfChunk(AsyncBufferSequence.Buffer(data: values))) - continuation.finish() - return - } else { - continuation.yield(.data(AsyncBufferSequence.Buffer(data: values))) - } + continuation.yield(AsyncBufferSequence.Buffer(data: values)) } else { - continuation.yield(.endOfFile) continuation.finish() return } From 48b97f27b84cda3f29b8a26beb9860b33268514c Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Thu, 22 May 2025 23:26:31 -0700 Subject: [PATCH 16/18] Removed unneeded import --- Sources/Subprocess/Execution.swift | 2 -- 1 file changed, 2 deletions(-) diff --git a/Sources/Subprocess/Execution.swift b/Sources/Subprocess/Execution.swift index 60dbf81..c783c22 100644 --- a/Sources/Subprocess/Execution.swift +++ b/Sources/Subprocess/Execution.swift @@ -27,8 +27,6 @@ import Musl import WinSDK #endif -internal import Dispatch - /// An object that repersents a subprocess that has been /// executed. You can use this object to send signals to the /// child process as well as stream its output and error. From d94a9c6355b981b18fab31b99644c4a2e5fb28ab Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Fri, 23 May 2025 10:47:47 -0700 Subject: [PATCH 17/18] Improve error handling --- Sources/Subprocess/AsyncBufferSequence.swift | 17 ++++++++++++++++- .../Subprocess/Platforms/Subprocess+Unix.swift | 13 ++++++++++--- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index f30d741..b666f95 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -73,7 +73,21 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { diskIO.stream(upToLength: readBufferSize, continuation: continuation) } - if let buffer = try await streamIterator.next() { + let buffer: Buffer? + + do { + buffer = try await streamIterator.next() + } catch { + #if os(Windows) + try self.diskIO.close() + #else + self.diskIO.close() + #endif + + throw error + } + + if let buffer { bytesRemaining -= buffer.count return buffer } else { @@ -82,6 +96,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { #else self.diskIO.close() #endif + return nil } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index fef085a..6b24389 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -421,7 +421,7 @@ extension DispatchIO { length: maxLength, queue: .global() ) { done, data, error in - if error != 0 { + guard error == 0 else { continuation.finish(throwing: SubprocessError( code: .init(.failedToReadFromSubprocess), underlyingError: .init(rawValue: error) @@ -429,11 +429,18 @@ extension DispatchIO { return } - // Treat empty data and nil as the same - if let data = data.map({ $0.isEmpty ? nil : $0 }) ?? nil { + guard let data else { + fatalError("Unexpectedly received nil data from DispatchIO with error == 0.") + } + + if !data.isEmpty { + // We have non-empty data. Yield it to the continuation continuation.yield(AsyncBufferSequence.Buffer(data: data)) } else if done { + // Receiving an empty data and done == true means we've reached the end of the file. continuation.finish() + } else { + fatalError("Unexpectedly received no data from DispatchIO with it indicating it is not done.") } } } From 888666b4289b828e28b3dd370546bcbe27e1bc6a Mon Sep 17 00:00:00 2001 From: Ryan Dingman Date: Fri, 23 May 2025 11:20:06 -0700 Subject: [PATCH 18/18] Remove definition and use of StreamOptions on Windows --- Sources/Subprocess/API.swift | 16 ++++++++-------- Sources/Subprocess/AsyncBufferSequence.swift | 19 +++---------------- .../Platforms/Subprocess+Darwin.swift | 6 +++--- .../Platforms/Subprocess+Unix.swift | 10 +++++++++- .../Platforms/Subprocess+Windows.swift | 7 ------- 5 files changed, 23 insertions(+), 35 deletions(-) diff --git a/Sources/Subprocess/API.swift b/Sources/Subprocess/API.swift index 5b25bf5..e32636b 100644 --- a/Sources/Subprocess/API.swift +++ b/Sources/Subprocess/API.swift @@ -200,7 +200,7 @@ public func run( } // Body runs in the same isolation - let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeDiskIO(), streamOptions: platformOptions.streamOptions) + let outputSequence = AsyncBufferSequence(diskIO: outputIOBox.take()!.consumeDiskIO()) let result = try await body(execution, outputSequence) try await group.waitForAll() return result @@ -250,7 +250,7 @@ public func run( } // Body runs in the same isolation - let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeDiskIO(), streamOptions: platformOptions.streamOptions) + let errorSequence = AsyncBufferSequence(diskIO: errorIOBox.take()!.consumeDiskIO()) let result = try await body(execution, errorSequence) try await group.waitForAll() return result @@ -286,7 +286,7 @@ public func run( error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in let writer = StandardInputWriter(diskIO: inputIO!) - let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions) + let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO()) return try await body(execution, writer, outputSequence) } } @@ -319,7 +319,7 @@ public func run( error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in let writer = StandardInputWriter(diskIO: inputIO!) - let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions) + let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO()) return try await body(execution, writer, errorSequence) } } @@ -376,8 +376,8 @@ public func run( error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in let writer = StandardInputWriter(diskIO: inputIO!) - let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions) - let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO(), streamOptions: platformOptions.streamOptions) + let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO()) + let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO()) return try await body(execution, writer, outputSequence, errorSequence) } } @@ -503,8 +503,8 @@ public func run( error: try error.createPipe() ) { execution, inputIO, outputIO, errorIO in let writer = StandardInputWriter(diskIO: inputIO!) - let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO(), streamOptions: configuration.platformOptions.streamOptions) - let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO(), streamOptions: configuration.platformOptions.streamOptions) + let outputSequence = AsyncBufferSequence(diskIO: outputIO!.consumeDiskIO()) + let errorSequence = AsyncBufferSequence(diskIO: errorIO!.consumeDiskIO()) return try await body(execution, writer, outputSequence, errorSequence) } } diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index b666f95..2e3233d 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -45,7 +45,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { private let continuation: Stream.Continuation private var bytesRemaining: Int - internal init(diskIO: DiskIO, streamOptions: PlatformOptions.StreamOptions) { + internal init(diskIO: DiskIO) { self.diskIO = diskIO self.buffer = [] self.currentPosition = 0 @@ -54,20 +54,9 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { self.streamIterator = stream.makeAsyncIterator() self.continuation = continuation self.bytesRemaining = 0 - - #if !os(Windows) - if let minimumBufferSize = streamOptions.minimumBufferSize { - diskIO.setLimit(lowWater: minimumBufferSize) - } - - if let maximumBufferSize = streamOptions.maximumBufferSize { - diskIO.setLimit(highWater: maximumBufferSize) - } - #endif } public mutating func next() async throws -> Buffer? { - if bytesRemaining <= 0 { bytesRemaining = readBufferSize diskIO.stream(upToLength: readBufferSize, continuation: continuation) @@ -103,15 +92,13 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { } private let diskIO: DiskIO - private let streamOptions: PlatformOptions.StreamOptions - internal init(diskIO: DiskIO, streamOptions: PlatformOptions.StreamOptions) { + internal init(diskIO: DiskIO) { self.diskIO = diskIO - self.streamOptions = streamOptions } public func makeAsyncIterator() -> Iterator { - return Iterator(diskIO: self.diskIO, streamOptions: streamOptions) + return Iterator(diskIO: self.diskIO) } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index bc17661..e79c5fd 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -456,9 +456,9 @@ extension Configuration { ) return SpawnResult( execution: execution, - inputWriteEnd: inputWriteFileDescriptor?.createPlatformDiskIO(), - outputReadEnd: outputReadFileDescriptor?.createPlatformDiskIO(), - errorReadEnd: errorReadFileDescriptor?.createPlatformDiskIO() + inputWriteEnd: inputWriteFileDescriptor?.createPlatformDiskIO(with: platformOptions.streamOptions), + outputReadEnd: outputReadFileDescriptor?.createPlatformDiskIO(with: platformOptions.streamOptions), + errorReadEnd: errorReadFileDescriptor?.createPlatformDiskIO(with: platformOptions.streamOptions) ) } diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index 6b24389..ec460dd 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -393,7 +393,7 @@ internal typealias PlatformFileDescriptor = CInt internal typealias TrackedPlatformDiskIO = TrackedDispatchIO extension TrackedFileDescriptor { - internal consuming func createPlatformDiskIO() -> TrackedPlatformDiskIO { + internal consuming func createPlatformDiskIO(with streamOptions: PlatformOptions.StreamOptions) -> TrackedPlatformDiskIO { let dispatchIO: DispatchIO = DispatchIO( type: .stream, fileDescriptor: self.platformDescriptor(), @@ -406,6 +406,14 @@ extension TrackedFileDescriptor { } ) + if let minimumBufferSize = streamOptions.minimumBufferSize { + dispatchIO.setLimit(lowWater: minimumBufferSize) + } + + if let maximumBufferSize = streamOptions.maximumBufferSize { + dispatchIO.setLimit(highWater: maximumBufferSize) + } + return .init(dispatchIO, closeWhenDone: self.closeWhenDone) } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index e6377e3..2f6e7dd 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -417,7 +417,6 @@ public struct PlatformOptions: Sendable { /// process in case the parent task is cancelled before /// the child proces terminates. /// Always ends in forcefully terminate at the end. - internal var streamOptions: StreamOptions = .init() public var teardownSequence: [TeardownStep] = [] /// A closure to configure platform-specific /// spawning constructs. This closure enables direct @@ -442,12 +441,6 @@ public struct PlatformOptions: Sendable { public init() {} } -extension PlatformOptions { - internal struct StreamOptions: Sendable { - internal init() {} - } -} - extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible { internal func description(withIndent indent: Int) -> String { let indent = String(repeating: " ", count: indent * 4)