@@ -30,178 +30,137 @@ import java.util.concurrent.atomic.AtomicInteger
3030import java.util.concurrent.atomic.AtomicReference
3131
3232/* *
33- * A subject implementation that dispatches signals to multiple
34- * consumers or buffers them until such consumers arrive.
33+ * A subject implementation that awaits a certain number of collectors
34+ * to start consuming, then allows the producer side to deliver items
35+ * to them.
3536 *
3637 * @param <T> the element type of the [Flow]
3738 * @param bufferSize the number of items to buffer until consumers arrive
3839 */
3940@FlowPreview
40- class MulticastSubject <T >(private val bufferSize : Int = 32 ) : AbstractFlow<T>(), SubjectAPI<T> {
41+ class MulticastSubject <T >(private val expectedCollectors : Int ) : AbstractFlow<T>(), SubjectAPI<T> {
4142
42- val queue = ConcurrentLinkedQueue < T >( )
43+ val collectors = AtomicReference < Array < ResumableCollector < T >>>( EMPTY as Array < ResumableCollector < T >> )
4344
44- val availableQueue = AtomicInteger (bufferSize )
45+ val producer = Resumable ( )
4546
46- val consumers = AtomicReference (EMPTY as Array <ResumableCollector <T >>)
47-
48- val producerAwait = Resumable ()
49-
50- val wip = AtomicInteger ()
47+ val remainingCollectors = AtomicInteger (expectedCollectors)
5148
5249 @Volatile
53- var error : Throwable ? = null
50+ var terminated : Throwable ? = null
5451
5552 override suspend fun emit (value : T ) {
56- while (availableQueue.get() == 0 ) {
57- producerAwait.await()
53+ awaitCollectors()
54+ for (collector in collectors.get()) {
55+ try {
56+ collector.next(value)
57+ } catch (ex: CancellationException ) {
58+ remove(collector)
59+ }
5860 }
59- queue.offer(value)
60- availableQueue.decrementAndGet();
61- drain()
6261 }
6362
6463 override suspend fun emitError (ex : Throwable ) {
65- error = ex
66- drain()
64+ // awaitCollectors()
65+ terminated = ex;
66+ for (collector in collectors.getAndSet(TERMINATED as Array <ResumableCollector <T >>)) {
67+ try {
68+ collector.error(ex)
69+ } catch (_: CancellationException ) {
70+ // ignored at this point
71+ }
72+ }
6773 }
6874
6975 override suspend fun complete () {
70- error = DONE
71- drain()
76+ // awaitCollectors()
77+ terminated = DONE
78+ for (collector in collectors.getAndSet(TERMINATED as Array <ResumableCollector <T >>)) {
79+ try {
80+ collector.complete()
81+ } catch (_: CancellationException ) {
82+ // ignored at this point
83+ }
84+ }
7285 }
7386
7487 override fun hasCollectors (): Boolean {
75- return consumers .get().isNotEmpty();
88+ return collectors .get().isNotEmpty()
7689 }
7790
7891 override fun collectorCount (): Int {
79- return consumers .get().size;
92+ return collectors .get().size
8093 }
8194
82- override suspend fun collectSafely (collector : FlowCollector <T >) {
83- val c = ResumableCollector <T >()
84- if (add(c)) {
85- c.readyConsumer()
86- drain()
87- c.drain(collector) { remove(it) }
88- } else {
89- val ex = error
90- if (ex != null && ex != DONE ) {
91- throw ex
92- }
95+ private suspend fun awaitCollectors () {
96+ if (remainingCollectors.get() != 0 ) {
97+ producer.await()
9398 }
9499 }
95100
96- private fun add (collector : ResumableCollector <T >) : Boolean {
101+ @Suppress(" UNCHECKED_CAST" , " " )
102+ private fun add (inner : ResumableCollector <T >) : Boolean {
97103 while (true ) {
98- val a = consumers.get()
99- if (a == TERMINATED ) {
104+
105+ val a = collectors.get()
106+ if (a as Any == TERMINATED as Any ) {
100107 return false
101108 }
102- val b = Array < ResumableCollector < T >>( a.size + 1 ) { idx ->
103- if (idx < a.size) a[idx] else collector
104- }
105- if (consumers .compareAndSet(a, b)) {
109+ val n = a.size
110+ val b = a.copyOf(n + 1 )
111+ b[n] = inner
112+ if (collectors .compareAndSet(a, b as Array < ResumableCollector < T >> )) {
106113 return true
107114 }
108115 }
109116 }
110- private fun remove (collector : ResumableCollector <T >) {
117+
118+ @Suppress(" UNCHECKED_CAST" )
119+ private fun remove (inner : ResumableCollector <T >) {
111120 while (true ) {
112- val a = consumers .get()
121+ val a = collectors .get()
113122 val n = a.size
114123 if (n == 0 ) {
115124 return
116125 }
117- var j = - 1
118- for (i in 0 until n) {
119- if (a[i] == collector) {
120- j = i
121- break
122- }
123- }
126+
127+ val j = a.indexOf(inner)
124128 if (j < 0 ) {
125- return ;
129+ return
126130 }
131+
127132 var b = EMPTY as Array <ResumableCollector <T >? >
128133 if (n != 1 ) {
129- b = Array < ResumableCollector < T > ? > (n - 1 ) { null }
134+ b = Array (n - 1 ) { null }
130135 System .arraycopy(a, 0 , b, 0 , j)
131136 System .arraycopy(a, j + 1 , b, j, n - j - 1 )
132137 }
133- if (consumers.compareAndSet(a, b as Array <ResumableCollector <T >>)) {
134- return ;
135- }
136- }
137- }
138-
139- private suspend fun drain () {
140- if (wip.getAndIncrement() != 0 ) {
141- return ;
142- }
143-
144- while (true ) {
145- val collectors = consumers.get()
146- if (collectors.isNotEmpty()) {
147- val ex = error;
148- val v = queue.poll();
149-
150- if (v == null && ex != null ) {
151- finish(ex)
152- }
153- else if (v != null ) {
154- var k = 0 ;
155- for (collector in consumers.get()) {
156- try {
157- // println("MulticastSubject -> [$k]: $v")
158- collector.next(v)
159- } catch (ex: CancellationException ) {
160- remove(collector);
161- }
162- k++
163- }
164- availableQueue.getAndIncrement()
165- producerAwait.resume()
166- continue
167- }
168- } else {
169- val ex = error;
170- if (ex != null && queue.isEmpty()) {
171- finish(ex)
172- }
173- }
174- if (wip.decrementAndGet() == 0 ) {
175- break
138+ if (collectors.compareAndSet(a, b as Array <ResumableCollector <T >>)) {
139+ return
176140 }
177141 }
178142 }
179143
180- private suspend fun finish (ex : Throwable ) {
181- if (ex == DONE ) {
182- for (collector in consumers.getAndSet(TERMINATED as Array <ResumableCollector <T >>)) {
183- try {
184- collector.complete()
185- } catch (_: CancellationException ) {
186- // ignored
187- }
144+ override suspend fun collectSafely (collector : FlowCollector <T >) {
145+ val rc = ResumableCollector <T >()
146+ if (add(rc)) {
147+ if (remainingCollectors.decrementAndGet() == 0 ) {
148+ producer.resume()
188149 }
150+ rc.drain(collector) { remove(it) }
189151 } else {
190- for (collector in consumers.getAndSet(TERMINATED as Array <ResumableCollector <T >>)) {
191- try {
192- collector.error(ex)
193- } catch (_: CancellationException ) {
194- // ignored
195- }
152+ val ex = terminated;
153+ if (ex != null && ex != DONE ) {
154+ throw ex
196155 }
197156 }
198157 }
199158
200159 companion object {
201- val DONE : Throwable = Throwable ( " Subject Completed " )
160+ val EMPTY = arrayOf< ResumableCollector < Any >>( )
202161
203- val EMPTY = arrayOf<ResumableCollector <Any >>();
162+ val TERMINATED = arrayOf<ResumableCollector <Any >>()
204163
205- val TERMINATED = arrayOf< ResumableCollector < Any >>();
164+ val DONE = Throwable ( " Subject completed " )
206165 }
207166}
0 commit comments