Allow callers to run a subprocess and provide low and high water marks when using SequenceOutput to emit standard output and standard error as soon as it arrives. #40

Open
wants to merge 19 commits into
base: main
Changes from 2 commits
Commits
19 commits
7324ac4
Allow callers to run a subprocess and provide low and high water marks
rdingman May 8, 2025
7b6899c
Move stream creation and configure to platform specific files
rdingman May 10, 2025
9b173ab
Mark APIs that reference SequenceOutput.Buffer with @available(Subpro…
rdingman May 14, 2025
ab43ae4
Simplify AsyncBufferSequence to only create one AsyncThrowingStream t…
rdingman May 14, 2025
10aa523
Add fatalError for unexpected result from DispatchIO.read
rdingman May 14, 2025
aa903ad
Fix build on Windows
rdingman May 14, 2025
bb57bf2
Update PlatformOptions to have preferredStreamBufferSizeRange and use…
rdingman May 14, 2025
955e9c9
Fix build on Linux
rdingman May 14, 2025
5f2df5f
Fix Swift 6.2 build
rdingman May 14, 2025
4bac06a
Update how we disable tests that require SubprocessSpan
rdingman May 14, 2025
ae0de61
Return to configuring the minimum and maximum buffer size through Str…
rdingman May 21, 2025
d65f2ce
Change testSlowDripRedirectedOutputRedirectToSequence to guarding aga…
rdingman May 21, 2025
c8fe778
Remove unneeded import of Dispatch
rdingman May 21, 2025
4de3d79
Merge branch 'main' into rdingman/issue-39
rdingman May 21, 2025
30876f5
Fix build on Windows
rdingman May 21, 2025
ff7ae12
Simplify interaction between AsyncBufferSequence.Iterator and its Asy…
rdingman May 23, 2025
48b97f2
Removed unneeded import
rdingman May 23, 2025
d94a9c6
Improve error handling
rdingman May 23, 2025
888666b
Remove definition and use of StreamOptions on Windows
rdingman May 23, 2025
34 changes: 27 additions & 7 deletions Sources/Subprocess/AsyncBufferSequence.swift
@@ -15,6 +15,8 @@
@preconcurrency import SystemPackage
#endif

internal import Dispatch

#if SubprocessSpan
@available(SubprocessSpan, *)
#endif
@@ -30,24 +32,34 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
private var buffer: [UInt8]
private var currentPosition: Int
private var finished: Bool
private var streamIterator: AsyncThrowingStream<TrackedPlatformDiskIO.StreamStatus, Swift.Error>.AsyncIterator

internal init(diskIO: TrackedPlatformDiskIO) {
self.diskIO = diskIO
self.buffer = []
self.currentPosition = 0
self.finished = false
self.streamIterator = diskIO.readDataStream(upToLength: readBufferSize).makeAsyncIterator()
}

public func next() async throws -> SequenceOutput.Buffer? {
let data = try await self.diskIO.readChunk(
upToLength: readBufferSize
)
if data == nil {
// We finished reading. Close the file descriptor now
public mutating func next() async throws -> SequenceOutput.Buffer? {
if let status = try await streamIterator.next() {
switch status {
case .data(let data):
return data

case .endOfStream(let data):
streamIterator = diskIO.readDataStream(upToLength: readBufferSize).makeAsyncIterator()
return data
Contributor

This does not seem right. Why are we creating a new iterator when the first one ends? There will be nothing to read from this second iterator because all the data in the pipe would already have been read.

Author

Thanks for the pushback on this. I was going to explain my thinking and then realized that if I have to explain it, it should probably be written in a more straightforward manner. The first implementation used one iterator per chunk, and when we reached a chunk boundary we'd switch to a new iterator. While this did work, it was admittedly a little clunky. Now we use one AsyncThrowingStream (and iterator) across all chunks, even if a chunk is broken up into sub-chunks for a single read, as in my original motivation for this issue. Please check the new implementation to see if it makes sense to you.
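
Editorial sketch of the "one stream, one iterator" shape described in the comment above. It is not the PR's code: `ChunkStatus`, `makeChunkStream`, and `ChunkIterator` are hypothetical names, and an in-memory array of byte arrays stands in for the pipe so the example runs without DispatchIO.

```swift
// Hypothetical stand-in for the PR's StreamStatus; the names are illustrative.
enum ChunkStatus: Sendable {
    case data([UInt8])
    case endOfFile
}

// Builds one stream that covers every simulated read, so the consumer never
// has to swap iterators at a chunk boundary. Empty reads are skipped.
func makeChunkStream(from reads: [[UInt8]]) -> AsyncThrowingStream<ChunkStatus, Error> {
    AsyncThrowingStream { continuation in
        for read in reads where !read.isEmpty {
            continuation.yield(.data(read))
        }
        continuation.yield(.endOfFile)
        continuation.finish()
    }
}

// Holds a single stream iterator for its whole lifetime.
struct ChunkIterator: AsyncIteratorProtocol {
    private var inner: AsyncThrowingStream<ChunkStatus, Error>.Iterator

    init(reads: [[UInt8]]) {
        self.inner = makeChunkStream(from: reads).makeAsyncIterator()
    }

    mutating func next() async throws -> [UInt8]? {
        // A finished stream and an explicit end-of-file both end iteration.
        guard let status = try await inner.next() else { return nil }
        switch status {
        case .data(let bytes):
            return bytes
        case .endOfFile:
            return nil
        }
    }
}
```

Because the iterator is created once in `init`, `next()` only has to translate statuses into buffers; there is no branch that replaces the iterator.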


case .endOfFile:
try self.diskIO.safelyClose()
return nil
}
} else {
try self.diskIO.safelyClose()
return nil
}
return data
}
}

@@ -62,6 +74,14 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
}
}

extension TrackedPlatformDiskIO {
internal enum StreamStatus {
case data(SequenceOutput.Buffer)
case endOfStream(SequenceOutput.Buffer)
case endOfFile
}
}

// MARK: - Page Size
import _SubprocessCShims

8 changes: 7 additions & 1 deletion Sources/Subprocess/Execution.swift
@@ -27,6 +27,8 @@ import Musl
import WinSDK
#endif

internal import Dispatch

/// An object that represents a subprocess that has been
/// executed. You can use this object to send signals to the
/// child process as well as stream its output and error.
@@ -107,6 +109,8 @@ extension Execution where Output == SequenceOutput {
else {
fatalError("The standard output has already been consumed")
}

// TODO: Make buffer size an option
return AsyncBufferSequence(diskIO: readFd)
}
}
@@ -122,14 +126,16 @@ extension Execution where Error == SequenceOutput {
/// via pipe under the hood and each pipe can only be consumed once.
public var standardError: AsyncBufferSequence {
let consumptionState = self.outputConsumptionState.bitwiseXor(
OutputConsumptionState.standardOutputConsumed
OutputConsumptionState.standardErrorConsumed
)

guard consumptionState.contains(.standardErrorConsumed),
let readFd = self.errorPipe.readEnd
else {
fatalError("The standard error has already been consumed")
}

// TODO: Make buffer size an option
return AsyncBufferSequence(diskIO: readFd)
}
}
18 changes: 16 additions & 2 deletions Sources/Subprocess/Platforms/Subprocess+Darwin.swift
@@ -57,6 +57,8 @@ public struct PlatformOptions: Sendable {
/// Creates a session and sets the process group ID
/// i.e. Detach from the terminal.
public var createSession: Bool = false
public var outputOptions: StreamOptions = .init()
public var errorOptions: StreamOptions = .init()
/// An ordered list of steps in order to tear down the child
/// process in case the parent task is cancelled before
/// the child process terminates.
@@ -126,6 +128,18 @@ extension PlatformOptions {
#endif
}

extension PlatformOptions {
public struct StreamOptions: Sendable {
let lowWater: Int?
let highWater: Int?

init(lowWater: Int? = nil, highWater: Int? = nil) {
self.lowWater = lowWater
self.highWater = highWater
}
}
}

extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
internal func description(withIndent indent: Int) -> String {
let indent = String(repeating: " ", count: indent * 4)
@@ -355,8 +369,8 @@ extension Configuration {
output: output,
error: error,
inputPipe: inputPipe.createInputPipe(),
outputPipe: outputPipe.createOutputPipe(),
errorPipe: errorPipe.createOutputPipe()
outputPipe: outputPipe.createOutputPipe(with: platformOptions.outputOptions),
errorPipe: errorPipe.createOutputPipe(with: platformOptions.errorOptions)
)
}

18 changes: 16 additions & 2 deletions Sources/Subprocess/Platforms/Subprocess+Linux.swift
@@ -127,8 +127,8 @@ extension Configuration {
output: output,
error: error,
inputPipe: inputPipe.createInputPipe(),
outputPipe: outputPipe.createOutputPipe(),
errorPipe: errorPipe.createOutputPipe()
outputPipe: outputPipe.createOutputPipe(with: platformOptions.outputOptions),
errorPipe: errorPipe.createOutputPipe(with: platformOptions.errorOptions)
)
}

@@ -176,6 +176,8 @@ public struct PlatformOptions: Sendable {
// Creates a session and sets the process group ID
// i.e. Detach from the terminal.
public var createSession: Bool = false
public var outputOptions: StreamOptions = .init()
public var errorOptions: StreamOptions = .init()
Contributor

IMO we only need one (see my comments below).

Author

I replaced these with preferredStreamBufferSizeRange above.

/// An ordered list of steps in order to tear down the child
/// process in case the parent task is cancelled before
/// the child process terminates.
@@ -197,6 +199,18 @@ public struct PlatformOptions: Sendable {
public init() {}
}

extension PlatformOptions {
public struct StreamOptions: Sendable {
let lowWater: Int?
let highWater: Int?

init(lowWater: Int? = nil, highWater: Int? = nil) {
self.lowWater = lowWater
self.highWater = highWater
}
Contributor

Sorry I know I initially suggested using a StreamOptions nested struct, but after revisiting this API, I think we should reconsider the lowWater and highWater properties. Here’s why: 1) Their names can be quite confusing outside of the DispatchIO context, and 2) we’d need to add runtime validation to ensure lowWater < highWater.

How about we try something like this instead?

struct PlatformOptions {
    
    let preferredStreamBufferSizeRange: Range<Int>? = nil
}

This approach makes it clear that we’re requesting a range (with a lower and upper bound), and it eliminates the need for validation.

Author

@iCharlesHu The issue I see with this suggestion is that it presumes that you will set both the lower and upper bound, or neither. There is no mechanism for setting just one or the other. As I see it, we have a few options:

  1. Adopt your suggestion and take the stance that you cannot set these independently.
  2. Recognize that these are platform specific options and rename them to indicate that. On Linux, these options would be removed once the Linux implementation moves away from DispatchIO because they are DispatchIO specific. We'd add the appropriate runtime check here. (FWIW, DispatchIO handles when lowWater > highWater and makes them the same).
  3. Attempt to use some sort of sentinel values to represent the "don't set this" or "use the default" case. For lowWater mark, this could be something like -1 (the default is "unspecified"). For highWater this is tougher because the documentation says the default value is SIZE_MAX which isn't representable by Int. IMO, this option doesn't seem very intuitive, but I thought I'd include it anyways.
  4. Change to use an enum which represents the four cases of set neither, set lower, set upper, set both. Something like:
enum BufferSizeOptions {
    case none
    case lowerBound(Int)
    case upperBound(Int)
    case range(Range<Int>)
}

Thoughts?

I think option 3 is too awkward and unintuitive, so I don't think we should consider it. If you feel strongly about option 1, we can go with that, but I wanted to bring up this issue.

Author

@rdingman rdingman May 14, 2025

@iCharlesHu I modified your proposal a bit to use a RangeExpression rather than just a concrete Range. That way we can express things like 0... to set only the lower bound or ...4096 to set only the upper bound. I didn't want to make PlatformOptions fully generic because that would be more cumbersome. Instead, I added some API on PlatformOptions to enforce the requirement that RangeExpression.Bound must be Int.

Check it out and let me know what you think.
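
Editorial sketch of how a RangeExpression with an Int bound could be reduced to an optional lower and upper limit, as the comment above describes. This is an assumption about the shape of such an API, not the code from the PR: `BufferSizeBounds` and `bufferSizeBounds(_:)` are hypothetical names, both bounds are treated as inclusive, and only three standard range types are recognized.

```swift
// Hypothetical helper type; not part of the PR.
struct BufferSizeBounds: Equatable {
    var lower: Int?
    var upper: Int?
}

// Accepts any RangeExpression over Int without making the caller's
// options type generic. Returns nil for range types this sketch
// does not handle (for example half-open ranges such as 0..<10).
func bufferSizeBounds<R: RangeExpression>(_ range: R) -> BufferSizeBounds? where R.Bound == Int {
    switch range {
    case let r as ClosedRange<Int>:
        // 1024...4096 sets both limits.
        return BufferSizeBounds(lower: r.lowerBound, upper: r.upperBound)
    case let r as PartialRangeFrom<Int>:
        // 0... sets only the lower limit.
        return BufferSizeBounds(lower: r.lowerBound, upper: nil)
    case let r as PartialRangeThrough<Int>:
        // ...4096 sets only the upper limit.
        return BufferSizeBounds(lower: nil, upper: r.upperBound)
    default:
        return nil
    }
}
```

A call such as `bufferSizeBounds(...4096)` leaves `lower` as nil, which is the "set just one bound" case that a plain `Range<Int>` cannot express.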

Contributor

"The issue I see with this suggestion is that it presumes that you will set both the lower and upper bound, or neither. There is no mechanism for setting just one or the other."

Ahh I see your point. In this case let's go back to

struct StreamOptions {
    public let minimalBufferSize: Int? = nil
    public let maximumBufferSize: Int? = nil
}

I think that's the simplest.
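
Editorial sketch of the runtime validation concern raised earlier in this thread, applied to two optional sizes. It is not the PR's StreamOptions: `ValidatedStreamOptions` is a hypothetical name, and the policy of raising the maximum to match an inverted minimum is an assumption chosen for illustration.

```swift
// Hypothetical variant of the StreamOptions struct discussed above.
struct ValidatedStreamOptions: Equatable {
    let minimumBufferSize: Int?
    let maximumBufferSize: Int?

    init(minimumBufferSize: Int? = nil, maximumBufferSize: Int? = nil) {
        // Negative sizes make no sense for a buffer, so clamp them to zero.
        let minimum = minimumBufferSize.map { max(0, $0) }
        var maximum = maximumBufferSize.map { max(0, $0) }

        // Assumed policy: if both are set and inverted, raise the maximum
        // so that the two values end up equal.
        if let low = minimum, let high = maximum, low > high {
            maximum = low
        }

        self.minimumBufferSize = minimum
        self.maximumBufferSize = maximum
    }
}
```

Either value can still be left as nil, so setting only one bound remains possible while inverted pairs are normalized at construction time.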

Author

@iCharlesHu OK, great. I've gone back to configuring this through StreamOptions.

}
}

extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
internal func description(withIndent indent: Int) -> String {
let indent = String(repeating: " ", count: indent * 4)
54 changes: 53 additions & 1 deletion Sources/Subprocess/Platforms/Subprocess+Unix.swift
@@ -412,6 +412,7 @@ extension CreatedPipe {
}
}
)

writeEnd = .init(
dispatchIO,
closeWhenDone: writeFileDescriptor.closeWhenDone
@@ -423,7 +424,7 @@ extension CreatedPipe {
)
}

internal func createOutputPipe() -> OutputPipe {
internal func createOutputPipe(with options: PlatformOptions.StreamOptions) -> OutputPipe {
var readEnd: TrackedPlatformDiskIO? = nil
if let readFileDescriptor = self.readFileDescriptor {
let dispatchIO: DispatchIO = DispatchIO(
@@ -437,6 +438,15 @@ extension CreatedPipe {
}
}
)

if let lowWater = options.lowWater {
dispatchIO.setLimit(lowWater: lowWater)
}

if let highWater = options.highWater {
dispatchIO.setLimit(highWater: highWater)
}

readEnd = .init(
dispatchIO,
closeWhenDone: readFileDescriptor.closeWhenDone
@@ -451,6 +461,48 @@ extension CreatedPipe {

// MARK: - TrackedDispatchIO extensions
extension TrackedDispatchIO {
internal func readDataStream(upToLength maxLength: Int) -> AsyncThrowingStream<StreamStatus, Swift.Error> {
return AsyncThrowingStream<StreamStatus, Swift.Error> { continuation in
self.dispatchIO.read(
offset: 0,
length: maxLength,
queue: .global()
) { done, data, error in
if error != 0 {
continuation.finish(throwing: SubprocessError(
code: .init(.failedToReadFromSubprocess),
underlyingError: .init(rawValue: error)
))
return
}

// Treat empty data and nil as the same
let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil
let status: StreamStatus

switch (buffer, done) {
case (.some(let data), false):
status = .data(SequenceOutput.Buffer(data: data))

case (.some(let data), true):
status = .endOfStream(SequenceOutput.Buffer(data: data))

case (nil, false):
return

case (nil, true):
status = .endOfFile
}

continuation.yield(status)

if done {
continuation.finish()
}
}
}
}

#if SubprocessSpan
@available(SubprocessSpan, *)
#endif
64 changes: 64 additions & 0 deletions Sources/Subprocess/Platforms/Subprocess+Windows.swift
@@ -1043,6 +1043,70 @@ extension CreatedPipe {
}

extension TrackedFileDescriptor {
internal func readDataStream(upToLength maxLength: Int) -> AsyncThrowingStream<StreamStatus, Swift.Error> {
return AsyncThrowingStream<StreamStatus, Swift.Error> { continuation in
do {
var totalBytesRead: Int = 0

while totalBytesRead < maxLength {
let values = try [UInt8](
unsafeUninitializedCapacity: maxLength
) { buffer, initializedCount in
guard let baseAddress = buffer.baseAddress else {
initializedCount = 0
return
}

var bytesRead: DWORD = 0
let readSucceed = ReadFile(
self.fileDescriptor.platformDescriptor,
UnsafeMutableRawPointer(mutating: baseAddress),
DWORD(maxLength - totalBytesRead),
&bytesRead,
nil
)

if !readSucceed {
// Windows throws ERROR_BROKEN_PIPE when the pipe is closed
let error = GetLastError()
if error == ERROR_BROKEN_PIPE {
// We are done reading
initializedCount = 0
} else {
initializedCount = 0
throw SubprocessError(
code: .init(.failedToReadFromSubprocess),
underlyingError: .init(rawValue: error)
)
}
} else {
// We successfully read the current round
initializedCount += Int(bytesRead)
}
}

if values.count > 0 {
totalBytesRead += values.count

if totalBytesRead >= maxLength {
continuation.yield(.endOfStream(SequenceOutput.Buffer(data: values)))
continuation.finish()
return
} else {
continuation.yield(.data(SequenceOutput.Buffer(data: values)))
}
} else {
continuation.yield(.endOfFile)
continuation.finish()
return
}
}
} catch {
continuation.finish(throwing: error)
}
}
}

internal func readChunk(upToLength maxLength: Int) async throws -> SequenceOutput.Buffer? {
return try await withCheckedThrowingContinuation { continuation in
self.readUntilEOF(
42 changes: 42 additions & 0 deletions Tests/SubprocessTests/SubprocessTests+Unix.swift
@@ -665,6 +665,48 @@ extension SubprocessUnixTests {
#expect(catResult.terminationStatus.isSuccess)
#expect(catResult.standardError == expected)
}

@Test func testSlowDripRedirectedOutputRedirectToSequence() async throws {
let threshold: Double = 0.5
Contributor

Unfortunately in tests you'll have to write

guard #available(SubprocessSpan , *) else {
    return
}

at the beginning of the test to work around the same availability issue. See other tests for examples.

Author

@rdingman rdingman May 14, 2025

@iCharlesHu When I first started working on this, I was very confused as to why some of the tests weren't running my new code and it was because of this check. Wouldn't it be better to have them skipped and noted as such in the test output rather than falsely succeeding? I'm thinking something like this:

    @Test(
        .enabled(
            if: {
                if #available(SubprocessSpan , *) {
                    true
                } else {
                    false
                }
            }(),
            "This test requires SubprocessSpan"
        )
    )
    func testSlowDripRedirectedOutputRedirectToSequence() async throws {
    }

Of course, we can have a helper function to make this less verbose.

Thoughts?

Author

@iCharlesHu I went ahead and conditionalized this one test this way as an example. Let me know if you don't like that and would like me to revert to a guard.

Contributor

@rdingman Unfortunately this won't work, because on Swift 6.2 and above the compiler will complain that the code inside testSlowDripRedirectedOutputRedirectToSequence needs SubprocessSpan and ask you to put @available around the function. That doesn't work either because the macro won't pick it up.

Unfortunately so far this is the only way I found that works. Very ugly... but it'll have to do for now.

Author

@rdingman rdingman May 21, 2025

@iCharlesHu Hmm. This builds just fine for me with the latest Swift 6.2 toolchain, has the intended results, and works with the currently shipping Swift 6.1 toolchain. However, I'll go ahead and revert this and match the existing tests.


let script = """
echo "DONE"
sleep \(threshold)
"""

var platformOptions = PlatformOptions()
platformOptions.outputOptions = .init(lowWater: 0)

let start = ContinuousClock().now

let catResult = try await Subprocess.run(
.path("/bin/bash"),
arguments: ["-c", script],
platformOptions: platformOptions,
output: .sequence,
error: .discarded,
body: { (execution, _) in
for try await chunk in execution.standardOutput {
let string = chunk.withUnsafeBytes { String(decoding: $0, as: UTF8.self) }

if string.hasPrefix("DONE") {
let end = ContinuousClock().now

if (end - start) > .seconds(threshold) {
return "Failure"

} else {
return "Success"
}
}
}

return "Failure"
}
)
#expect(catResult.terminationStatus.isSuccess)
#expect(catResult.value == "Success")
}
}

// MARK: - PlatformOption Tests