Skip to content

Commit ff7ae12

Browse files
author
Ryan Dingman
committed
Simplify interaction between AsyncBufferSequence.Iterator and its AsyncStream
1 parent 30876f5 commit ff7ae12

File tree

3 files changed

+16
-66
lines changed

3 files changed

+16
-66
lines changed

Sources/Subprocess/AsyncBufferSequence.swift

Lines changed: 10 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -35,25 +35,25 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
3535
@_nonSendable
3636
public struct Iterator: AsyncIteratorProtocol {
3737
public typealias Element = Buffer
38-
internal typealias Stream = AsyncThrowingStream<StreamStatus, Swift.Error>
38+
internal typealias Stream = AsyncThrowingStream<Buffer, Swift.Error>
3939

4040
private let diskIO: DiskIO
4141
private var buffer: [UInt8]
4242
private var currentPosition: Int
4343
private var finished: Bool
4444
private var streamIterator: Stream.AsyncIterator
4545
private let continuation: Stream.Continuation
46-
private var needsNextChunk: Bool
46+
private var bytesRemaining: Int
4747

4848
internal init(diskIO: DiskIO, streamOptions: PlatformOptions.StreamOptions) {
4949
self.diskIO = diskIO
5050
self.buffer = []
5151
self.currentPosition = 0
5252
self.finished = false
53-
let (stream, continuation) = AsyncThrowingStream<StreamStatus, Swift.Error>.makeStream()
53+
let (stream, continuation) = AsyncThrowingStream<Buffer, Swift.Error>.makeStream()
5454
self.streamIterator = stream.makeAsyncIterator()
5555
self.continuation = continuation
56-
self.needsNextChunk = true
56+
self.bytesRemaining = 0
5757

5858
#if !os(Windows)
5959
if let minimumBufferSize = streamOptions.minimumBufferSize {
@@ -68,28 +68,14 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
6868

6969
public mutating func next() async throws -> Buffer? {
7070

71-
if needsNextChunk {
72-
diskIO.readChunk(upToLength: readBufferSize, continuation: continuation)
73-
needsNextChunk = false
71+
if bytesRemaining <= 0 {
72+
bytesRemaining = readBufferSize
73+
diskIO.stream(upToLength: readBufferSize, continuation: continuation)
7474
}
7575

76-
if let status = try await streamIterator.next() {
77-
switch status {
78-
case .data(let data):
79-
return data
80-
81-
case .endOfChunk(let data):
82-
needsNextChunk = true
83-
return data
84-
85-
case .endOfFile:
86-
#if os(Windows)
87-
try self.diskIO.close()
88-
#else
89-
self.diskIO.close()
90-
#endif
91-
return nil
92-
}
76+
if let buffer = try await streamIterator.next() {
77+
bytesRemaining -= buffer.count
78+
return buffer
9379
} else {
9480
#if os(Windows)
9581
try self.diskIO.close()
@@ -114,17 +100,6 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
114100
}
115101
}
116102

117-
extension AsyncBufferSequence {
118-
#if SubprocessSpan
119-
@available(SubprocessSpan, *)
120-
#endif
121-
internal enum StreamStatus {
122-
case data(AsyncBufferSequence.Buffer)
123-
case endOfChunk(AsyncBufferSequence.Buffer)
124-
case endOfFile
125-
}
126-
}
127-
128103
// MARK: - Page Size
129104
import _SubprocessCShims
130105

Sources/Subprocess/Platforms/Subprocess+Unix.swift

Lines changed: 4 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,7 @@ extension DispatchIO {
415415
#if SubprocessSpan
416416
@available(SubprocessSpan, *)
417417
#endif
418-
internal func readChunk(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) {
418+
internal func stream(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) {
419419
self.read(
420420
offset: 0,
421421
length: maxLength,
@@ -430,26 +430,9 @@ extension DispatchIO {
430430
}
431431

432432
// Treat empty data and nil as the same
433-
let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil
434-
let status: AsyncBufferSequence.StreamStatus
435-
436-
switch (buffer, done) {
437-
case (.some(let data), false):
438-
status = .data(AsyncBufferSequence.Buffer(data: data))
439-
440-
case (.some(let data), true):
441-
status = .endOfChunk(AsyncBufferSequence.Buffer(data: data))
442-
443-
case (nil, false):
444-
fatalError("Unexpectedly received no data from DispatchIO with it indicating it is not done.")
445-
446-
case (nil, true):
447-
status = .endOfFile
448-
}
449-
450-
continuation.yield(status)
451-
452-
if done {
433+
if let data = data.map({ $0.isEmpty ? nil : $0 }) ?? nil {
434+
continuation.yield(AsyncBufferSequence.Buffer(data: data))
435+
} else if done {
453436
continuation.finish()
454437
}
455438
}

Sources/Subprocess/Platforms/Subprocess+Windows.swift

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1078,7 +1078,7 @@ extension FileDescriptor {
10781078
}
10791079

10801080
extension FileDescriptor {
1081-
internal func readChunk(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) {
1081+
internal func stream(upToLength maxLength: Int, continuation: AsyncBufferSequence.Iterator.Stream.Continuation) {
10821082
do {
10831083
var totalBytesRead: Int = 0
10841084

@@ -1121,16 +1121,8 @@ extension FileDescriptor {
11211121

11221122
if values.count > 0 {
11231123
totalBytesRead += values.count
1124-
1125-
if totalBytesRead >= maxLength {
1126-
continuation.yield(.endOfChunk(AsyncBufferSequence.Buffer(data: values)))
1127-
continuation.finish()
1128-
return
1129-
} else {
1130-
continuation.yield(.data(AsyncBufferSequence.Buffer(data: values)))
1131-
}
1124+
continuation.yield(AsyncBufferSequence.Buffer(data: values))
11321125
} else {
1133-
continuation.yield(.endOfFile)
11341126
continuation.finish()
11351127
return
11361128
}

0 commit comments

Comments
 (0)