diff --git a/Sources/Subprocess/AsyncBufferSequence.swift b/Sources/Subprocess/AsyncBufferSequence.swift index 442e07e..2e3233d 100644 --- a/Sources/Subprocess/AsyncBufferSequence.swift +++ b/Sources/Subprocess/AsyncBufferSequence.swift @@ -35,33 +35,59 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable { @_nonSendable public struct Iterator: AsyncIteratorProtocol { public typealias Element = Buffer + internal typealias Stream = AsyncThrowingStream private let diskIO: DiskIO private var buffer: [UInt8] private var currentPosition: Int private var finished: Bool + private var streamIterator: Stream.AsyncIterator + private let continuation: Stream.Continuation + private var bytesRemaining: Int internal init(diskIO: DiskIO) { self.diskIO = diskIO self.buffer = [] self.currentPosition = 0 self.finished = false + let (stream, continuation) = AsyncThrowingStream.makeStream() + self.streamIterator = stream.makeAsyncIterator() + self.continuation = continuation + self.bytesRemaining = 0 } - public func next() async throws -> Buffer? { - let data = try await self.diskIO.readChunk( - upToLength: readBufferSize - ) - if data == nil { - // We finished reading. Close the file descriptor now + public mutating func next() async throws -> Buffer? { + if bytesRemaining <= 0 { + bytesRemaining = readBufferSize + diskIO.stream(upToLength: readBufferSize, continuation: continuation) + } + + let buffer: Buffer? + + do { + buffer = try await streamIterator.next() + } catch { + #if os(Windows) + try self.diskIO.close() + #else + self.diskIO.close() + #endif + + throw error + } + + if let buffer { + bytesRemaining -= buffer.count + return buffer + } else { #if os(Windows) try self.diskIO.close() #else self.diskIO.close() #endif + return nil } - return data } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift index 9a56106..e79c5fd 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Darwin.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Darwin.swift @@ -57,6 +57,9 @@ public struct PlatformOptions: Sendable { /// Creates a session and sets the process group ID /// i.e. Detach from the terminal. public var createSession: Bool = false + + public var streamOptions: StreamOptions = .init() + /// An ordered list of steps in order to tear down the child /// process in case the parent task is cancelled before /// the child proces terminates. @@ -126,6 +129,18 @@ extension PlatformOptions { #endif } +extension PlatformOptions { + public struct StreamOptions: Sendable { + let minimumBufferSize: Int? + let maximumBufferSize: Int? + + public init(minimumBufferSize: Int? = nil, maximumBufferSize: Int? = nil) { + self.minimumBufferSize = minimumBufferSize + self.maximumBufferSize = maximumBufferSize + } + } +} + extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible { internal func description(withIndent indent: Int) -> String { let indent = String(repeating: " ", count: indent * 4) @@ -441,9 +456,9 @@ extension Configuration { ) return SpawnResult( execution: execution, - inputWriteEnd: inputWriteFileDescriptor?.createPlatformDiskIO(), - outputReadEnd: outputReadFileDescriptor?.createPlatformDiskIO(), - errorReadEnd: errorReadFileDescriptor?.createPlatformDiskIO() + inputWriteEnd: inputWriteFileDescriptor?.createPlatformDiskIO(with: platformOptions.streamOptions), + outputReadEnd: outputReadFileDescriptor?.createPlatformDiskIO(with: platformOptions.streamOptions), + errorReadEnd: errorReadFileDescriptor?.createPlatformDiskIO(with: platformOptions.streamOptions) ) } diff --git a/Sources/Subprocess/Platforms/Subprocess+Linux.swift b/Sources/Subprocess/Platforms/Subprocess+Linux.swift index a488a52..3b1f39b 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Linux.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Linux.swift @@ -214,6 +214,9 @@ public struct PlatformOptions: Sendable { // Creates a session and sets the process group ID // i.e. Detach from the terminal. public var createSession: Bool = false + + public var streamOptions: StreamOptions = .init() + /// An ordered list of steps in order to tear down the child /// process in case the parent task is cancelled before /// the child proces terminates. @@ -235,6 +238,18 @@ public struct PlatformOptions: Sendable { public init() {} } +extension PlatformOptions { + public struct StreamOptions: Sendable { + let minimumBufferSize: Int? + let maximumBufferSize: Int? + + public init(minimumBufferSize: Int? = nil, maximumBufferSize: Int? = nil) { + self.minimumBufferSize = minimumBufferSize + self.maximumBufferSize = maximumBufferSize + } + } +} + extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible { internal func description(withIndent indent: Int) -> String { let indent = String(repeating: " ", count: indent * 4) diff --git a/Sources/Subprocess/Platforms/Subprocess+Unix.swift b/Sources/Subprocess/Platforms/Subprocess+Unix.swift index d61df5a..ec460dd 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Unix.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Unix.swift @@ -393,7 +393,7 @@ internal typealias PlatformFileDescriptor = CInt internal typealias TrackedPlatformDiskIO = TrackedDispatchIO extension TrackedFileDescriptor { - internal consuming func createPlatformDiskIO() -> TrackedPlatformDiskIO { + internal consuming func createPlatformDiskIO(with streamOptions: PlatformOptions.StreamOptions) -> TrackedPlatformDiskIO { let dispatchIO: DispatchIO = DispatchIO( type: .stream, fileDescriptor: self.platformDescriptor(), @@ -405,6 +405,15 @@ extension TrackedFileDescriptor { } } ) + + if let minimumBufferSize = streamOptions.minimumBufferSize { + dispatchIO.setLimit(lowWater: minimumBufferSize) + } + + if let maximumBufferSize = streamOptions.maximumBufferSize { + dispatchIO.setLimit(highWater: maximumBufferSize) + } + return .init(dispatchIO, closeWhenDone: self.closeWhenDone) } } @@ -414,37 +423,32 @@ extension DispatchIO { #if SubprocessSpan @available(SubprocessSpan, *) #endif - internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? { - return try await withCheckedThrowingContinuation { continuation in - var buffer: DispatchData = .empty - self.read( - offset: 0, - length: maxLength, - queue: .global() - ) { done, data, error in - if error != 0 { - continuation.resume( - throwing: SubprocessError( - code: .init(.failedToReadFromSubprocess), - underlyingError: .init(rawValue: error) - ) - ) - return - } - if let data = data { - if buffer.isEmpty { - buffer = data - } else { - buffer.append(data) - } - } - if done { - if !buffer.isEmpty { - continuation.resume(returning: AsyncBufferSequence.Buffer(data: buffer)) - } else { - continuation.resume(returning: nil) - } - } + internal func stream(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) { + self.read( + offset: 0, + length: maxLength, + queue: .global() + ) { done, data, error in + guard error == 0 else { + continuation.finish(throwing: SubprocessError( + code: .init(.failedToReadFromSubprocess), + underlyingError: .init(rawValue: error) + )) + return + } + + guard let data else { + fatalError("Unexpectedly received nil data from DispatchIO with error == 0.") + } + + if !data.isEmpty { + // We have non-empty data. Yield it to the continuation + continuation.yield(AsyncBufferSequence.Buffer(data: data)) + } else if done { + // Receiving an empty data and done == true means we've reached the end of the file. + continuation.finish() + } else { + fatalError("Unexpectedly received no data from DispatchIO with it indicating it is not done.") } } } diff --git a/Sources/Subprocess/Platforms/Subprocess+Windows.swift b/Sources/Subprocess/Platforms/Subprocess+Windows.swift index 2e80677..2f6e7dd 100644 --- a/Sources/Subprocess/Platforms/Subprocess+Windows.swift +++ b/Sources/Subprocess/Platforms/Subprocess+Windows.swift @@ -1071,18 +1071,57 @@ extension FileDescriptor { } extension FileDescriptor { - internal func readChunk(upToLength maxLength: Int) async throws -> AsyncBufferSequence.Buffer? { - return try await withCheckedThrowingContinuation { continuation in - self.readUntilEOF( - upToLength: maxLength - ) { result in - switch result { - case .failure(let error): - continuation.resume(throwing: error) - case .success(let bytes): - continuation.resume(returning: AsyncBufferSequence.Buffer(data: bytes)) + internal func stream(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) { + do { + var totalBytesRead: Int = 0 + + while totalBytesRead < maxLength { + let values = try [UInt8]( + unsafeUninitializedCapacity: maxLength - totalBytesRead + ) { buffer, initializedCount in + guard let baseAddress = buffer.baseAddress else { + initializedCount = 0 + return + } + + var bytesRead: DWORD = 0 + let readSucceed = ReadFile( + self.platformDescriptor, + UnsafeMutableRawPointer(mutating: baseAddress), + DWORD(maxLength - totalBytesRead), + &bytesRead, + nil + ) + + if !readSucceed { + // Windows throws ERROR_BROKEN_PIPE when the pipe is closed + let error = GetLastError() + if error == ERROR_BROKEN_PIPE { + // We are done reading + initializedCount = 0 + } else { + initializedCount = 0 + throw SubprocessError( + code: .init(.failedToReadFromSubprocess), + underlyingError: .init(rawValue: error) + ) + } + } else { + // We successfully read the current round + initializedCount += Int(bytesRead) + } + } + + if values.count > 0 { + totalBytesRead += values.count + continuation.yield(AsyncBufferSequence.Buffer(data: values)) + } else { + continuation.finish() + return } } + } catch { + continuation.finish(throwing: error) } } diff --git a/Tests/SubprocessTests/SubprocessTests+Unix.swift b/Tests/SubprocessTests/SubprocessTests+Unix.swift index 86fe242..6e2dd45 100644 --- a/Tests/SubprocessTests/SubprocessTests+Unix.swift +++ b/Tests/SubprocessTests/SubprocessTests+Unix.swift @@ -662,6 +662,50 @@ extension SubprocessUnixTests { #expect(catResult.terminationStatus.isSuccess) #expect(catResult.standardError == expected) } + + @Test func testSlowDripRedirectedOutputRedirectToSequence() async throws { + guard #available(SubprocessSpan , *) else { + return + } + let threshold: Double = 0.5 + + let script = """ + echo "DONE" + sleep \(threshold) + """ + + var platformOptions = PlatformOptions() + platformOptions.streamOptions = .init(minimumBufferSize: 0) + + let start = ContinuousClock().now + + let catResult = try await Subprocess.run( + .path("/bin/bash"), + arguments: ["-c", script], + platformOptions: platformOptions, + error: .discarded, + body: { (execution, standardOutput) in + for try await chunk in standardOutput { + let string = chunk._withUnsafeBytes { String(decoding: $0, as: UTF8.self) } + + if string.hasPrefix("DONE") { + let end = ContinuousClock().now + + if (end - start) > .seconds(threshold) { + return "Failure" + + } else { + return "Success" + } + } + } + + return "Failure" + } + ) + #expect(catResult.terminationStatus.isSuccess) + #expect(catResult.value == "Success") + } } // MARK: - PlatformOption Tests