Allow callers to run a subprocess and provide low and high water marks when using SequenceOutput to emit standard output and standard error as soon as it arrives. #40
Conversation
Allow callers to provide low and high water marks when using SequenceOutput to emit standard output and standard error as soon as it arrives. Resolves swiftlang#39
Force-pushed from ae5935d to 7324ac4
Thanks so much for the issue and PR! I agree this is a great addition to the API surface.
As an overall comment, could you add a test to make sure the new behavior works as intended?
```swift
self.buffer = []
self.currentPosition = 0
self.finished = false
self.streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator()
```
`AsyncBufferSequence` is a shared type across all platforms, therefore we can't unconditionally refer to the platform-specific type `dispatchIO` here. We may see Windows build failures as a result.
Thanks for the feedback. I'll look into implementing this in such a way that platforms without dispatchIO don't break.
Fixed.
Sources/Subprocess/IO/Output.swift
Outdated
```swift
internal let lowWater: Int?
internal let highWater: Int?
internal let bufferSize: Int

internal init(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int = readBufferSize) {
    self.lowWater = lowWater
    self.highWater = highWater
    self.bufferSize = bufferSize
}
```
I don’t think it’s appropriate to include these parameters here for a couple of reasons:
- (This isn’t directly related to your change) Right now, we’re in the middle of some major architectural updates: Adopt ~Copyable in Subprocess (#38). That PR makes `SequenceOutput` internal, so you can’t use `.sequence` or `.sequence(lowWater: …)` anymore.
- More importantly, this looks like a platform-specific feature. Setting this parameter won’t have any impact on Windows, and (also unrelated to your change) we’re planning to move away from `DispatchIO` on Linux soon, so it won’t work there either.

Considering all this, I suggest we move these parameters to Darwin’s specific `PlatformOptions`, maybe under a nested struct `PlatformOptions.StreamOptions`.
Noted. I'll look into moving these parameters.
I did add a test named "testSlowDripRedirectedOutputRedirectToSequence". Does that not cover the new behavior as you intended?
Force-pushed from 73a31d4 to a1abbf5
@iCharlesHu Any suggestions on how to build and test this on Windows? I have things building on Windows, but I'm having trouble debugging the tests.
Force-pushed from a1abbf5 to 14584a9
Force-pushed from d9f85be to 7b6899c
@iCharlesHu I figured out how to get the debugger working in VSCode on Windows. However, it appears that several of the tests crash with an exception because a file descriptor is being closed more than once. I tried out your PR #38 to see if it fixed those issues, but it has not. For what it's worth, I'm running on:
Ahh yes! Sorry, I totally missed that.
Thanks so much for looking into the Windows build. Unfortunately we do have some known test failures on Windows currently (#22) and I'll address them separately. Right now we want to make sure all new changes at least build on Windows.
@iCharlesHu Great, that's good to know. Thanks!
```swift
streamIterator = diskIO.readDataStream(upToLength: readBufferSize).makeAsyncIterator()
return data
```
This does not seem right. Why are we creating a new iterator when the first one ends? There will be nothing to read from this second iterator because all the data in the pipe would already have been read.
Thanks for the pushback on this. I was going to explain my thinking, and then realized that if I have to explain it, it should probably be written in a more straightforward manner. The first implementation used one iterator per chunk: when we reached a chunk boundary, we'd switch to a new iterator. While this did work, it was admittedly a little clunky. Now we use one AsyncThrowingStream (and iterator) across all chunks, even if a chunk is broken up into sub-chunks for a single read, as in my original motivation for this issue. Please check the new implementation to see if it makes sense to you.
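As a rough illustration of the single-stream approach described above — a sketch with a simplified element type and hypothetical setup, not the PR's actual code — one `AsyncThrowingStream` carries every sub-chunk, and a single `finish()` ends iteration:

```swift
import Dispatch
import Foundation

// Sketch: a single AsyncThrowingStream shared across all chunks. The producer
// yields sub-chunks as they arrive; finish() is called exactly once at EOF.
final class Collected: @unchecked Sendable { var chunks: [[UInt8]] = [] }

let (stream, continuation) = AsyncThrowingStream<[UInt8], Error>.makeStream()

// Producer side: in the real implementation the DispatchIO read handler
// would yield here, possibly splitting one chunk into several sub-chunks.
continuation.yield([0x68, 0x69]) // first sub-chunk
continuation.yield([0x0a])       // second sub-chunk
continuation.finish()            // EOF ends the one shared stream

// Consumer side: a single iterator observes every sub-chunk in order.
let collected = Collected()
let done = DispatchSemaphore(value: 0)
Task {
    for try await chunk in stream {
        collected.chunks.append(chunk)
    }
    done.signal()
}
done.wait()
```

Because the stream's default buffering policy is unbounded, sub-chunks yielded before the consumer starts iterating are retained and delivered in order.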
```swift
// @@ -665,6 +665,48 @@ extension SubprocessUnixTests
    #expect(catResult.terminationStatus.isSuccess)
    #expect(catResult.standardError == expected)
}

@Test func testSlowDripRedirectedOutputRedirectToSequence() async throws {
    let threshold: Double = 0.5
```
Unfortunately in tests you'll have to write

```swift
guard #available(SubprocessSpan, *) else {
    return
}
```

in the beginning to work around the same availability issue. See other tests for examples.
@iCharlesHu When I first started working on this, I was very confused as to why some of the tests weren't running my new code and it was because of this check. Wouldn't it be better to have them skipped and noted as such in the test output rather than falsely succeeding? I'm thinking something like this:
```swift
@Test(
    .enabled(
        if: {
            if #available(SubprocessSpan, *) {
                return true
            } else {
                return false
            }
        }(),
        "This test requires SubprocessSpan"
    )
)
func testSlowDripRedirectedOutputRedirectToSequence() async throws {
    // ...
}
```
Of course, we can have a helper function to make this less verbose.
Thoughts?
@iCharlesHu I went ahead and conditionalized this one test this way as an example. Let me know if you don't like that and would like me to revert to a `guard`.
@rdingman unfortunately this won't work because on Swift 6.2 and above the compiler will complain that the code inside of `testSlowDripRedirectedOutputRedirectToSequence` needs `SubprocessSpan` and ask you to put `@available` around the function. That doesn't work either because the macro won't pick it up.
Unfortunately, so far this is the only way I found that works. Very ugly... but it'll have to do for now.
@iCharlesHu Hmm. This builds just fine for me with the latest Swift 6.2 toolchain, has the intended results, and works with the currently shipping Swift 6.1 toolchain. However, I'll go ahead and revert this and match the existing tests.
```swift
public struct StreamOptions: Sendable {
    let lowWater: Int?
    let highWater: Int?

    init(lowWater: Int? = nil, highWater: Int? = nil) {
        self.lowWater = lowWater
        self.highWater = highWater
    }
}
```
Sorry, I know I initially suggested using a `StreamOptions` nested struct, but after revisiting this API, I think we should reconsider the `lowWater` and `highWater` properties. Here’s why: 1) their names can be quite confusing outside of the DispatchIO context, and 2) we’d need to add runtime validation to ensure `lowWater < highWater`.
How about we try something like this instead?

```swift
struct PlatformOptions {
    // …
    let preferredStreamBufferSizeRange: Range<Int>? = nil
}
```

This approach makes it clear that we’re requesting a range (with a lower and upper bound), and it eliminates the need for validation.
@iCharlesHu The issue I see with this suggestion is that it presumes that you will set both the lower and upper bound, or neither. There is no mechanism for setting just one or the other. As I see it, we have a few options:

1. Adopt your suggestion and take the stance that you cannot set these independently.
2. Recognize that these are platform-specific options and rename them to indicate that. On Linux, these options would be removed once the Linux implementation moves away from DispatchIO, because they are DispatchIO-specific. We'd add the appropriate runtime check here. (FWIW, DispatchIO handles lowWater > highWater by making them the same.)
3. Attempt to use some sort of sentinel values to represent the "don't set this" or "use the default" case. For the `lowWater` mark, this could be something like -1 (the default is "unspecified"). For `highWater` this is tougher because the documentation says the default value is `SIZE_MAX`, which isn't representable by `Int`. IMO, this option doesn't seem very intuitive, but I thought I'd include it anyway.
4. Change to use an enum which represents the four cases of set neither, set lower, set upper, set both. Something like:

```swift
enum BufferSizeOptions {
    case none
    case lowerBound(Int)
    case upperBound(Int)
    case range(Range<Int>)
}
```

Thoughts?
I think option 3 is too awkward and unintuitive, so I don't think we should consider it. If you feel strongly about option 1, we can go with that, but I wanted to bring up this issue.
@iCharlesHu I modified your proposal a bit to use a `RangeExpression` rather than just a concrete `Range`; that way we can express things like `0...` to only set the lower bound or `...4096` to only set the upper bound. I didn't want to make `PlatformOptions` fully generic because that would be more cumbersome. Instead, I added some API on `PlatformOptions` to enforce the requirement that `RangeExpression.Bound` must be an `Int`.
Check it out and let me know what you think.
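For context, one possible shape of a range-based API like the one described above — a hypothetical sketch using concrete overloads rather than a generic `RangeExpression` constraint, with invented names (`StreamOptions`, `setStreamBufferSizeRange`), not the PR's actual code:

```swift
// Hypothetical sketch: concrete overloads accept `0...`, `...4096`, and
// `256..<4096` style arguments without making PlatformOptions generic.
struct StreamOptions {
    var lowWater: Int? = nil
    var highWater: Int? = nil
}

struct PlatformOptions {
    private(set) var streamOptions = StreamOptions()

    // `0...` sets only the lower bound
    mutating func setStreamBufferSizeRange(_ range: PartialRangeFrom<Int>) {
        streamOptions = StreamOptions(lowWater: range.lowerBound, highWater: nil)
    }

    // `...4096` sets only the upper bound
    mutating func setStreamBufferSizeRange(_ range: PartialRangeThrough<Int>) {
        streamOptions = StreamOptions(lowWater: nil, highWater: range.upperBound)
    }

    // `256..<4096` sets both; Range construction already guarantees lower < upper
    mutating func setStreamBufferSizeRange(_ range: Range<Int>) {
        streamOptions = StreamOptions(lowWater: range.lowerBound, highWater: range.upperBound)
    }
}
```

This keeps the "set just one bound" cases expressible while still letting a full `Range` carry both bounds with its ordering invariant built in.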
> The issue I see with this suggestion is that it presumes that you will set both the lower and upper bound, or neither. There is no mechanism for setting just one or the other.

Ahh, I see your point. In this case let's go back to

```swift
struct StreamOptions {
    public let minimalBufferSize: Int? = nil
    public let maximumBufferSize: Int? = nil
}
```

I think that's the simplest.
@iCharlesHu Ok, great. I've gone back to configuring this through `StreamOptions`.
```swift
public var outputOptions: StreamOptions = .init()
public var errorOptions: StreamOptions = .init()
```
IMO we only need one (see my comments below).
I replaced these with `preferredStreamBufferSizeRange` above.
Commits:
- …cessSpan, *) so they compile with Swift 6.2
- …o read all then data rather than one per chunk
- … RangeExpression to express the various ways to configure the low and high watermark
- …inst the availability of SubprocessSpan like the other tests.
@iCharlesHu I merged in the latest changes from main. What are your thoughts? Do you prefer one or the other? Do you have a third option you'd like to pursue?
Overall looks pretty good! Thanks so much for working on this! This should be my last bit of comments before we are good to merge!
Just to write down some of the code I was suggesting:

1. Rename `.readChunk` to `.stream` (like you suggested earlier), and have it return an `AsyncThrowingStream<Buffer>` directly (no need for `StreamStatus`).
2. Have `AsyncBufferSequence.Iterator` call into `.stream` directly.
```swift
extension DispatchIO {
    #if SubprocessSpan
    @available(SubprocessSpan, *)
    #endif
    internal func stream(
        upToLength maxLength: Int
    ) -> AsyncThrowingStream<AsyncBufferSequence.Buffer, any Error> {
        return AsyncThrowingStream { continuation in
            self.read(
                offset: 0,
                length: maxLength,
                queue: .global()
            ) { done, data, error in
                if error != 0 {
                    continuation.finish(
                        throwing: SubprocessError(
                            code: .init(.failedToReadFromSubprocess),
                            underlyingError: .init(rawValue: error)
                        )
                    )
                    return
                }
                switch (done, data) {
                case (true, .none):
                    continuation.finish()
                case (true, .some(let dispatchData)):
                    continuation.yield(.init(data: dispatchData))
                    continuation.finish() // Notice we are finishing the stream right away instead of sending a `.endOfFile`
                case (false, .some(let dispatchData)):
                    continuation.yield(.init(data: dispatchData))
                case (false, .none):
                    fatalError()
                }
            }
        }
    }
}

// Inside `AsyncBufferSequence`
public struct Iterator: AsyncIteratorProtocol {
    public typealias Element = Buffer

    private let diskIO: DiskIO
    private var base: AsyncThrowingStream<Buffer, any Error>.AsyncIterator

    internal init(diskIO: DiskIO) {
        self.diskIO = diskIO
        self.base = diskIO.stream(upToLength: .max).makeAsyncIterator()
    }

    public mutating func next() async throws -> Buffer? {
        let data = try await self.base.next()
        if data == nil {
            // We finished reading. Close the file descriptor now
            #if os(Windows)
            try self.diskIO.close()
            #else
            self.diskIO.close()
            #endif
            return nil
        }
        return data
    }
}
```
```swift
switch (buffer, done) {
case (.some(let data), false):
    status = .data(AsyncBufferSequence.Buffer(data: data))

case (.some(let data), true):
    status = .endOfChunk(AsyncBufferSequence.Buffer(data: data))

case (nil, false):
    fatalError("Unexpectedly received no data from DispatchIO with it indicating it is not done.")

case (nil, true):
    status = .endOfFile
}

continuation.yield(status)

if done {
    continuation.finish()
}
```
Instead of using a `StreamStatus` to model the state, I think just having `Iterator.Stream` be `AsyncThrowingStream<Buffer>` is good enough, because when the throwing stream returns `nil`, it naturally implies "end of stream":
```swift
switch (buffer, done) {
case (.some(let data), false):
    continuation.yield(AsyncBufferSequence.Buffer(data: data))
case (.some(let data), true):
    continuation.yield(AsyncBufferSequence.Buffer(data: data))
    continuation.finish() // Finish the stream now
case (nil, false):
    fatalError("Unexpectedly received no data from DispatchIO with it indicating it is not done.")
case (nil, true):
    continuation.finish() // Finish the stream
}
```
@iCharlesHu Simplified this.
```swift
if needsNextChunk {
    diskIO.readChunk(upToLength: readBufferSize, continuation: continuation)
    needsNextChunk = false
}

if let status = try await streamIterator.next() {
    switch status {
    case .data(let data):
        return data

    case .endOfChunk(let data):
        needsNextChunk = true
        return data

    case .endOfFile:
        #if os(Windows)
        try self.diskIO.close()
        #else
        self.diskIO.close()
        #endif
        return nil
    }
}
```
If `streamIterator` returned `AsyncThrowingStream<Buffer>` here you could just return it directly.
@iCharlesHu Yes, simplified this.
```swift
self.diskIO = diskIO
self.buffer = []
self.currentPosition = 0
self.finished = false
let (stream, continuation) = AsyncThrowingStream<Buffer, Swift.Error>.makeStream()
```
I'm a bit concerned with the usage of the `AsyncStream` here for two reasons. The first is that `AsyncStream` has no way to propagate the internal async sequence's back pressure to the external system that is producing the elements. We had the same issue in swift-nio and created the NIOAsyncSequenceProducer, which allowed us to bridge NIO's back pressure into the back pressure of an async sequence. We learned a lot from the APIs and implementation in NIO and have an open PR in swift-async-algorithms that generalizes this concept as a MultiProducerSingleConsumerAsyncChannel.

Now, having said that, it might be fine here back-pressure-wise since the `DispatchIO` is going to call the ioHandler multiple times but only up to `readBufferSize`. So we do have some maximum limit and our buffer can't balloon indefinitely.

However, this brings me to my second point, which is performance. `AsyncStream` is not super fast, so I expect this PR to have a performance impact when streaming a lot of data to/from a subprocess. It would be good to understand that impact and whether the `MultiProducerSingleConsumerAsyncChannel` can improve it.

Lastly, aren't we missing closing the `diskIO` when the iterator is dropped? We probably want to set up an `onTerminationCallback` on the stream or the channel to close the `diskIO`, right?
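For reference, a pull-based stream along the lines mentioned above preserves back pressure because the producer closure runs only when the consumer awaits the next element. This is a sketch over pre-chunked in-memory data with hypothetical names (`ChunkSource`, `makePullStream`), not the PR's code:

```swift
import Dispatch
import Foundation

// An actor serializes access to the remaining chunks so the @Sendable
// unfolding closure can mutate state safely.
actor ChunkSource {
    private var remaining: ArraySlice<[UInt8]>
    init(_ chunks: [[UInt8]]) { self.remaining = chunks[...] }
    func next() -> [UInt8]? { remaining.popFirst() }
}

// AsyncStream(unfolding:) invokes the closure once per consumer demand and
// ends the stream when it returns nil. Because each "read" happens on demand,
// a slow consumer cannot cause unbounded buffering.
func makePullStream(_ chunks: [[UInt8]]) -> AsyncStream<[UInt8]> {
    let source = ChunkSource(chunks)
    return AsyncStream(unfolding: { await source.next() })
}
```

A real subprocess pipe would perform the blocking or async read inside the unfolding closure instead of popping from an array, but the demand-driven shape is the same.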
I'll look into this in a separate PR. I think the `AsyncStream`-based implementation is good enough for this PR.
Will investigate under #51
```swift
// Treat empty data and nil as the same
if let data = data.map({ $0.isEmpty ? nil : $0 }) ?? nil {
    continuation.yield(AsyncBufferSequence.Buffer(data: data))
}
```
Wait, this strips backpressure. If you do that, then a slow consumer and a fast producer will OOM-kill you. You won't be able to use AsyncStream here (it's a type that should be avoided) unless you use `AsyncStream(unfolding: { ... })` or set it to drop elements if the buffer is full (but of course that's not possible here because it would lose data).
I understand the concern, but I don't think this is the right PR to address this issue since it's trying to solve a different problem. I'll address this in a separate PR.
Created #51
```swift
if let data = data.map({ $0.isEmpty ? nil : $0 }) ?? nil {
    continuation.yield(AsyncBufferSequence.Buffer(data: data))
} else if done {
    continuation.finish()
}
```
I think we should stick with `switch` just to be explicit that we handle all possible combinations. For example, it's possible that `done == true && data != nil`; the current implementation won't call `finish()` in that case.
I didn't go back to a switch because I re-read the semantics of `DispatchIO.read()` and restructured this code to handle those semantics. Now all code paths either call `fatalError` for an invalid combination of handler arguments, yield non-empty data, or call `finish()` when we've reached the end of the file.
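To make those invariants concrete, here is a hypothetical model (invented names `ReadAction`/`readAction`, plain byte arrays instead of DispatchData — not the PR's actual code) of the (done, data) decision table described above:

```swift
// Hypothetical model of the DispatchIO read-handler decision logic: empty
// data is treated like nil; every path either yields non-empty data,
// finishes, or traps on an invalid combination.
enum ReadAction: Equatable {
    case yield([UInt8])           // more data may follow
    case yieldAndFinish([UInt8])  // final data, then end of file
    case finish                   // end of file with no data
}

func readAction(done: Bool, data: [UInt8]?) -> ReadAction {
    // Treat empty data and nil as the same
    let data = data.flatMap { $0.isEmpty ? nil : $0 }
    switch (data, done) {
    case (.some(let d), false):
        return .yield(d)
    case (.some(let d), true):
        return .yieldAndFinish(d)
    case (nil, true):
        return .finish
    case (nil, false):
        fatalError("Unexpectedly received no data while DispatchIO is not done.")
    }
}
```

Writing it as an exhaustive switch makes the `done == true && data != nil` case (the one flagged in the review) visibly handled.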
```swift
extension PlatformOptions {
    internal struct StreamOptions: Sendable {
        internal init() {}
    }
}
```
I understand this might make code sharing easier, but I don't think it's appropriate to add an empty struct here that will never be used (same with the `streamOption` property on Windows).
(Sorry, I might have missed this in earlier reviews.)
Rearranged how we use StreamOptions so it is only defined and used on platforms that need it.
```swift
if bytesRemaining <= 0 {
    bytesRemaining = readBufferSize
    diskIO.stream(upToLength: readBufferSize, continuation: continuation)
}
```
I don't think you need to manually stream only up to `readBufferSize` here. The idea of streaming is that we want to read until the end of file, so we should use `.max` here to specify that `DispatchIO` should read data until an EOF is reached. You should only need to call `stream` once, in the initializer.
Are we sure we want that? That exacerbates the concerns of @weissi and @FranzBusch. With the current approach, the easing of back pressure is limited to buffering up to about `readBufferSize` for slow consumers. This suggestion opens that up to the entire output, leaving it essentially unbounded.
Force-pushed from 9abd4dd to d94a9c6
Force-pushed from 9d01575 to 888666b
Resolves #39