diff --git a/README.md b/README.md index ff044502..0d636109 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ The latest preview release is available on Maven Central as Handling streams of data—especially “live” data whose volume is not predetermined—requires special care in an asynchronous system. The most prominent issue is that resource consumption needs to be carefully controlled such that a fast data source does not overwhelm the stream destination. Asynchrony is needed in order to enable the parallel use of computing resources, on collaborating network hosts or multiple CPU cores within a single machine. -The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary—think passing elements on to another thread or thread-pool—while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of backpressure were synchronous (see also the [Reactive Manifesto](http://reactivemanifesto.org/)), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation. +The main goal of Reactive Streams is to govern the exchange of stream data across an asynchronous boundary – think passing elements on to another thread or thread-pool — while ensuring that the receiving side is not forced to buffer arbitrary amounts of data. In other words, backpressure is an integral part of this model in order to allow the queues which mediate between threads to be bounded. The benefits of asynchronous processing would be negated if the communication of backpressure were synchronous (see also the [Reactive Manifesto](http://reactivemanifesto.org/)), therefore care has been taken to mandate fully non-blocking and asynchronous behavior of all aspects of a Reactive Streams implementation. It is the intention of this specification to allow the creation of many conforming implementations, which by virtue of abiding by the rules will be able to interoperate smoothly, preserving the aforementioned benefits and characteristics across the whole processing graph of a stream application. @@ -34,67 +34,101 @@ In summary, Reactive Streams is a standard and specification for Stream-oriented The Reactive Streams specification consists of the following parts: -**The SPI** defines the interoperablility layer between different implementations. - -**The API** specifies the types that the users of Reactive Stream libraries use. +**The API** specifies the types to implement Reactive Streams and achieve interoperablility between different implementations. ***The Technology Compatibility Kit (TCK)*** is a standard test suite for conformance testing of implementations. -Implementations are free to implement additional features not covered by the specification as long as they conform to the API and SPI requirements and pass the tests in the TCK. +Implementations are free to implement additional features not covered by the specification as long as they conform to the API requirements and pass the tests in the TCK. -#### Comparison with related technologies #### +### API Components ### -In contrast to reactive streams described in this document, a Future represents exactly one element (or a failure) that is produced asynchronosly while streams can provide a potentially unbounded number of elements. +The API consists of the following components that are required to be provided by Reactive Stream implementations: -Compared to Rx, the SPI described here prescribes a mandatory, non-blocking way to handle back-pressure and requires the processing of an element by a dowstream component to be dispatched asynchronously. + - Publisher + - Subscriber + - Subscription -Iteratees are an abstraction used for consuming a stream, often for parsing it. In this sense they are not a stream transformation or combination tool in themselves. +A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). -### SPI Components ### +The protocol of a `Publisher`/`Subscriber` relationship is defined as: -The SPI consists of components that are required to be provided by Reactive Stream implementations but these interfaces should not be exposed to libraries or user code that *use* a Reactive Streams implementation. The reason for this is that the methods used on the SPI level have very strict and rather complex semantic requirements which are likely to be violated by end users. +``` +onError | (onSubscribe onNext* (onError | onComplete)?) +``` -The components of the SPI are: +- The number of `onNext` events emitted by a `Publisher` to a `Subscriber` will at no point in time exceed the cumulative demand that has been signaled via that `Subscriber`’s `Subscription`. +- A `Publisher` can send less events that requested and end the `Subscription` by emitting `onComplete` or `onError`. +- Events sent to a `Subscriber` can only be sent sequentially (no concurrent notifications). +- If a `Publisher` fails it must emit an `onError`. +- If a `Publisher` terminates successfully (finite stream) it must emit an `onComplete`. +- If a Publisher signals either `onError` or `onComplete` on a `Subscriber`, that `Subscriber`’s `Subscription` must be considered canceled. +- Once a terminal state has been signaled (`onError`, `onNext`) no further events can be sent. +- Upon receiving a `Subscription.cancel` request it should stop sending events as soon as it can. +- Calling `onError` or `onComplete` is not required after having received a `Subscription.cancel`. +- The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast. +- A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by calling `Subscriber.onError` instead of `Subscriber.onSubscribe` on the `Subscriber` instance calling `subscribe`. +- A `Publisher` should not throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method. +- The `Subscription.request` method must behave asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely. This allows a `Subscriber` to directly invoke `Subscription.request` and isolate the async responsibility to the `Subscription` instance which has responsibility for scheduling events. + + +A *`Subscriber`* is a component that accepts a sequenced stream of elements provided by a `Publisher`. At any given time a `Subscriber` might be subscribed to at most one `Publisher`. It provides the callback `onNext` to be called by the upstream `Publisher`, accepting an element that is to be processed or enqueued without blocking the `Publisher`. + +- `Subscriber` can be used once-and-only-once to subscribe to a `Publisher`. - - Publisher - - Subscriber - - Subscription -A *Publisher* is a provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s). A Publisher can serve multiple subscribers subscribed dynamically at various points in time. In the case of multiple subscribers the Publisher should respect the processing rates of all of its subscribers (possibly allowing for a bounded drift between them). It must eventually clean up its resources after all of its subscribers have been unsubscribed and shut down. A Publisher will typically support fanning out to multiple Subscribers in order to support the dynamic assembly of processing networks from building blocks that can freely be shared. +A `Subscriber` communicates demand to the `Publisher` via a *`Subscription`* which is passed to the `Subscriber` after the subscription has been established. The `Subscription` exposes the `request(int)` method that is used by the `Subscriber` to signal demand to the `Publisher`. -A *Subscriber* is a component that accepts a sequenced stream of elements provided by a Publisher. At any given time a Subscriber might be subscribed to at most one Publisher. It provides the callback onNext to be called by the upstream Producer, accepting an element that is to be asynchronously processed or enqueued without blocking the Producer. +- A `Subscription` can be used once-and-only-once to represent a subscription by a `Subscriber` to a `Publisher`. +- Calls from a `Subscriber` to `Subscription.request(int n)` can be made directly since it is the responsibility of `Subscription` to handle async dispatching. -A Subscriber communicates demand to the Publisher via a *Subscription* which is passed to the Subscriber after the subscription has been established. The Subscription exposes the requestMore(int) method that is used by the Subscriber to signal demand to the Publisher. For each of its subscribers the Publisher obeys the following invariant: +For each of its subscribers the `Publisher` obeys the following invariant: -*If N is the total number of demand tokens handed to the Publisher P by a Subscriber S during the time period up to a time T, then the number of onNext calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the Producer separately for each of its subscribers.* +*If N is the total number of demand tokens handed to the `Publisher` P by a `Subscriber` S during the time period up to a time T, then the number of `onNext` calls that are allowed to be performed by P on S before T must be less than or equal to N. The number of pending demand tokens must be tracked by the `Producer` separately for each of its subscribers.* -Subscribers that do not currently have an active subscription may subscribe to a Publisher. The only guarantee for subscribers attached at different points in time is that they all observe a common suffix of the stream, i.e. they receive the same elements after a certain point in time but it is not guaranteed that they see exactly the same number of elements. This obviously only holds if the subscriber does not cancel its subscription before the stream has been terminated. +`Subscriber`s that do not currently have an active subscription may subscribe to a `Publisher`. The only guarantee for subscribers attached at different points in time is that they all observe a common suffix of the stream, i.e. they receive the same elements after a certain point in time but it is not guaranteed that they see exactly the same number of elements. This obviously only holds if the subscriber does not cancel its subscription before the stream has been terminated. > In practice there is a difference between the guarantees that different publishers can provide for subscribers attached at different points in time. For example Publishers serving elements from a strict collection (“cold”) might guarantee that all subscribers see *exactly* the same elements (unless unsubscribed before completion) since they can replay the elements from the collection at any point in time. Other publishers might represent an ephemeral source of elements (e.g. a “hot” TCP stream) and keep only a limited output buffer to replay for future subscribers. -At any time the Publisher may signal that it is not able to provide more elements. This is done by invoking onComplete on its subscribers. +At any time the `Publisher` may signal that it is not able to provide more elements. This is done by invoking `onComplete` on its subscribers. -> For example a Publisher representing a strict collection signals completion to its subscriber after it provided all the elements. Now a later subscriber might still receive the whole collection before receiving onComplete. +> For example a `Publisher` representing a strict collection signals completion to its subscriber after it provided all the elements. Now a later subscriber might still receive the whole collection before receiving onComplete. -### API components ### +### Asynchronous vs Synchronous Processing ### -The purpose of the API is to provide the types that users interact with directly. SPI methods and interfaces should not be exposed expect for the purpose of writing Reactive Streams implementations. +The Reactive Streams API prescribes that all processing of elements (`onNext`) or termination signals (`onError`, `onComplete`) do not *block* the `Publisher`. Each of the `on*` handlers can process the events synchronously or asynchronously. -The API counterpart for Publisher is *Producer* and for Subscriber is *Consumer*. The combination of these two—a stream processing element with asynchronous input and output—is called *Processor*. +For example, this `onNext` implementation does synchronous transformation and enqueues the result for further asynchronous processing: -The only operation supported by any Producer–Consumer pair is their ability to establish a connection for the purpose of transferring the stream of elements from Producer to Consumer; this is achieved by the method `produceTo()`. Concrete implementations of Reactive Streams are expected to offer a rich set of combinators and transformations, but these are not the subject of this specification. The reason is that implementations shall have the freedom to formulate the end-user API in an idiomatic fashion for the respective platform, language and use-case they target. +```java +void onNext(T t) { + queue.offer(transform(t)); +} +``` -In addition there is one method each on Producer and Consumer to obtain a reference to the underlying Publisher or Subscriber, respectively. These are necessary for implementations, but is not to be considered end-user API. +In a push-based model such as this doing asynchronous processing, back-pressure needs to be provided otherwise buffer bloat can occur. -### Asynchronous processing ### +In contrast to communicating back-pressure by blocking the publisher, a non-blocking solution needs to communicate demand through a dedicated control channel. This channel is provided by the `Subscription`: the `Subscriber` controls the maximum amount of future elements it is willing receive by sending explicit demand tokens (by calling `request(int)`). -The Reactive Streams SPI prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) happens outside of the execution stack of the Publisher. This is achieved by scheduling the processing to run asynchronously, possibly on a different thread. The Subscriber should make sure to minimize the amount of processing steps used to initiate this process, meaning that all its SPI-mandated methods shall return as quickly as possible. +Expanding on the `onNext` example above, as the queue is drained and processed asynchronously it would signal demand such as this: -In contrast to communicating back-pressure by blocking the publisher, a non-blocking solution needs to communicate demand through a dedicated control channel. This channel is provided by the Subscription: the subscriber controls the maximum amount of future elements it is willing receive by sending explicit demand tokens (by calling requestMore(int)). +```java +// TODO replace with fully functioning code example rather than this pseudo-code snippet +void process() { + eventLoop.schedule(() -> { + T t; + while((t = queue.poll()) != null) { + doWork(t); + if(queue.size() < THRESHOLD) { + subscription.request(queue.capacity()); + } + } + }) +} +``` #### Relationship to synchronous stream-processing #### -This document describes asynchronous, non-blocking backpressure boundaries but in between those boundaries any kind of synchronous stream processing model is permitted. This is useful for performance optimization (eliminating inter-thread synchronization) and it conveniently transports backpressure implicitly (the calling method cannot continue while the call lasts). As an example consider a section consisting of three connected Processors, A, B and C: +This document defines a protocol for asynchronous, non-blocking backpressure boundaries but in between those boundaries any kind of synchronous stream processing model is permitted. This is useful for performance optimization (eliminating inter-thread synchronization) and it conveniently transports backpressure implicitly (the calling method cannot continue while the call lasts). As an example consider a section consisting of three connected Processors, A, B and C: (...) --> A[S1 --> S2] --> B[S3 --> S4 --> S5] --> C[S6] --> (...) diff --git a/api/.gitignore b/api/.gitignore new file mode 100644 index 00000000..5e56e040 --- /dev/null +++ b/api/.gitignore @@ -0,0 +1 @@ +/bin diff --git a/spi/build.sbt b/api/build.sbt similarity index 100% rename from spi/build.sbt rename to api/build.sbt diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java b/api/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java new file mode 100644 index 00000000..7c7b2a26 --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/multicast/MulticastExample.java @@ -0,0 +1,23 @@ +package org.reactivestreams.example.multicast; + +import org.reactivestreams.Publisher; + +public class MulticastExample { + + /** + * Each subscribe will join an existing stream. + * + * @param args + * @throws InterruptedException + */ + public static void main(String... args) throws InterruptedException { + Publisher dataStream = new StockPricePublisher(); + + dataStream.subscribe(new StockPriceSubscriber(5, 500)); // 500ms on each event, infinite + dataStream.subscribe(new StockPriceSubscriber(10, 2000)); // 2000ms on each event, infinite + Thread.sleep(5000); + dataStream.subscribe(new StockPriceSubscriber(10, 111, 20)); // 111ms on each event, take 20 + Thread.sleep(5000); + dataStream.subscribe(new StockPriceSubscriber(10, 222, 20));// 222ms on each event, take 20 + } +} diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java b/api/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java new file mode 100644 index 00000000..0086e8c1 --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/multicast/NeverEndingStockStream.java @@ -0,0 +1,75 @@ +package org.reactivestreams.example.multicast; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Simulate a network connection that is firing data at you. + *

+ * Purposefully not using the `Subscriber` and `Publisher` types to not confuse things with `StockPricePublisher` + */ +public class NeverEndingStockStream { + + private static final NeverEndingStockStream INSTANCE = new NeverEndingStockStream(); + + private NeverEndingStockStream() { + } + + // using array because it is far faster than list/set for iteration + // which is where most of the time in the tight loop will go (... well, beside object allocation) + private volatile Handler[] handlers = new Handler[0]; + + public static synchronized void addHandler(Handler handler) { + if (INSTANCE.handlers.length == 0) { + INSTANCE.handlers = new Handler[] { handler }; + } else { + Handler[] newHandlers = new Handler[INSTANCE.handlers.length + 1]; + System.arraycopy(INSTANCE.handlers, 0, newHandlers, 0, INSTANCE.handlers.length); + newHandlers[newHandlers.length - 1] = handler; + INSTANCE.handlers = newHandlers; + } + INSTANCE.startIfNeeded(); + } + + public static synchronized void removeHandler(Handler handler) { + // too lazy to do the array handling + HashSet set = new HashSet<>(Arrays.asList(INSTANCE.handlers)); + set.remove(handler); + INSTANCE.handlers = set.toArray(new Handler[set.size()]); + } + + public static interface Handler { + public void handle(Stock event); + } + + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicLong stockIndex = new AtomicLong(); + + private void startIfNeeded() { + if (running.compareAndSet(false, true)) { + new Thread(new Runnable() { + + @Override + public void run() { + while (handlers.length > 0) { + long l = stockIndex.incrementAndGet(); + Stock s = new Stock(l); + for (Handler h : handlers) { + h.handle(s); + } + try { + // just so it is someone sane how fast this is moving + Thread.sleep(1); + } catch (InterruptedException e) { + } + } + running.set(false); + } + + }).start(); + } + } + +} diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/Stock.java b/api/src/examples/java/org/reactivestreams/example/multicast/Stock.java new file mode 100644 index 00000000..431e60de --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/multicast/Stock.java @@ -0,0 +1,15 @@ +package org.reactivestreams.example.multicast; + +public class Stock { + + private final long l; + + public Stock(long l) { + this.l = l; + } + + public long getPrice() { + return l; + } + +} diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java b/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java new file mode 100644 index 00000000..78aa3e6f --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/multicast/StockPricePublisher.java @@ -0,0 +1,73 @@ +package org.reactivestreams.example.multicast; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; +import org.reactivestreams.example.multicast.NeverEndingStockStream.Handler; + +/** + * Publisher of stock prices from a never ending stream. + *

+ * It will share a single underlying stream with as many subscribers as it receives. + *

+ * If the subscriber can not keep up, it will drop (different strategies could be implemented, configurable, etc). + */ +public class StockPricePublisher implements Publisher { + + @Override + public void subscribe(final Subscriber s) { + s.onSubscribe(new Subscription() { + + AtomicInteger capacity = new AtomicInteger(); + EventHandler handler = new EventHandler(s, capacity); + + @Override + public void request(int n) { + if (capacity.getAndAdd(n) == 0) { + // was at 0, so start up consumption again + startConsuming(); + } + } + + @Override + public void cancel() { + System.out.println("StockPricePublisher => Cancel Subscription"); + NeverEndingStockStream.removeHandler(handler); + } + + public void startConsuming() { + NeverEndingStockStream.addHandler(handler); + } + + }); + + } + + private static final class EventHandler implements Handler { + private final Subscriber s; + private final AtomicInteger capacity; + + private EventHandler(Subscriber s, AtomicInteger capacity) { + this.s = s; + this.capacity = capacity; + } + + @Override + public void handle(Stock event) { + int c = capacity.get(); + if (c == 0) { + // shortcut instead of doing decrement/increment loops while no capacity + return; + } + if (capacity.getAndDecrement() > 0) { + s.onNext(event); + } else { + // we just decremented below 0 so increment back one + capacity.incrementAndGet(); + } + } + } + +} diff --git a/api/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java b/api/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java new file mode 100644 index 00000000..e0a37813 --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/multicast/StockPriceSubscriber.java @@ -0,0 +1,78 @@ +package org.reactivestreams.example.multicast; + +import java.util.concurrent.ArrayBlockingQueue; + +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; + +public class StockPriceSubscriber implements Subscriber { + + private final ArrayBlockingQueue buffer; + private final int delayPerStock; + private volatile boolean terminated = false; + private final int take; + + public StockPriceSubscriber(int bufferSize, int delayPerStock, int take) { + this.buffer = new ArrayBlockingQueue<>(bufferSize); + this.delayPerStock = delayPerStock; + this.take = take; + } + + public StockPriceSubscriber(int bufferSize, int delayPerStock) { + this(bufferSize, delayPerStock, -1); + } + + @Override + public void onSubscribe(Subscription s) { + System.out.println("StockPriceSubscriber.onSubscribe => request " + buffer.remainingCapacity()); + s.request(buffer.remainingCapacity()); + startAsyncWork(s); + } + + @Override + public void onNext(Stock t) { + buffer.add(t); + } + + @Override + public void onError(Throwable t) { + terminated = true; + throw new RuntimeException(t); + } + + @Override + public void onComplete() { + terminated = true; + } + + private void startAsyncWork(final Subscription s) { + System.out.println("StockPriceSubscriber => Start new worker thread"); + /* don't write real code like this! just for quick demo */ + new Thread(new Runnable() { + public void run() { + int received = 0; + + while (!terminated) { + Stock v = buffer.poll(); + try { + Thread.sleep(delayPerStock); + } catch (Exception e) { + e.printStackTrace(); + } + if (buffer.size() < 3) { + s.request(buffer.remainingCapacity()); + } + if (v != null) { + received++; + System.out.println("StockPriceSubscriber[" + delayPerStock + "] => " + v.getPrice()); + if (take > 0 && received >= take) { + s.cancel(); + terminated = true; + } + } + } + } + }).start(); + } + +} diff --git a/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java b/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java new file mode 100644 index 00000000..450e0089 --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/unicast/InfiniteIncrementNumberPublisher.java @@ -0,0 +1,52 @@ +package org.reactivestreams.example.unicast; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; + +class InfiniteIncrementNumberPublisher implements Publisher { + + @Override + public void subscribe(final Subscriber s) { + + final AtomicInteger i = new AtomicInteger(); + + Subscription subscription = new Subscription() { + + AtomicInteger capacity = new AtomicInteger(); + + @Override + public void request(int n) { + System.out.println("signalAdditionalDemand => " + n); + if (capacity.getAndAdd(n) == 0) { + // start sending again if it wasn't already running + send(); + } + } + + private void send() { + System.out.println("send => " + capacity.get()); + // this would normally use an eventloop, actor, whatever + new Thread(new Runnable() { + + public void run() { + do { + s.onNext(i.incrementAndGet()); + } while (capacity.decrementAndGet() > 0); + } + }).start(); + } + + @Override + public void cancel() { + capacity.set(-1); + } + + }; + + s.onSubscribe(subscription); + + } +} \ No newline at end of file diff --git a/api/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java b/api/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java new file mode 100644 index 00000000..f0d594fa --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/unicast/NumberSubscriberThatHopsThreads.java @@ -0,0 +1,64 @@ +package org.reactivestreams.example.unicast; + +import java.util.concurrent.ArrayBlockingQueue; + +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; + +class NumberSubscriberThatHopsThreads implements Subscriber { + + final int BUFFER_SIZE = 10; + private final ArrayBlockingQueue buffer = new ArrayBlockingQueue<>(BUFFER_SIZE); + private volatile boolean terminated = false; + private final String token; + + NumberSubscriberThatHopsThreads(String token) { + this.token = token; + } + + @Override + public void onSubscribe(Subscription s) { + System.out.println("onSubscribe => request " + BUFFER_SIZE); + s.request(BUFFER_SIZE); + startAsyncWork(s); + } + + @Override + public void onNext(Integer t) { + buffer.add(t); + } + + @Override + public void onError(Throwable t) { + terminated = true; + throw new RuntimeException(t); + } + + @Override + public void onComplete() { + terminated = true; + } + + private void startAsyncWork(final Subscription s) { + System.out.println("**** Start new worker thread"); + /* don't write real code like this! just for quick demo */ + new Thread(new Runnable() { + public void run() { + while (!terminated) { + Integer v = buffer.poll(); + try { + Thread.sleep(100); + } catch (Exception e) { + e.printStackTrace(); + } + if (buffer.size() < 3) { + s.request(BUFFER_SIZE - buffer.size()); + } + if (v != null) { + System.out.println(token + " => Did stuff with v: " + v); + } + } + } + }).start(); + } +} \ No newline at end of file diff --git a/api/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java b/api/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java new file mode 100644 index 00000000..59c7e197 --- /dev/null +++ b/api/src/examples/java/org/reactivestreams/example/unicast/UnicastExample.java @@ -0,0 +1,21 @@ +package org.reactivestreams.example.unicast; + +import org.reactivestreams.Publisher; + +public class UnicastExample { + + /** + * Each subscribe will start a new stream starting at 0. + * + * @param args + * @throws InterruptedException + */ + public static void main(String... args) throws InterruptedException { + Publisher dataStream = new InfiniteIncrementNumberPublisher(); + + dataStream.subscribe(new NumberSubscriberThatHopsThreads("A")); + Thread.sleep(2000); + dataStream.subscribe(new NumberSubscriberThatHopsThreads("B")); + } + +} diff --git a/api/src/main/java/org/reactivestreams/Publisher.java b/api/src/main/java/org/reactivestreams/Publisher.java new file mode 100644 index 00000000..3e094965 --- /dev/null +++ b/api/src/main/java/org/reactivestreams/Publisher.java @@ -0,0 +1,20 @@ +package org.reactivestreams; + +public interface Publisher { + + /** + * Request {@link Publisher} to start streaming data. + *

+ * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}. + *

+ * Each {@link Subscription} will work for only a single {@link Subscriber}. + *

+ * A {@link Subscriber} should only subscribe once to a single {@link Publisher}. + *

+ * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will + * signal the error via {@link Subscriber#onError}. + * + * @param s + */ + public void subscribe(Subscriber s); +} diff --git a/api/src/main/java/org/reactivestreams/Subscriber.java b/api/src/main/java/org/reactivestreams/Subscriber.java new file mode 100644 index 00000000..4d28b44a --- /dev/null +++ b/api/src/main/java/org/reactivestreams/Subscriber.java @@ -0,0 +1,55 @@ +package org.reactivestreams; + +/** + * Will receive call to {@link #onSubscribe(Subscription)} once after passing an instance of {@link Subscriber} to {@link Publisher#subscribe(Subscriber)}. + *

+ * No further notifications will be received until {@link Subscription#request(int)} is called. + *

+ * After signaling demand: + *

+ *

+ * Demand can be signalled via {@link Subscription#request(int)} whenever the {@link Subscriber} instance is capable of handling more. + * + * @param + */ +public interface Subscriber { + /** + * Invoked after calling {@link Publisher#subscribe(Subscriber)}. + *

+ * No data will start flowing until {@link Subscription#request(int)} is invoked. + *

+ * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(int)} whenever more data is wanted. + *

+ * The {@link Publisher} will send notifications only in response to {@link Subscription#request(int)}. + * + * @param s + * {@link Subscription} that allows requesting data via {@link Subscription#request(int)} + */ + public void onSubscribe(Subscription s); + + /** + * Data notification sent by the {@link Publisher} in response to requests to {@link Subscription#request(int)}. + * + * @param t + */ + public void onNext(T t); + + /** + * Failed terminal state. + *

+ * No further events will be sent even if {@link Subscription#request(int)} is invoked again. + * + * @param t + */ + public void onError(Throwable t); + + /** + * Successful terminal state. + *

+ * No further events will be sent even if {@link Subscription#request(int)} is invoked again. + */ + public void onComplete(); +} diff --git a/api/src/main/java/org/reactivestreams/Subscription.java b/api/src/main/java/org/reactivestreams/Subscription.java new file mode 100644 index 00000000..9c665bdd --- /dev/null +++ b/api/src/main/java/org/reactivestreams/Subscription.java @@ -0,0 +1,32 @@ +package org.reactivestreams; + +/** + * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}. + *

+ * It can only be used once by a single {@link Subscriber}. + *

+ * It is used to both signal desire for data and cancel demand (and allow resource cleanup). + * + */ +public interface Subscription { + /** + * No events will be sent by a {@link Publisher} until demand is signaled via this method. + *

+ * It can be called however often and whenever needed. + *

+ * Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled. + *

+ * A {@link Publisher} can send less than is requested if the stream ends but + * then must emit either {@link Subscriber#onError(Throwable)} or {@link Subscriber#onComplete()}. + * + * @param n + */ + public void request(int n); + + /** + * Request the {@link Publisher} to stop sending data and clean up resources. + *

+ * Data may still be sent to meet previously signalled demand after calling cancel as this request is asynchronous. + */ + public void cancel(); +} diff --git a/project/plugins.sbt b/project/plugins.sbt index a5ddf282..5128a075 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,3 +1,3 @@ -addSbtPlugin("de.johoop" % "sbt-testng-plugin" % "3.0.0") - addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.4.0") + +addSbtPlugin("de.johoop" % "sbt-testng-plugin" % "3.0.0") diff --git a/spi/src/main/java/org/reactivestreams/api/Consumer.java b/spi/src/main/java/org/reactivestreams/api/Consumer.java deleted file mode 100644 index d9696edd..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Consumer.java +++ /dev/null @@ -1,21 +0,0 @@ -package org.reactivestreams.api; - -import org.reactivestreams.spi.Subscriber; - -/** - * A Consumer is the logical sink of elements of a given type. - * The underlying implementation is done by way of a {@link org.reactivestreams.spi.Subscriber Subscriber}. - * This interface is the user-level API for a sink while a Subscriber is the SPI. - *

- * Implementations of this interface will typically offer domain- or language-specific - * methods for transforming or otherwise interacting with the stream of elements. - */ -public interface Consumer { - - /** - * Get the underlying {@link org.reactivestreams.spi.Subscriber Subscriber} for this Consumer. This method should only be used by - * implementations of this API. - * @return the underlying subscriber for this consumer - */ - public Subscriber getSubscriber(); -} \ No newline at end of file diff --git a/spi/src/main/java/org/reactivestreams/api/Processor.java b/spi/src/main/java/org/reactivestreams/api/Processor.java deleted file mode 100644 index da522cd6..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Processor.java +++ /dev/null @@ -1,10 +0,0 @@ -package org.reactivestreams.api; - -/** - * A Processor is a stand-alone representation of a transformation for - * elements from In to Out types. Implementations of this API will provide - * factory methods for creating Processors and connecting them to - * {@link org.reactivestreams.api.Producer Producer} and {@link org.reactivestreams.api.Consumer Consumer}. - */ -public interface Processor extends Consumer, Producer { -} diff --git a/spi/src/main/java/org/reactivestreams/api/Producer.java b/spi/src/main/java/org/reactivestreams/api/Producer.java deleted file mode 100644 index 3e8b5640..00000000 --- a/spi/src/main/java/org/reactivestreams/api/Producer.java +++ /dev/null @@ -1,38 +0,0 @@ -package org.reactivestreams.api; - -import org.reactivestreams.spi.Publisher; - -/** - * A Producer is the logical source of elements of a given type. - * The underlying implementation is done by way of a {@link org.reactivestreams.spi.Publisher Publisher}. - * This interface is the user-level API for a source while a Publisher is the - * SPI. - *

- * Implementations of this interface will typically offer domain- or language-specific - * methods for transforming or otherwise interacting with the produced stream of elements. - */ -public interface Producer { - - /** - * Get the underlying {@link org.reactivestreams.spi.Publisher Publisher} for this Producer. This method should only be used by - * implementations of this API. - * @return the underlying publisher for this producer - */ - public Publisher getPublisher(); - - /** - * Connect the given consumer to this producer. This means that the - * Subscriber underlying the {@link org.reactivestreams.api.Consumer Consumer} subscribes to this Producer’s - * underlying {@link org.reactivestreams.spi.Publisher Publisher}, which will initiate the transfer of the produced - * stream of elements from producer to consumer until either of three things - * happen: - *

- *

- * @param consumer The consumer to register with this producer. - */ - public void produceTo(Consumer consumer); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Publisher.java b/spi/src/main/java/org/reactivestreams/spi/Publisher.java deleted file mode 100644 index 7a4a2154..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Publisher.java +++ /dev/null @@ -1,16 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Publisher is a source of elements of a given type. One or more {@link org.reactivestreams.spi.Subscriber Subscriber} may be connected - * to this Publisher in order to receive the published elements, contingent on availability of these - * elements as well as the presence of demand signaled by the Subscriber via {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore}. - */ -public interface Publisher { - - /** - * Subscribe the given {@link org.reactivestreams.spi.Subscriber Subscriber} to this Publisher. A Subscriber can at most be subscribed once - * to a given Publisher, and to at most one Publisher in total. - * @param subscriber The subscriber to register with this publisher. - */ - public void subscribe(Subscriber subscriber); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Subscriber.java b/spi/src/main/java/org/reactivestreams/spi/Subscriber.java deleted file mode 100644 index c900229c..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Subscriber.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Subscriber receives elements from a {@link org.reactivestreams.spi.Publisher Publisher} based on the {@link org.reactivestreams.spi.Subscription Subscription} it has. - * The Publisher may supply elements as they become available, the Subscriber signals demand via - * {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore} and elements from when supply and demand are both present. - */ -public interface Subscriber { - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} generates a {@link org.reactivestreams.spi.Subscription Subscription} upon {@link org.reactivestreams.spi.Publisher#subscribe(org.reactivestreams.spi.Subscriber) subscribe} and passes - * it on to the Subscriber named there using this method. The Publisher may choose to reject - * the subscription request by calling {@link #onError onError} instead. - * @param subscription The subscription which connects this subscriber to its publisher. - */ - public void onSubscribe(Subscription subscription); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method to pass one element to this Subscriber. The element - * must not be null. The Publisher must not call this method more often than - * the Subscriber has signaled demand for via the corresponding {@link org.reactivestreams.spi.Subscription Subscription}. - * @param element The element that is passed from publisher to subscriber. - */ - public void onNext(T element); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method in order to signal that it terminated normally. - * No more elements will be forthcoming and none of the Subscriber’s methods will be called hereafter. - */ - public void onComplete(); - - /** - * The {@link org.reactivestreams.spi.Publisher Publisher} calls this method to signal that the stream of elements has failed - * and is being aborted. The Subscriber should abort its processing as soon as possible. - * No more elements will be forthcoming and none of the Subscriber’s methods will be called hereafter. - *

- * This method is not intended to pass validation errors or similar from Publisher to Subscriber - * in order to initiate an orderly shutdown of the exchange; it is intended only for fatal - * failure conditions which make it impossible to continue processing further elements. - * @param cause An exception which describes the reason for tearing down this stream. - */ - public void onError(Throwable cause); -} diff --git a/spi/src/main/java/org/reactivestreams/spi/Subscription.java b/spi/src/main/java/org/reactivestreams/spi/Subscription.java deleted file mode 100644 index 3c8b1f47..00000000 --- a/spi/src/main/java/org/reactivestreams/spi/Subscription.java +++ /dev/null @@ -1,26 +0,0 @@ -package org.reactivestreams.spi; - -/** - * A Subscription models the relationship between a {@link org.reactivestreams.spi.Publisher Publisher} and a {@link org.reactivestreams.spi.Subscriber Subscriber}. - * The Subscriber receives a Subscription so that it can ask for elements to be delivered - * using {@link org.reactivestreams.spi.Subscription#requestMore(int) requestMore}. The Subscription can be disposed of by canceling it. - */ -public interface Subscription { - - /** - * Cancel this subscription. The {@link org.reactivestreams.spi.Publisher Publisher} to which produced this Subscription - * will eventually stop sending more elements to the {@link org.reactivestreams.spi.Subscriber Subscriber} which owns - * this Subscription. This may happen before the requested number of elements has - * been delivered, even if the Publisher would still have more elements. - */ - public void cancel(); - - /** - * Request more data from the {@link org.reactivestreams.spi.Publisher Publisher} which produced this Subscription. - * The number of requested elements is cumulative to the number requested previously. - * The Publisher may eventually publish up to the requested number of elements to - * the {@link org.reactivestreams.spi.Subscriber Subscriber} which owns this Subscription. - * @param elements The number of elements requested. - */ - public void requestMore(int elements); -} \ No newline at end of file diff --git a/tck/.gitignore b/tck/.gitignore new file mode 100644 index 00000000..5e56e040 --- /dev/null +++ b/tck/.gitignore @@ -0,0 +1 @@ +/bin diff --git a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java index cf009d13..b72046eb 100644 --- a/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/IdentityProcessorVerification.java @@ -1,18 +1,17 @@ package org.reactivestreams.tck; -import org.reactivestreams.api.Processor; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscriber; -import org.reactivestreams.spi.Subscription; +import java.util.HashSet; +import java.util.Set; + +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; import org.reactivestreams.tck.TestEnvironment.ManualPublisher; import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; import org.reactivestreams.tck.TestEnvironment.Promise; import org.testng.annotations.Test; -import java.util.HashSet; -import java.util.Set; - public abstract class IdentityProcessorVerification { private final TestEnvironment env; @@ -79,7 +78,7 @@ public Publisher createErrorStatePublisher() { * It must create a Processor, which simply forwards all stream elements from its upstream * to its downstream. It must be able to internally buffer the given number of elements. */ - public abstract Processor createIdentityProcessor(int bufferSize); + public abstract TestProcessor createIdentityProcessor(int bufferSize); /** * Helper method required for running the Publisher rules against a Processor. @@ -107,7 +106,7 @@ public Publisher createErrorStatePublisher() { // A Processor // must obey all Publisher rules on its producing side public Publisher createPublisher(int elements) { - Processor processor = createIdentityProcessor(testBufferSize); + TestProcessor processor = createIdentityProcessor(testBufferSize); Publisher pub = createHelperPublisher(elements); pub.subscribe(processor.getSubscriber()); return processor.getPublisher(); // we run the PublisherVerification against this @@ -190,7 +189,7 @@ public void mustNotCallOnCompleteOrOnErrorMoreThanOncePerSubscriber() { // A Processor // must obey all Subscriber rules on its consuming side public Subscriber createSubscriber(final SubscriberVerification.SubscriberProbe probe) { - Processor processor = createIdentityProcessor(testBufferSize); + TestProcessor processor = createIdentityProcessor(testBufferSize); processor.getPublisher().subscribe( new Subscriber() { public void onSubscribe(final Subscription subscription) { @@ -201,7 +200,7 @@ public void triggerShutdown() { } public void triggerRequestMore(int elements) { - subscription.requestMore(elements); + subscription.request(elements); } public void triggerCancel() { @@ -500,7 +499,7 @@ public abstract class TestSetup extends ManualPublisher { private TestEnvironment.ManualSubscriber tees; // gives us access to an infinite stream of T values private Set seenTees = new HashSet(); - final Processor processor; + final TestProcessor processor; final int testBufferSize; public TestSetup(TestEnvironment env, int testBufferSize) throws InterruptedException { diff --git a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java index 7d0babd7..0624c54f 100644 --- a/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/PublisherVerification.java @@ -1,10 +1,7 @@ package org.reactivestreams.tck; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscription; -import org.reactivestreams.tck.support.Optional; -import org.testng.SkipException; -import org.testng.annotations.Test; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; import java.lang.ref.ReferenceQueue; import java.lang.ref.WeakReference; @@ -14,9 +11,16 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; -import static org.reactivestreams.tck.TestEnvironment.*; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; +import org.reactivestreams.Subscription; +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.TestEnvironment.Latch; +import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; +import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport; +import org.reactivestreams.tck.TestEnvironment.Promise; +import org.reactivestreams.tck.TestEnvironment.TestSubscriber; +import org.reactivestreams.tck.support.Optional; +import org.testng.SkipException; +import org.testng.annotations.Test; public abstract class PublisherVerification { @@ -222,7 +226,7 @@ public void subscriptionRequestMoreWhenCancelledMustIgnoreTheCall() throws Throw public void run(Publisher pub) throws InterruptedException { ManualSubscriber sub = env.newManualSubscriber(pub); sub.subscription.value().cancel(); - sub.subscription.value().requestMore(1); // must not throw + sub.subscription.value().request(1); // must not throw } }); } @@ -271,7 +275,7 @@ public void run(Publisher pub) throws Throwable { new Runnable() { @Override public void run() { - sub.subscription.value().requestMore(-1); + sub.subscription.value().request(-1); } }); @@ -281,7 +285,7 @@ public void run() { new Runnable() { @Override public void run() { - sub.subscription.value().requestMore(0); + sub.subscription.value().request(0); } }); sub.cancel(); diff --git a/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java b/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java index 9969f122..96b4f8c1 100644 --- a/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java +++ b/tck/src/main/java/org/reactivestreams/tck/SubscriberVerification.java @@ -1,12 +1,15 @@ package org.reactivestreams.tck; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscriber; -import org.reactivestreams.spi.Subscription; +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.TestEnvironment.Latch; +import org.reactivestreams.tck.TestEnvironment.ManualPublisher; +import org.reactivestreams.tck.TestEnvironment.ManualSubscriber; +import org.reactivestreams.tck.TestEnvironment.Promise; +import org.reactivestreams.tck.TestEnvironment.Receptacle; import org.testng.annotations.Test; -import static org.reactivestreams.tck.TestEnvironment.*; - public abstract class SubscriberVerification { private final TestEnvironment env; @@ -84,7 +87,7 @@ public void mustNotAcceptAnOnSubscribeEventIfItAlreadyHasAnActiveSubscription() // try to subscribe another time, if the subscriber calls `probe.registerOnSubscribe` the test will fail sub().onSubscribe( new Subscription() { - public void requestMore(int elements) { + public void request(int elements) { env.flop(String.format("Subscriber %s illegally called `subscription.requestMore(%s)`", sub(), elements)); } diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index fe0c79e1..24beceb2 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -1,9 +1,6 @@ package org.reactivestreams.tck; -import org.reactivestreams.spi.Publisher; -import org.reactivestreams.spi.Subscriber; -import org.reactivestreams.spi.Subscription; -import org.reactivestreams.tck.support.Optional; +import static org.testng.Assert.fail; import java.util.LinkedList; import java.util.List; @@ -12,7 +9,10 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.testng.Assert.fail; +import org.reactivestreams.Subscription; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.support.Optional; public class TestEnvironment { public static final int TEST_BUFFER_SIZE = 16; @@ -183,7 +183,7 @@ public void onComplete() { } public void requestMore(int elements) { - subscription.value().requestMore(elements); + subscription.value().request(elements); } public T requestNextElement() throws InterruptedException { @@ -343,7 +343,7 @@ public void subscribe(Subscriber s) { Subscription subs = new Subscription() { @Override - public void requestMore(int elements) { + public void request(int elements) { requests.add(elements); } diff --git a/tck/src/main/java/org/reactivestreams/tck/TestProcessor.java b/tck/src/main/java/org/reactivestreams/tck/TestProcessor.java new file mode 100644 index 00000000..3c6f335c --- /dev/null +++ b/tck/src/main/java/org/reactivestreams/tck/TestProcessor.java @@ -0,0 +1,15 @@ +package org.reactivestreams.tck; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; + +/** + * The TCK uses this to pull together the 2 sides of {@link Publisher} and {@link Subscriber}. + */ +public interface TestProcessor extends Publisher, Subscriber { + + Subscriber getSubscriber(); + + Publisher getPublisher(); + +}