Skip to content

API/SPI Combination + Contract Details and Examples #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 20 commits into from

Conversation

benjchristensen
Copy link
Contributor

Based on discussion at #19 (comment) I propose merging this into master so as to move forward on these items:

  • Combine API and SPI into a single API for both interop and direct usage
  • Clarify contracts and behavior, particularly related to multi-subscribe/single-subscribe and multicast/unicast.

This supersedes #25 and will complete the discussions in #19, #24, #23 and #21.

The code compiles using sbt (I have updated the TCK as well). Running the 'test' task appears to do nothing (before and after these changes).

This is by no means a final proposal nor is it considered ready for final discussion, but it makes progress and allows us to move all of the conversations forward.

Known discussion points still include:

I'm sure there are many others...

Do we have enough agreement to merge this and continue discussing?

A simplified implementation where SPI and API combine.
--------------------------

- `Subscriber` can be used once-and-only-once to subscribe to a `Publisher`.
- a `Subscription` can be used once-and-only-once to represent a subscription by a `Subscriber` to a `Publisher`.
- The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not.
- A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data sources, etc) and can do so by immediately calling `Subscriber.onError` on the `Subscriber` instance calling `subscribe`.
- Events sent to a `Subscriber` can only be sent sequentially (no concurrent notifications).
- Once an `onComplete` or `onError` is sent, no further events can be sent.
- Once a `Subscription` is cancelled, the `Publisher` will stop sending events as soon as it can.
- A `Publisher` will never send more `onNext` events than have been requested via the `Subscription.request/signalDemand` method. It can send less events than requested and end the subscription by emitting `onComplete` or `onError`.
@rkuhn
Copy link
Member

rkuhn commented Apr 22, 2014

I’ll have to defer digging into this until Thursday morning due to PhillyETE.

@benjchristensen
Copy link
Contributor Author

Thanks Roland, hope your presentation went well at PhillyETE.

@rkuhn
Copy link
Member

rkuhn commented Apr 23, 2014

The more interesting—and related—one lies still ahead of me ;-)

@benjchristensen
Copy link
Contributor Author

Well good luck then and have fun with it!

**The SPI** defines the interoperablility layer between different implementations.

**The API** specifies the types that the users of Reactive Stream libraries use.
**The API** specifies the types to implement Reactive Streams and achieve interoperablility between different implementations.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to keep calling this the SPI, since we want to express that users are not supposed to implement any of these interfaces directly or invoke anything besides Publisher.subscribe().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why wouldn't a user be able to implement a Subscriber? Yes they need to think about it (invoke subscription.request(n) correctly) but this is not difficult stuff to implement if following the interface.

I think it diminishes this standard if we don't design the types for broad use by all developers, not just "library authors".

Perhaps right now we'll all hide it behind our various library types, but if we truly intend this to become part of a future JDK then these types will be exposed and implemented by many and we should assume this from the start.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I had in mind is somewhat like java.nio.channels.spi.*: sure, some people will implement it, but the vast majority will just use readily made implementations coming with the JDK or other libraries.

When I wrote the above, I had not yet scrolled down to see that you moved things out of the spi package, which makes sense if we assume that we will never add any real end-user API, because then we would mix up different target audiences in the same package (and the NIO example shows that there is precedent in the JDK for keeping these things separate). I’m fine with that, but we should be clear about this decision. In this case the question of API vs SPI becomes purely one of how we talk about it in the documentation, to clarify the intent.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some people will implement it, but the vast majority will just use readily made implementations coming with the JDK or other libraries.

I agree with this.

I chose "API" for the combined types since that feels like a generic term to me just to represent the public types we are exposing for the standard once we eliminate a division between SPI and API.

I not only eliminated the spi package, but the api package as well so the types just live directly inside org.reactivestreams.

we should be clear about this decision

Definitely, and that's what I'm proposing this pull request for, to combine the API and SPI into a single set of types that serve the combined purpose.

I fully recognize that most people will use prebuilt implementations. The types though (even if using an implementation) are very good at communicating intent and contract, and are good public interaction points. Similar to how in code we often use Iterable as a type we pass around but rarely implement it directly. This is what I'm aiming for with this.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @benjchristensen here. It is very, very likely that people will want to implement and interact with the base types as users as well as library authors. It also makes the library much more extensible if you can easily extend the base types provided by the library by simply overriding a method and providing your own twist on the functionality without having to implement an entire Reactive Streams implementation. I don't see a downside to exposing these types directly to whomever may wish to implement them in whatever way they wish.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just because something requires thought does not mean we hide it. We just need to make things clear rather than confusing and nuanced.

Also, if the Subscriber is that hard to implement we have failed. See my comment at #37 (comment) for how the Subscriber can be simplified. The Subscription is the more complicated item to implement, and if people are going to do so they need to comply with the contract.

A Subscriber should be dead simple to implement, and it really is.

If we make it the responsibility of Subscription.request to be async, then a Subscriber can directly invoke Subscription.request without event loops, trampolines etc which is currently the most nuanced complication of the spec.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A Subscriber should be dead simple to implement, and it really is.

Again, I quote the same sentence from below: "The Reactive Streams API prescribes that all processing of elements (onNext) or termination signals (onError, onComplete) happens outside of the execution stack of the Publisher."

Maybe there's still a basic misunderstanding about how this rule is meant. Our previous understanding was that all the asynchronous handoff happened inside the onX methods (and the same for request/cancel). So, it previously meant that the caller of the onX method never has to care for scheduling but it's always the responsibility of the callee to ensure proper scheduling. Has this changed?

In this previous sense, I wouldn't call it "dead simple" to implement a Subscriber in a way that complies with this rule.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think "happens outside of the execution stack of the Publisher" needs to be deleted or changed.

The only rule that a Subscriber should have is "don't block".

If it is doing synchronous transformations/filtering/etc, that's fine. If it's blocking a thread on IO, that's not fine.

I should be able to write code like this:

void onNext(T t) {
  queue.offer(transform(t));
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@benjchristensen Assume I use the RingBuffer for dispatching. In your description, the RingBuffer.tryNext() calls would happen in the Subscription and if no slots were available, then that would be the situation where one would need to apply backpressure. The Subscriber would basically be the RingBuffer's EventHandler and be under the same restrictions it is today without Reactive Streams: "don't block".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, my mental picture of that isn't quite clear ... is the RingBuffer the source of the data, or where you're buffering it in the Subscriber?

If in the Subscriber, the Subscription.request will never be larger than slots available in the ring so it would never overflow.

If in the Publisher, then the Subscription would represent the reader (tail pointer) that is following the head as it receives requests via request(n).

if no slots were available, then that would be the situation where one would need to apply back pressure

I don't see in our model how we'd find out we need to apply back pressure once no space. We are requesting up front with a known amount so we should never hit a scenario where we "run out".

Line 61: The number of onNext events emitted by a Publisher to a Subscriber will at no point in time exceed the cumulative demand that has been signaled via that Subscriber’s Subscription.
/**
* Request {@link Publisher} to start streaming data.
* <p>
* This is a "factory method" and can be called multiple times, each time starting a new {@link Subscriber}.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

“… starting a new Subscription”, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, good catch.

As per discussion with Roland.
Not thrilled with this ... but it's a starting point:

- Calls from a `Subscriber` to `Subscription` such as `Subscription.request(int n)` must be dispatched asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely.
@benjchristensen
Copy link
Contributor Author

Items I intend to open issues on after this is merged include:

  • naming of everything
  • void onSubscribe vs Subscription subscribe
  • support for synchronous subscribe (ie. `request(-1))
  • better rewrite of contract with help of a technical writer
  • how this relates to network protocols for interaction with other languages

I'm sure there are plenty more things to discuss ...

- The `Publisher.subscribe` method can be called as many times as wanted as long as it is with a different `Subscriber` each time. It is up to the `Publisher` whether underlying streams are shared or not. In other words, a `Publisher` can support multi-subscribe and then choose whether each `Subscription` is unicast or multicast.
- A `Publisher` can refuse subscriptions (calls to `subscribe`) if it is unable or unwilling to serve them (overwhelmed, fronting a single-use data source, etc) and can do so by calling `Subscriber.onError` instead of `Subscriber.onSubscribe` on the `Subscriber` instance calling `subscribe`.
- A `Publisher` should not throw an `Exception`. The only legal way to signal failure (or reject a `Subscription`) is via the `Subscriber.onError` method.
- The `Subscription.request` method must behave asynchronously (separate thread, event loop, trampoline, etc) so as to not cause a StackOverflow since `Subscriber.onNext` -> `Subscription.request` -> `Subscriber.onNext` can recurse infinitely. This allows a `Subscriber` to directly invoke `Subscription.request` and isolate the async responsibility to the `Subscription` instance which has responsibility for scheduling events.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this! Makes code like this legit:

public void onNext(T elem) {
   if (isStillValid(elem)) {
     process(elem);
     subscription.request(1); // acknowlegement, next please
  }
  else
     subscription.cancel(); // no longer interested, stop please
}

+1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another possible approach for async processing is like this with an event loop:

void process() {
   eventLoop.schedule(() -> {
        T t;
        while((t = queue.poll()) != null) {
            doWork(t);
            if(queue.size() < THRESHOLD) {
                subscription.request(queue.capacity());
            }
        }
   })
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In principle we are free to choose either way to break up the potential infinite loop: allowing synchronous Publisher or allowing synchronous Subscriber. What we have currently allows synchronous Publishers and disallows synchronous Subscribers. The reasoning behind that choice was that producing a stream from a strict collection should be cheap. With this proposed change there would be a huge cost for this rather common use case, meaning that streaming a List would necessarily need to involve a task scheduler (e.g. thread pool or actor). Can you please elaborate how this cost is offset and amortized by the benefits of allowing a synchronous Subscriber?

@benjchristensen
Copy link
Contributor Author

All ... this will be much easier if we merge this and allow the various conversations to proceed independently on their own issues or subsequent pull requests.

@jbrisbin
Copy link

+1 LGTM

@rkuhn
Copy link
Member

rkuhn commented Apr 24, 2014

@benjchristensen Just a comment on process: it would be better to break up the proposed PR instead of merging a big set of changes and then fixing things up later, that makes it easier to follow the conversation now and later. In this case, there are many changes we already agree upon, so we could merge those in one go—including the fix for #39 once that has enough votes because I would be uncomfortable with merging a spec change which leaves the TCK incorrect and broken.

@benjchristensen
Copy link
Contributor Author

Then I suggest deleting the TCK along with this PR because the types and the contract (defined in the README) go hand in hand.

@benjchristensen
Copy link
Contributor Author

In other words, I don't know what I'd break up about this PR.

@rkuhn
Copy link
Member

rkuhn commented Apr 24, 2014

For example the textual changes I just commented on ;-) (sync Subscriber vs. sync Publisher)

You have described a lot of semantic changes beyond the original proposal—that was only about removing the required semantics for multiple subscriptions to the same Publisher and you have gone way beyond that and basically rewritten and partially inverted the whole proposal. I think it would be conducive to more focused discussion if we would break that up into individual points.

rkuhn added a commit that referenced this pull request Apr 24, 2014
Squash of #37
Work as a result of discussion in #19
Removes TCK implementation so it is not out of sync with API as per #39
benjchristensen added a commit that referenced this pull request Apr 24, 2014
Subset of discussion at #37
Work as a result of discussion in #19
@benjchristensen
Copy link
Contributor Author

I have extracted out just the type changes into #40

It does not touch the README file (which is no longer correct or in sync with the code) nor does it remove the TCK.

@benjchristensen
Copy link
Contributor Author

Closing this in favor of smaller PRs.

@benjchristensen
Copy link
Contributor Author

The README is now in #41

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants