-
-
Notifications
You must be signed in to change notification settings - Fork 73
/
Copy pathstreaming.php
118 lines (104 loc) · 3.5 KB
/
streaming.php
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
<?php
declare(strict_types=1);
namespace Psl\IO;
use Generator;
use Psl;
use Psl\Channel;
use Psl\DateTime\Duration;
use Psl\Result;
use Psl\Str;
use Revolt\EventLoop;
use function max;
/**
 * Stream the output of the given read stream handles using a generator.
 *
 * Every handle is watched for readability on the event loop. Each chunk that is
 * read is yielded under the key its handle has in $handles, in the order the
 * chunks are received. A handle stops being watched once a read fails or returns
 * an empty string; that last result is still delivered (an empty chunk is
 * yielded, a failure is re-thrown).
 *
 * An empty $handles iterable produces an empty generator.
 *
 * Example:
 *
 *      $handles = [
 *          'foo' => get_read_stream('foo'),
 *          'bar' => get_read_stream('bar'),
 *      ];
 *
 *      foreach (IO\streaming($handles) as $type => $chunk) {
 *          IO\write_line('received chunk "%s" from "%s" stream', $chunk, $type);
 *      }
 *
 * @template T of array-key
 *
 * @param iterable<T, ReadHandleInterface&StreamHandleInterface> $handles
 * @param null|Duration $timeout Maximum total time to wait for all handles to be read until the end,
 *                               or null for no limit. Negative durations are treated as zero.
 *
 * @throws Exception\AlreadyClosedException If one of the handles has been already closed.
 * @throws Exception\RuntimeException If an error occurred during the operation.
 * @throws Exception\TimeoutException If $timeout is reached before being able to read all the handles until the end.
 *
 * @return Generator<T, string, mixed, null>
 */
function streaming(iterable $handles, null|Duration $timeout = null): Generator
{
    /**
     * @psalm-suppress UnnecessaryVarAnnotation
     *
     * @var Channel\ReceiverInterface<array{0: T|null, 1: Result\ResultInterface<string>}> $receiver
     * @var Channel\SenderInterface<array{0: T|null, 1: Result\ResultInterface<string>}> $sender
     */
    [$receiver, $sender] = Channel\unbounded();

    /** @var Psl\Ref<array<T, string>> $watchers */
    $watchers = new Psl\Ref([]);
    $timeout_watcher = null;

    // Registration happens inside the try block so that the finally clause cancels
    // every watcher that was already registered if a later handle turns out to be
    // closed (or iterating $handles throws).
    try {
        foreach ($handles as $index => $handle) {
            $stream = $handle->getStream();
            if ($stream === null) {
                throw new Exception\AlreadyClosedException(Str\format(
                    'Handle "%s" is already closed.',
                    (string) $index,
                ));
            }

            $watchers->value[$index] = EventLoop::onReadable($stream, static function (string $watcher) use (
                $index,
                $handle,
                $sender,
                $watchers,
            ): void {
                try {
                    $result = Result\wrap($handle->tryRead(...));
                    // A failed read or an empty chunk ends this handle: stop watching it.
                    if ($result->isFailed() || ($result->isSucceeded() && $result->getResult() === '')) {
                        EventLoop::cancel($watcher);
                        unset($watchers->value[$index]);
                    }

                    // The final result (failure or empty chunk) is still forwarded to the consumer.
                    $sender->send([$index, $result]);
                } finally {
                    // Once no handle is being watched anymore, closing the channel
                    // lets the receiving loop terminate after draining it.
                    if ($watchers->value === []) {
                        $sender->close();
                    }
                }
            });
        }

        if ($watchers->value === []) {
            // No handles were given: nothing will ever be sent, so waiting on the
            // channel would never complete. Finish with an empty generator.
            return;
        }

        if ($timeout !== null) {
            $timeout_seconds = max($timeout->getTotalSeconds(), 0.0);
            $timeout_watcher = EventLoop::delay($timeout_seconds, static function () use ($sender): void {
                /** @var Result\ResultInterface<string> $failure */
                $failure = new Result\Failure(
                    new Exception\TimeoutException(
                        'Reached timeout before being able to read all the handles until the end.',
                    ),
                );

                try {
                    // A null index marks the message as the timeout signal.
                    $sender->send([null, $failure]);
                } catch (Channel\Exception\ClosedChannelException) {
                    // Every handle was already read until the end and the channel was
                    // closed while the consumer is still draining it: there is nothing
                    // left to time out.
                }
            });
        }

        try {
            while (true) {
                [$index, $result] = $receiver->receive();
                if (null === $index || $result->isFailed()) {
                    throw $result->getThrowable();
                }

                yield $index => $result->getResult();
            }
        } catch (Channel\Exception\ClosedChannelException) {
            // completed.
            return;
        }
    } finally {
        // Runs on completion, on failure, and when the consumer abandons the generator.
        if ($timeout_watcher !== null) {
            EventLoop::cancel($timeout_watcher);
        }

        foreach ($watchers->value as $watcher) {
            EventLoop::cancel($watcher);
        }
    }
}