Skip to content

Commit a1abbf5

Browse files
committed
Move stream creation and configure to platform specific files
1 parent 7324ac4 commit a1abbf5

File tree

7 files changed

+107
-87
lines changed

7 files changed

+107
-87
lines changed

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 11 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
3333
private var buffer: [UInt8]
3434
private var currentPosition: Int
3535
private var finished: Bool
36-
private var streamIterator: AsyncThrowingStream<StreamStatus, Swift.Error>.AsyncIterator
36+
private var streamIterator: AsyncThrowingStream<TrackedPlatformDiskIO.StreamStatus, Swift.Error>.AsyncIterator
3737

3838
internal init(diskIO: TrackedPlatformDiskIO, bufferSize: Int) {
3939
self.diskIO = diskIO
4040
self.bufferSize = bufferSize
4141
self.buffer = []
4242
self.currentPosition = 0
4343
self.finished = false
44-
self.streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator()
44+
self.streamIterator = diskIO.readDataStream(upToLength: bufferSize).makeAsyncIterator()
4545
}
4646

4747
public mutating func next() async throws -> SequenceOutput.Buffer? {
@@ -51,7 +51,7 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
5151
return data
5252

5353
case .endOfStream(let data):
54-
streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator()
54+
streamIterator = diskIO.readDataStream(upToLength: bufferSize).makeAsyncIterator()
5555
return data
5656

5757
case .endOfFile:
@@ -63,54 +63,6 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
6363
return nil
6464
}
6565
}
66-
67-
private enum StreamStatus {
68-
case data(SequenceOutput.Buffer)
69-
case endOfStream(SequenceOutput.Buffer)
70-
case endOfFile
71-
}
72-
73-
private static func createDataStream(with dispatchIO: DispatchIO, bufferSize: Int) -> AsyncThrowingStream<StreamStatus, Swift.Error> {
74-
return AsyncThrowingStream<StreamStatus, Swift.Error> { continuation in
75-
dispatchIO.read(
76-
offset: 0,
77-
length: bufferSize,
78-
queue: .global()
79-
) { done, data, error in
80-
if error != 0 {
81-
continuation.finish(throwing: SubprocessError(
82-
code: .init(.failedToReadFromSubprocess),
83-
underlyingError: .init(rawValue: error)
84-
))
85-
return
86-
}
87-
88-
// Treat empty data and nil as the same
89-
let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil
90-
let status: StreamStatus
91-
92-
switch (buffer, done) {
93-
case (.some(let data), false):
94-
status = .data(SequenceOutput.Buffer(data: data))
95-
96-
case (.some(let data), true):
97-
status = .endOfStream(SequenceOutput.Buffer(data: data))
98-
99-
case (nil, false):
100-
return
101-
102-
case (nil, true):
103-
status = .endOfFile
104-
}
105-
106-
continuation.yield(status)
107-
108-
if done {
109-
continuation.finish()
110-
}
111-
}
112-
}
113-
}
11466
}
11567

11668
private let diskIO: TrackedPlatformDiskIO
@@ -126,6 +78,14 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
12678
}
12779
}
12880

81+
extension TrackedPlatformDiskIO {
82+
internal enum StreamStatus {
83+
case data(SequenceOutput.Buffer)
84+
case endOfStream(SequenceOutput.Buffer)
85+
case endOfFile
86+
}
87+
}
88+
12989
// MARK: - Page Size
13090
import _SubprocessCShims
13191

Sources/Subprocess/Execution.swift

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,8 @@ extension Execution where Output == SequenceOutput {
110110
fatalError("The standard output has already been consumed")
111111
}
112112

113-
if let lowWater = output.lowWater {
114-
readFd.dispatchIO.setLimit(lowWater: lowWater)
115-
}
116-
117-
if let highWater = output.highWater {
118-
readFd.dispatchIO.setLimit(highWater: highWater)
119-
}
120-
121-
return AsyncBufferSequence(diskIO: readFd, bufferSize: output.bufferSize)
113+
// TODO: Make buffer size and option
114+
return AsyncBufferSequence(diskIO: readFd, bufferSize: readBufferSize)
122115
}
123116
}
124117

@@ -142,15 +135,8 @@ extension Execution where Error == SequenceOutput {
142135
fatalError("The standard output has already been consumed")
143136
}
144137

145-
if let lowWater = error.lowWater {
146-
readFd.dispatchIO.setLimit(lowWater: lowWater)
147-
}
148-
149-
if let highWater = error.highWater {
150-
readFd.dispatchIO.setLimit(highWater: highWater)
151-
}
152-
153-
return AsyncBufferSequence(diskIO: readFd, bufferSize: error.bufferSize)
138+
// TODO: Make buffer size and option
139+
return AsyncBufferSequence(diskIO: readFd, bufferSize: readBufferSize)
154140
}
155141
}
156142

Sources/Subprocess/IO/Output.swift

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,8 @@ public struct BytesOutput: OutputProtocol {
210210
#endif
211211
public struct SequenceOutput: OutputProtocol {
212212
public typealias OutputType = Void
213-
internal let lowWater: Int?
214-
internal let highWater: Int?
215-
internal let bufferSize: Int
216-
217-
internal init(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int = readBufferSize) {
218-
self.lowWater = lowWater
219-
self.highWater = highWater
220-
self.bufferSize = bufferSize
221-
}
213+
214+
internal init() {}
222215
}
223216

224217
#if SubprocessSpan
@@ -291,9 +284,6 @@ extension OutputProtocol where Self == SequenceOutput {
291284
/// to the `.standardOutput` (or `.standardError`) property
292285
/// of `Execution` as `AsyncSequence<Data>`.
293286
public static var sequence: Self { .init() }
294-
public static func sequence(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int? = nil) -> Self {
295-
.init(lowWater: lowWater, highWater: highWater, bufferSize: bufferSize ?? readBufferSize)
296-
}
297287
}
298288

299289
// MARK: - Span Default Implementations

Sources/Subprocess/Platforms/Subprocess+Darwin.swift

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public struct PlatformOptions: Sendable {
5757
/// Creates a session and sets the process group ID
5858
/// i.e. Detach from the terminal.
5959
public var createSession: Bool = false
60+
public var outputOptions: StreamOptions = .init()
61+
public var errorOptions: StreamOptions = .init()
6062
/// An ordered list of steps in order to tear down the child
6163
/// process in case the parent task is cancelled before
6264
/// the child proces terminates.
@@ -126,6 +128,18 @@ extension PlatformOptions {
126128
#endif
127129
}
128130

131+
extension PlatformOptions {
132+
public struct StreamOptions: Sendable {
133+
let lowWater: Int?
134+
let highWater: Int?
135+
136+
init(lowWater: Int? = nil, highWater: Int? = nil) {
137+
self.lowWater = lowWater
138+
self.highWater = highWater
139+
}
140+
}
141+
}
142+
129143
extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
130144
internal func description(withIndent indent: Int) -> String {
131145
let indent = String(repeating: " ", count: indent * 4)
@@ -355,8 +369,8 @@ extension Configuration {
355369
output: output,
356370
error: error,
357371
inputPipe: inputPipe.createInputPipe(),
358-
outputPipe: outputPipe.createOutputPipe(),
359-
errorPipe: errorPipe.createOutputPipe()
372+
outputPipe: outputPipe.createOutputPipe(with: platformOptions.outputOptions),
373+
errorPipe: errorPipe.createOutputPipe(with: platformOptions.errorOptions)
360374
)
361375
}
362376

Sources/Subprocess/Platforms/Subprocess+Linux.swift

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ extension Configuration {
127127
output: output,
128128
error: error,
129129
inputPipe: inputPipe.createInputPipe(),
130-
outputPipe: outputPipe.createOutputPipe(),
131-
errorPipe: errorPipe.createOutputPipe()
130+
outputPipe: outputPipe.createOutputPipe(with: platformOptions.outputOptions),
131+
errorPipe: errorPipe.createOutputPipe(with: platformOptions.errorOptions)
132132
)
133133
}
134134

@@ -176,6 +176,8 @@ public struct PlatformOptions: Sendable {
176176
// Creates a session and sets the process group ID
177177
// i.e. Detach from the terminal.
178178
public var createSession: Bool = false
179+
public var outputOptions: StreamOptions = .init()
180+
public var errorOptions: StreamOptions = .init()
179181
/// An ordered list of steps in order to tear down the child
180182
/// process in case the parent task is cancelled before
181183
/// the child proces terminates.
@@ -197,6 +199,18 @@ public struct PlatformOptions: Sendable {
197199
public init() {}
198200
}
199201

202+
extension PlatformOptions {
203+
public struct StreamOptions: Sendable {
204+
let lowWater: Int?
205+
let highWater: Int?
206+
207+
init(lowWater: Int? = nil, highWater: Int? = nil) {
208+
self.lowWater = lowWater
209+
self.highWater = highWater
210+
}
211+
}
212+
}
213+
200214
extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
201215
internal func description(withIndent indent: Int) -> String {
202216
let indent = String(repeating: " ", count: indent * 4)

Sources/Subprocess/Platforms/Subprocess+Unix.swift

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,7 @@ extension CreatedPipe {
412412
}
413413
}
414414
)
415+
415416
writeEnd = .init(
416417
dispatchIO,
417418
closeWhenDone: writeFileDescriptor.closeWhenDone
@@ -423,7 +424,7 @@ extension CreatedPipe {
423424
)
424425
}
425426

426-
internal func createOutputPipe() -> OutputPipe {
427+
internal func createOutputPipe(with options: PlatformOptions.StreamOptions) -> OutputPipe {
427428
var readEnd: TrackedPlatformDiskIO? = nil
428429
if let readFileDescriptor = self.readFileDescriptor {
429430
let dispatchIO: DispatchIO = DispatchIO(
@@ -437,6 +438,15 @@ extension CreatedPipe {
437438
}
438439
}
439440
)
441+
442+
if let lowWater = options.lowWater {
443+
dispatchIO.setLimit(lowWater: lowWater)
444+
}
445+
446+
if let highWater = options.highWater {
447+
dispatchIO.setLimit(highWater: highWater)
448+
}
449+
440450
readEnd = .init(
441451
dispatchIO,
442452
closeWhenDone: readFileDescriptor.closeWhenDone
@@ -451,6 +461,48 @@ extension CreatedPipe {
451461

452462
// MARK: - TrackedDispatchIO extensions
453463
extension TrackedDispatchIO {
464+
internal func readDataStream(upToLength maxLength: Int) -> AsyncThrowingStream<StreamStatus, Swift.Error> {
465+
return AsyncThrowingStream<StreamStatus, Swift.Error> { continuation in
466+
self.dispatchIO.read(
467+
offset: 0,
468+
length: maxLength,
469+
queue: .global()
470+
) { done, data, error in
471+
if error != 0 {
472+
continuation.finish(throwing: SubprocessError(
473+
code: .init(.failedToReadFromSubprocess),
474+
underlyingError: .init(rawValue: error)
475+
))
476+
return
477+
}
478+
479+
// Treat empty data and nil as the same
480+
let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil
481+
let status: StreamStatus
482+
483+
switch (buffer, done) {
484+
case (.some(let data), false):
485+
status = .data(SequenceOutput.Buffer(data: data))
486+
487+
case (.some(let data), true):
488+
status = .endOfStream(SequenceOutput.Buffer(data: data))
489+
490+
case (nil, false):
491+
return
492+
493+
case (nil, true):
494+
status = .endOfFile
495+
}
496+
497+
continuation.yield(status)
498+
499+
if done {
500+
continuation.finish()
501+
}
502+
}
503+
}
504+
}
505+
454506
#if SubprocessSpan
455507
@available(SubprocessSpan, *)
456508
#endif

Tests/SubprocessTests/SubprocessTests+Unix.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -674,12 +674,16 @@ extension SubprocessUnixTests {
674674
sleep \(threshold)
675675
"""
676676

677+
var platformOptions = PlatformOptions()
678+
platformOptions.outputOptions = .init(lowWater: 0)
679+
677680
let start = ContinuousClock().now
678681

679682
let catResult = try await Subprocess.run(
680683
.path("/bin/bash"),
681684
arguments: ["-c", script],
682-
output: .sequence(lowWater: 0),
685+
platformOptions: platformOptions,
686+
output: .sequence,
683687
error: .discarded,
684688
body: { (execution, _) in
685689
for try await chunk in execution.standardOutput {

0 commit comments

Comments
 (0)