
2.x Design: Subject #3349


Merged
merged 1 commit into from
Sep 21, 2015


benjchristensen
Member

Clarification of Subject that affects implementation.

Related to discussion in #3345.

@benjchristensen benjchristensen added this to the 2.0 milestone Sep 16, 2015
Relation to Reactive Streams

- It can not implement Reactive Streams `Publisher` unless it is created with a default flow control strategy.
- It can not implement `Processor` since a `Processor` must compose `request(n)` which can not be done with multicasting or pure push.
Member


I've read the spec but I haven't seen an implication of this. The only rule I can deduce is that Processor may not overflow its subscribers. Nothing about request coordination requirement. This is what publish() is for.

Member Author


4.1: A Processor represents a processing stage—which is both a Subscriber and a Publisher and MUST obey the contracts of both.

Thus, it has to obey the rules of a Publisher which can't overflow its subscribers as per rule 1.1:

1.1 The total number of onNext signals sent by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.

I strongly argued against Processor as it looks like a Subject but can't be used as one. It is actually more like the RxJava Operator, but requires subscription rather than lifting into the observer chain.
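To make rule 1.1 concrete, here is a minimal single-threaded sketch of a cold source that never emits more than its subscriber has requested. It is written against `java.util.concurrent.Flow` (JDK 9+), which mirrors the Reactive Streams interfaces; `BoundedRange` and `take` are hypothetical names used only for this illustration and are not part of RxJava or the spec.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical cold source emitting 0..count-1, never more than requested (rule 1.1). */
final class BoundedRange implements Flow.Publisher<Integer> {
    private final int count;

    BoundedRange(int count) { this.count = count; }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private int next;         // next value to emit
            private long demand;      // requested but not yet delivered
            private boolean draining; // true while the emission loop is running
            private boolean done;     // set after cancel() or a terminal signal

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request(n) requires n > 0"));
                    return;
                }
                demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // saturate on overflow
                if (draining) return; // re-entrant call from onNext: the running loop sees the new demand
                draining = true;
                while (!done && demand > 0 && next < count) {
                    demand--;
                    subscriber.onNext(next++);
                }
                draining = false;
                if (!done && next == count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() { done = true; }
        });
    }

    /** Subscribes once, requests n items, and returns what was delivered. */
    static List<Integer> take(int count, long n) {
        List<Integer> received = new ArrayList<>();
        new BoundedRange(count).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(n); }
            @Override public void onNext(Integer item) { received.add(item); }
            @Override public void onError(Throwable t) { throw new AssertionError(t); }
            @Override public void onComplete() { }
        });
        return received;
    }
}
```

With a source of 1000 values and `request(10)`, `take(1000, 10)` delivers exactly ten items and then stops until more are requested. The source controls its own emission loop, which is what a producer-driven Subject does not do.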

Member Author


Note that this is exactly why we stopped using Subjects inside RxJava v1 Observable operators, such as publish(), and why we removed multicast(Subject s).

A Subject can not compose request(n). That's why v1 Observable.publish() behaves very differently than a PublishSubject and will slow down to the slowest consumer.
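The "slow down to the slowest consumer" behavior can be pictured as composing demand: the amount requested upstream is the minimum of what every subscriber has asked for. The following is only a sketch of that bookkeeping, assuming a fixed number of subscribers; `SlowestConsumerDemand` is a hypothetical name and this is not how RxJava's `publish()` is actually implemented.

```java
/** Illustration only: tracks per-subscriber demand and forwards the minimum upstream. */
final class SlowestConsumerDemand {
    private final long[] requested; // total requested so far, per subscriber
    private long forwarded;         // total already requested from upstream

    SlowestConsumerDemand(int subscribers) {
        this.requested = new long[subscribers];
    }

    /**
     * Records request(n) from one subscriber and returns how many additional
     * items can now be requested upstream without overflowing anyone.
     */
    long request(int subscriber, long n) {
        requested[subscriber] += n; // overflow handling omitted for brevity
        long min = Long.MAX_VALUE;
        for (long r : requested) {
            min = Math.min(min, r);
        }
        long delta = min - forwarded;
        forwarded = min;
        return delta;
    }
}
```

If one subscriber requests 50 and another requests 10, only 10 items are requested upstream; the faster subscriber waits. A push-based Subject has no upstream to forward this number to, because the producer calls `onNext` directly.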

@benjchristensen
Member Author

Ping @akarnokd

@benjchristensen
Member Author

@ReactiveX/rxjava-committers Can this PR be merged?

@akarnokd
Member

I disagree with the two statements.

@benjchristensen
Member Author

Please explain why you disagree and your proposed solution.

@akarnokd
Member

Relation to Reactive Streams

  • It can not implement Reactive Streams Publisher unless it is created with a default flow control strategy.

Subjects can implement Publisher if they don't overflow their clients that are of type org.reactivestreams.Subscriber.

In other words, there could be AsyncProcessor, PublishProcessor, ReplayProcessor, etc. that implement org.reactivestreams.Processor but have a default backpressure strategy coded in: PublishProcessor and BehaviorProcessor fail on overflow, the rest buffer/replay. The fail behavior can be avoided via an onBackpressureXXX operator that drops, buffers, or keeps only the latest values.

What we call PublishSubject now can then be non-backpressured by extending NbpObservable and taking NbpSubscribers. These are naturally not RS because they have a different type and structure.

  • It can not implement Processor since a Processor must compose request(n) which can not be done with multicasting or pure push.

Again, I don't see any mention of composing requests through a processor in the spec, but you could ask them (they're less likely to answer me lately).

@benjchristensen
Member Author

> Again, I don't see any mention of composing requests through a processor in the spec

There is a very, very, long discussion about multicasting here: reactive-streams/reactive-streams-jvm#19 (comment)

Other related discussions in reactive-streams/reactive-streams-jvm#22 and reactive-streams/reactive-streams-jvm#37

In short, Reactive Streams chose to not say anything about whether a Publisher can support multicast, as it is up to the implementation – but it still requires the Publisher to not send more than each Subscriber has requested.

This means some variants of an Rx Subject could be made to work, but generically it can not. A PublishSubject can not implement Publisher, thus the base type Subject can not.

> Subjects can implement Publisher if they don't overflow their clients that are of type org.reactivestreams.Subscriber.

Yup, but since PublishSubject can't comply with that, the base type Subject can't implement Publisher.

'Subjects' are "hot", and Publisher is not suited to "hot" sources. They can only represent a "hot" source if a backpressure strategy is applied, and that's not what Subjects are.

@akarnokd
Member

> A PublishSubject can not implement Publisher

Yet it does and works fine. It does not overflow the client and doesn't drop values silently either; if the client can't keep up, it will receive an error. The developer must think about what should happen in case the client can't keep up, perhaps increase the buffer size in observeOn, perhaps it is okay to drop values, perhaps there are some transient bursts and onBackpressureBuffer is enough.

I've read through the linked discussion and it feels there is a lot of self-handcuffing going on. Restricting a Publisher to work exactly once is what Iterator does: there is no going back or re-iteration, and this is also what happens with a j.u.Stream. I have the feeling the way Akka was implemented attempted to sneak into the specification via this single-use: actors don't know about what we can call "channels" but only messages. The second problem I see with RS is that in some people's minds, there is only one stage: a source and a consumer, and they can't imagine a chain of operators, which we deal with in RxJava all the time, where aspects are distributed along distinct operators. This also reminds me of an article which detailed why CORBA failed; one of the reasons being that proposers were not required to reference-implement their proposals, which would have brought out the problems early on.

So if you think PublishSubject violates the RS spec or fails a TCK, then show me a code example of such violation.

@benjchristensen
Member Author

1.1 The total number of onNext signals sent by a Publisher to a Subscriber MUST be less than or equal to the total number of elements requested by that Subscriber´s Subscription at all times.

A PublishSubject breaks rule 1.1, as it does not obey the request(n) from the Subscribers. It is not okay to onError if the Publisher is emitting more than requested. That means the Publisher does not comply with rule 1.1.

Here is the example:

package io.reactivesocket;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.subjects.PublishSubject;

public class SubjectContract {

    public static void main(String... args) {
        PublishSubject<String> s = PublishSubject.create();

        s.subscribe(new Subscriber<String>() {

            @Override
            public void onSubscribe(Subscription s) {
                s.request(10);
            }

            @Override
            public void onNext(String t) {
                System.out.println("A Received: " + t);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {

            }

        });

        s.subscribe(new Subscriber<String>() {

            @Override
            public void onSubscribe(Subscription s) {
                s.request(50);
            }

            @Override
            public void onNext(String t) {
                System.out.println("B Received: " + t);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {

            }

        });

        for(int i=0; i < 1000; i++) {
            s.onNext(String.valueOf(i));
        }
    }
}

This results in an error on both Subscribers, which is the overflow strategy you have chosen to apply. That does not mean it's not overflowing, just because it stops emitting to onNext.

A Received: 0
B Received: 0
A Received: 1
B Received: 1
A Received: 2
B Received: 2
A Received: 3
B Received: 3
A Received: 4
B Received: 4
A Received: 5
B Received: 5
A Received: 6
B Received: 6
A Received: 7
B Received: 7
A Received: 8
B Received: 8
A Received: 9
B Received: 9
B Received: 10
io.reactivex.exceptions.MissingBackpressureException: Could not emit value due to lack of requestsB Received: 11
    at io.reactivex.subjects.PublishSubject$PublishSubscriber.onNext(PublishSubject.java:310)
    at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:86)
    at io.reactivesocket.SubjectContract.main(SubjectContract.java:62)

B Received: 12
B Received: 13
B Received: 14
B Received: 15
B Received: 16
B Received: 17
B Received: 18
B Received: 19
B Received: 20
B Received: 21
B Received: 22
B Received: 23
B Received: 24
B Received: 25
B Received: 26
B Received: 27
B Received: 28
B Received: 29
B Received: 30
B Received: 31
B Received: 32
B Received: 33
B Received: 34
B Received: 35
B Received: 36
B Received: 37
B Received: 38
B Received: 39
B Received: 40
B Received: 41
B Received: 42
B Received: 43
B Received: 44
B Received: 45
B Received: 46
B Received: 47
B Received: 48
B Received: 49
io.reactivex.exceptions.MissingBackpressureException: Could not emit value due to lack of requests
    at io.reactivex.subjects.PublishSubject$PublishSubscriber.onNext(PublishSubject.java:310)
    at io.reactivex.subjects.PublishSubject.onNext(PublishSubject.java:86)
    at io.reactivesocket.SubjectContract.main(SubjectContract.java:62)
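
The mechanism behind this output can be reproduced without RxJava. Below is a minimal single-threaded sketch, assuming the `java.util.concurrent.Flow` interfaces (JDK 9+) as stand-ins for the Reactive Streams types; `FailOnOverflowHub`, `Slot`, and `demo` are hypothetical names, not the actual 2.x `PublishSubject` implementation. Each subscriber gets its own demand counter, and a subscriber with no outstanding demand receives `onError` instead of the item.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;

/** Hypothetical hot multicaster whose built-in overflow strategy is "signal onError". */
final class FailOnOverflowHub<T> implements Flow.Publisher<T> {

    /** One Subscription, with its own demand counter, per subscriber. */
    private static final class Slot<T> implements Flow.Subscription {
        final Flow.Subscriber<? super T> subscriber;
        long demand;
        boolean cancelled;

        Slot(Flow.Subscriber<? super T> subscriber) { this.subscriber = subscriber; }

        @Override public void request(long n) { demand += n; } // validation and overflow omitted
        @Override public void cancel() { cancelled = true; }
    }

    private final List<Slot<T>> slots = new CopyOnWriteArrayList<>();

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Slot<T> slot = new Slot<>(subscriber);
        slots.add(slot);
        subscriber.onSubscribe(slot);
    }

    /** Producer-driven emission: the caller is never slowed down by subscriber demand. */
    void onNext(T item) {
        for (Slot<T> slot : slots) {
            if (slot.cancelled) {
                slots.remove(slot);
            } else if (slot.demand > 0) {
                slot.demand--;
                slot.subscriber.onNext(item);
            } else {
                slots.remove(slot); // terminal signal: this subscriber gets nothing further
                slot.subscriber.onError(new IllegalStateException("lack of requests"));
            }
        }
    }

    /** Mirrors the experiment above: A requests 10, B requests 50, the producer emits 1000. */
    static int[] demo() {
        int[] counts = new int[3]; // {items seen by A, items seen by B, onError calls}
        FailOnOverflowHub<String> hub = new FailOnOverflowHub<>();
        hub.subscribe(counting(counts, 0, 10));
        hub.subscribe(counting(counts, 1, 50));
        for (int i = 0; i < 1000; i++) {
            hub.onNext(String.valueOf(i));
        }
        return counts;
    }

    private static Flow.Subscriber<String> counting(int[] counts, int index, long n) {
        return new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(n); }
            @Override public void onNext(String item) { counts[index]++; }
            @Override public void onError(Throwable t) { counts[2]++; }
            @Override public void onComplete() { }
        };
    }
}
```

In this sketch `demo()` returns `{10, 50, 2}`: each subscriber sees exactly the number of items it requested and then an error. The producer's loop still runs all 1000 iterations at its own pace, which is the point under dispute in this thread.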

1.9 Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber and MUST return normally, except when the provided Subscriber is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe).

A Publisher can not call onNext without first calling onSubscribe, and after that it may only emit based on request(n).

However, the intent and design of a Subject is for a producer to emit to it directly without knowledge of consumers.

PublishSubject<String> s = PublishSubject.create();
for(int i=0; i < 1000; i++) {
    s.onNext(String.valueOf(i));
}

So let's see what happens when I do use onSubscribe on a PublishSubject:

package io.reactivesocket;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.subjects.PublishSubject;

public class SubjectContract {

    public static void main(String... args) {
        PublishSubject<String> s = PublishSubject.create();

        s.onSubscribe(new Subscription() {

            @Override
            public void request(long n) {
                System.out.println("Requested: " + n);
            }

            @Override
            public void cancel() {
                // what does it mean to cancel here?
            }


        });


        s.subscribe(new Subscriber<String>() {

            @Override
            public void onSubscribe(Subscription s) {
                s.request(10);
            }

            @Override
            public void onNext(String t) {
                System.out.println("A Received: " + t);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {

            }

        });

        s.subscribe(new Subscriber<String>() {

            @Override
            public void onSubscribe(Subscription s) {
                s.request(50);
            }

            @Override
            public void onNext(String t) {
                System.out.println("B Received: " + t);
            }

            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }

            @Override
            public void onComplete() {

            }

        });

        for(int i=0; i < 1000; i++) {
            s.onNext(String.valueOf(i));
        }
    }
}

I received Long.MAX_VALUE, completely ignoring the downstream requested amounts from the consumers:

Requested: 9223372036854775807

So, rule 1.9 is broken.

And what does a cancel mean on a Subject? It can't mean anything, because a Subject decouples the producer from the consumer.

1.8 If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.

Thus, rule 1.8 is also broken.

A Subject can not implement Publisher.

@benjchristensen
Member Author

Related to this is how we determined that Subject can not even be used in v1 operator chains where request(n) is being used:

@akarnokd
Member

If your interpretation is true, no operator can implement RS at all and no source, hot or cold is allowed.

  • the moment there is an exception, request(n) is violated because there wasn't n onNexts
  • the moment an operator such as observeOn doesn't request the same amount as its child, request(n) is violated.
  • if there is an infinite source or unbounded subscriber, cancel is never called thus rule 1.8 is violated.

@stealthcode

I am convinced that the PublishSubject cannot be a Processor. For something to offer stream fan-in/fan-out functionality, a new Subscription must be produced for every subscriber, and backpressure requests must be maintained for each separately. The use case of a subject is different from a processor's intended use.

@benjchristensen
Member Author

> the moment there is an exception, request(n) is violated because there wasn't n onNexts

onError signals are not part of request(n). Rule 1.1 only talks about onNext. Rule 1.4 says exceptions are emitted via onError.

Additionally:

  • 2.9 A Subscriber MUST be prepared to receive an onComplete signal with or without a preceding Subscription.request(long n) call.
  • 2.10 A Subscriber MUST be prepared to receive an onError signal with or without a preceding Subscription.request(long n) call.
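
Rules 2.9 and 2.10 can be illustrated with sources that only ever send a terminal signal. This is a small sketch on `java.util.concurrent.Flow` (JDK 9+); `TerminalOnly`, `empty`, `error`, and `terminalSignal` are hypothetical helpers for this illustration, not RxJava operators.

```java
import java.util.concurrent.Flow;

final class TerminalOnly {
    /** A Subscription that ignores request and cancel; nothing will ever be emitted. */
    private static final Flow.Subscription NOOP = new Flow.Subscription() {
        @Override public void request(long n) { }
        @Override public void cancel() { }
    };

    /** Completes immediately, whether or not the subscriber has requested anything. */
    static <T> Flow.Publisher<T> empty() {
        return subscriber -> {
            subscriber.onSubscribe(NOOP);
            subscriber.onComplete();
        };
    }

    /** Fails immediately; onError is not counted against request(n). */
    static <T> Flow.Publisher<T> error(Throwable cause) {
        return subscriber -> {
            subscriber.onSubscribe(NOOP);
            subscriber.onError(cause);
        };
    }

    /** Subscribes without ever calling request(n) and reports which terminal signal arrived. */
    static String terminalSignal(Flow.Publisher<String> source) {
        String[] signal = { "none" };
        source.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { /* deliberately no request */ }
            @Override public void onNext(String item) { signal[0] = "onNext"; }
            @Override public void onError(Throwable t) { signal[0] = "onError"; }
            @Override public void onComplete() { signal[0] = "onComplete"; }
        });
        return signal[0];
    }
}
```

Both sources call `onSubscribe` first and never call `onNext`, so the count in rule 1.1 stays at zero even though the subscriber requested nothing.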

> the moment an operator such as observeOn doesn't request the same amount as its child, request(n) is violated.

No, because observeOn decouples the flow. It can request(128) up as long as it can handle receipt and buffer it, and then respond correctly to its consumer.

Thus, a fully unbounded ReplaySubject could satisfy request(n) semantics since it could act like an infinite observeOn. This does not however apply generically to Subject or PublishSubject, and not a bounded ReplaySubject.
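
The decoupling described here can be sketched as a stage that prefetches a fixed batch from upstream, buffers it, and hands items downstream only when the consumer has asked for them. This is a simplified, single-threaded illustration on `java.util.concurrent.Flow` (JDK 9+); `PrefetchStage` is a hypothetical name, the prefetch of 4 stands in for the 128 mentioned above, and threading, completion, and error handling in the real `observeOn` are not modeled.

```java
import java.util.ArrayDeque;
import java.util.concurrent.Flow;

/** Hypothetical single-threaded stage: prefetches from upstream, emits only on downstream demand. */
final class PrefetchStage<T> implements Flow.Processor<T, T>, Flow.Subscription {
    private final int prefetch; // upstream batch size
    private final int limit;    // replenish after this many items are passed on
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private Flow.Subscription upstream;
    private Flow.Subscriber<? super T> downstream;
    private long demand;        // downstream's outstanding request(n)
    private int consumed;       // items passed on since the last replenish
    private boolean draining;

    PrefetchStage(int prefetch) {
        this.prefetch = prefetch;
        this.limit = Math.max(1, prefetch / 2);
    }

    @Override public void subscribe(Flow.Subscriber<? super T> s) { downstream = s; s.onSubscribe(this); }
    @Override public void onSubscribe(Flow.Subscription s) { upstream = s; s.request(prefetch); }
    @Override public void onNext(T item) { queue.add(item); drain(); }
    @Override public void onError(Throwable t) { downstream.onError(t); }
    @Override public void onComplete() { /* completion handling omitted for brevity */ }
    @Override public void request(long n) { demand += n; drain(); }
    @Override public void cancel() { upstream.cancel(); }

    private void drain() {
        if (draining) return; // re-entrant call: the loop below is already running
        draining = true;
        while (demand > 0 && !queue.isEmpty()) {
            demand--;
            downstream.onNext(queue.poll());
            if (++consumed == limit) { // ask upstream for exactly what was passed on
                consumed = 0;
                upstream.request(limit);
            }
        }
        draining = false;
    }

    /** Returns {items delivered downstream, total requested upstream, items still buffered}. */
    static long[] demo() {
        long[] stats = new long[3];
        PrefetchStage<String> stage = new PrefetchStage<>(4);
        stage.subscribe(new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(String item) { stats[0]++; }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        });
        stage.onSubscribe(new Flow.Subscription() { // fake upstream that only records demand
            @Override public void request(long n) { stats[1] += n; }
            @Override public void cancel() { }
        });
        stage.request(1); // the slow consumer asks for one item
        for (String s : new String[] {"a", "b", "c", "d"}) {
            stage.onNext(s); // upstream delivers the 4 items it was asked for
        }
        stage.request(2); // the consumer asks for two more
        stats[2] = stage.queue.size();
        return stats;
    }
}
```

`demo()` returns `{3, 6, 1}`: the consumer received only the three items it requested, while the stage asked upstream for 4 and later 2 more. Items in flight plus items buffered never exceed the prefetch, so neither side is overflowed.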

> if there is an infinite source or unbounded subscriber, cancel is never called thus rule 1.8 is violated.

Huh? 1.8 does not say cancel MUST be invoked. It just says that if it is invoked then the source must stop emitting:

  • 1.8 If a Subscription is cancelled its Subscriber MUST eventually stop being signaled.

@akarnokd
Member

Let's assume there is a non-backpressured PublishSubject that is not a Flowable and does not implement RS, but is an NbpObservable. I then take this subject, cast it to NbpObservable, call toFlowable(BackpressureStrategy.FAIL) and I have a Flowable. Is this Flowable conforming with the RS spec? If not, is there any legal way of implementing any toFlowable at all?

@benjchristensen
Member Author

That example is now okay, because when you converted from Subject to Flowable, you chose to add a BackpressureStrategy that defines flow control. That Flowable conforms to the spec.

@benjchristensen
Member Author

The issue with Subject implementing Publisher is that Publisher has this method:

 public void subscribe(Subscriber<? super T> s);

If it was instead:

 public void subscribe(Subscriber<? super T> s, BackpressureStrategy strategy);

... then it would be okay, and that is the same as toFlowable(BackpressureStrategy s).
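
As a rough illustration of that hypothetical two-argument `subscribe`, the sketch below makes every subscriber name its overflow strategy when it attaches to a hot source. It uses `java.util.concurrent.Flow` (JDK 9+); `StrategicSource`, its `Strategy` enum, and `Recorder` are invented for this example and do not exist in RxJava or Reactive Streams. It is single-threaded and skips request validation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

/** Hypothetical hot source where every subscriber must name its overflow strategy. */
final class StrategicSource<T> {
    enum Strategy { DROP, BUFFER } // hypothetical; the thread's FAIL strategy is left out for brevity

    private final class Bridge implements Flow.Subscription {
        final Flow.Subscriber<? super T> subscriber;
        final Strategy strategy;
        final ArrayDeque<T> queue = new ArrayDeque<>(); // only used by BUFFER
        long demand;
        boolean cancelled;

        Bridge(Flow.Subscriber<? super T> subscriber, Strategy strategy) {
            this.subscriber = subscriber;
            this.strategy = strategy;
        }

        void offer(T item) {
            if (cancelled) return;
            if (demand > 0 && queue.isEmpty()) {
                demand--;
                subscriber.onNext(item);
            } else if (strategy == Strategy.BUFFER) {
                queue.add(item); // unbounded: trades memory for completeness
            }                    // DROP: the item is discarded for this subscriber only
        }

        @Override public void request(long n) {
            demand += n;         // validation and overflow omitted
            while (!cancelled && demand > 0 && !queue.isEmpty()) {
                demand--;
                subscriber.onNext(queue.poll());
            }
        }

        @Override public void cancel() { cancelled = true; queue.clear(); }
    }

    private final List<Bridge> bridges = new ArrayList<>();

    void subscribe(Flow.Subscriber<? super T> subscriber, Strategy strategy) {
        Bridge bridge = new Bridge(subscriber, strategy);
        bridges.add(bridge);
        subscriber.onSubscribe(bridge);
    }

    /** Hot emission: the producer pushes without consulting demand. */
    void onNext(T item) {
        for (Bridge bridge : bridges) bridge.offer(item);
    }

    /** Helper subscriber: records items and keeps its Subscription so demand can be added later. */
    static final class Recorder implements Flow.Subscriber<Integer> {
        final List<Integer> items = new ArrayList<>();
        Flow.Subscription subscription;
        private final long initial;

        Recorder(long initial) { this.initial = initial; }

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(initial); }
        @Override public void onNext(Integer item) { items.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { }
    }
}
```

With two `Recorder`s that each request 2 items and a producer that pushes 0 through 4, the `DROP` subscriber ends up with 0 and 1 only, while the `BUFFER` subscriber receives 2, 3, and 4 once it requests more. In both cases no subscriber sees more `onNext` calls than it asked for, and the choice of what happens on overflow was made by the user rather than by the source.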

@akarnokd
Member

@benjchristensen Then why is it wrong if PublishSubject does what you just replied: sends an error outside requested amount, decouples the flow and doesn't call cancel to upstream?

@benjchristensen
Member Author

Because you are calling an error on the Subscriber, which is due to overflowing, which is the fault of the Publisher for not obeying the request(n).

In the toFlowable() case, the Subscriber does not receive an error.

@akarnokd
Member

Also, do you recognize that what I described in the NbpObservable->Flowable example is what the current 2.0 PublishSubject does in one step?

@benjchristensen
Member Author

Yes I do, but you decided on a default backpressure strategy of "FAIL", which is not okay to do. It means you are ignoring the request(n) of the subscribers that subscribe to this method:

 public void subscribe(Subscriber<? super T> s);

Ignoring the request(n) breaks rule 1.1.

We can not choose a default flow control strategy that defeats the purpose of request(n). That breaks the RS contract, and the whole point of Reactive Streams.

@akarnokd
Member

The default can be overruled by any of the onBackpressureXXX operators.

Do error(), empty(), never() and just() ignore request(n)? If yes, then we can't have them as flowables.

@benjchristensen
Member Author

That's missing the point. The user is opting into those forms of flow control. The user can choose whatever request(n) behavior they want. Operators are a mechanism for them to control request(n).

A Subject ignoring the request(n) though is not allowed.

@benjchristensen
Member Author

Don't ignore the other side of this as well, the purpose of a Subject existing is for a producer to do "hot" emission to onNext. This means they will not call onSubscribe. This breaks 1.9 as I explained above.

@akarnokd
Member

If this is about the lack of call to PublishSubject.onSubscribe then one should call it with something.

Such call is practically no-op because PublishSubject is an unbounded Subscriber. Is being an unbounded Subscriber a violation in general or only if said Subscriber is a Subject?

If the latter, then all Subjects should be banned and neither groupBy() nor window() should be allowed.

@benjchristensen
Member Author

> Is being an unbounded Subscriber a violation in general

Being an unbounded subscriber is fine. Being an unbounded Publisher is not.

Subjects are hot, push. They do not participate in backpressure. Stop trying to apply the same rules to all types, that is why we are splitting the types in #3348. We will have a Subject that implements Observable, not Flowable.

@akarnokd
Member

I can't disprove a definition, which is what this thread has now become. In addition, reviewing anything is now quite difficult because I can't predict which rule applies when, or when a rule will suddenly mean its opposite.

@stealthcode

I see no logical fault in reasoning 👍. We can debate exactly what subjects are later but it is clear to me that the assumptions established by the RS Processor do not allow for multicast subjects (as they operate today).

@benjchristensen
Member Author

Thanks @stealthcode for reviewing and weighing in. As you allude to, this PR is simply attempting to document what a Subject is, and align with what is in the Reactive Streams spec.

@benjchristensen
Member Author

@akarnokd Any further thoughts on the text of this PR? I'd like to move forward to other items of design if we can move past this one.

@akarnokd
Member

I disagree with the text. No further comments on this topic.

@stevegury
Member

I think this PR makes the distinction between Subject and reactive streams Publisher more apparent which is good.

Regarding the argument, I'm convinced that PublishSubject can't be a Processor without implementing a backpressure strategy.

@benjchristensen
Member Author

@akarnokd You can't disagree and then abstain from providing alternatives when evidence has been provided supporting this text. Are you removing yourself from the design discussion and letting the rest of us make the decision?

@benjchristensen
Member Author

@stevegury thank you for your review of the topic and weighing in.

@akarnokd
Member

> You can't disagree and then abstain from providing alternatives when evidence has been provided supporting this text.

Don't put this on me. In my understanding, there is no problem with PublishSubject being a Processor as it by default doesn't overflow its children but sends them an error. I've also mentioned that you can apply onBackpressureXXX on a PublishSubject and change this behavior to be dropping, buffering or using the latest. This is nothing different than applying onErrorResumeNext() on a source which is likely to emit an error. For example:

subject.onErrorResumeNext(
    e -> e instanceof MissingBackpressureException 
        ? subject.onBackpressureDrop() : Observable.error(e));

This will resubscribe with a drop strategy to the subject to avoid any further errors.

But if your problem is that PublishSubject defaults to error out, I see the option to add a BackpressureStrategy parameter to PublishSubject.create() so there is no "surprise" to those who wouldn't read the specification anyways regarding default behavior.

The final alternative I can offer is something like the SubmissionPublisher from Doug Lea which pushes the rejection handling to the caller of submit or offer.
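
For reference, here is a small sketch of that style using `java.util.concurrent.SubmissionPublisher` as it later shipped in JDK 9. It assumes the default constructor (asynchronous delivery, per-subscriber buffer of `Flow.defaultBufferSize()`); `OfferWithDropHandler` and the counts chosen are just for illustration. The producer calls `offer`, and a drop handler on the producer side is told whenever an item cannot be accepted for a subscriber.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class OfferWithDropHandler {
    /** Returns {items received, items dropped} for a subscriber that requests only 3 of 1000. */
    static int[] run() {
        AtomicInteger received = new AtomicInteger();
        AtomicInteger dropped = new AtomicInteger();
        CountDownLatch gotAll = new CountDownLatch(3);

        // Default constructor: async delivery, per-subscriber buffer of Flow.defaultBufferSize().
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) { s.request(3); }
                @Override public void onNext(Integer item) {
                    received.incrementAndGet();
                    gotAll.countDown();
                }
                @Override public void onError(Throwable t) { }
                @Override public void onComplete() { }
            });
            for (int i = 0; i < 1000; i++) {
                // The drop handler runs on the producer side; returning false means "do not retry".
                publisher.offer(i, (subscriber, item) -> {
                    dropped.incrementAndGet();
                    return false;
                });
            }
            gotAll.await(5, TimeUnit.SECONDS); // delivery is asynchronous, so wait for the 3 items
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { received.get(), dropped.get() };
    }
}
```

The subscriber receives the 3 items it requested and no error; the rejected items are reported to the caller of `offer`, which can decide whether to retry, slow down, or discard. The exact number of drops depends on buffer size and timing, so the sketch does not state one.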

> Are you removing yourself from the design discussion and letting the rest of us make the decision?

I'm outnumbered anyways. Does this mean whenever I disagree with something in the future, you either keep asking until I change my mind or simply overrule me?

Perhaps this is a good time to ask for insight from the guys who maintain the RS spec or from people who are not Netflix or me.

@benjchristensen
Member Author

> the guys who maintain the RS spec

I'm one of them => https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.0/CONTRIBUTING.md#gatekeepers But if we can get someone else to spend time on this with us, we can get others involved. Perhaps @smaldini could give us some time.

> as it by default doesn't overflow its children but sends them an error

This is the key problem so let me summarize this for anyone else catching up on this thread. A Publisher in "Reactive Streams" is supposed to respect the request(n) from a Subscriber. To respect means that it won't emit more than requested. It doesn't mean that it sends onError to the Subscriber if the Subscriber can't keep up. The whole point of "Reactive Streams" is to slow down the producer to the rate of the consumer. Sending an error to the consumer when it is slow is counter to its entire purpose. In "Reactive Streams", the consumer is in control. An Rx Subject on the other hand puts the producer in control.

@benjchristensen
Member Author

I'm merging this based on feedback from @stevegury and @stealthcode and my understanding of Reactive Streams based on my involvement in defining that contract. The Design.md document still has a long way to go and we'll have many more weeks of discussions on design. This is but one small point.

benjchristensen added a commit that referenced this pull request Sep 21, 2015
@benjchristensen benjchristensen merged commit 09d5903 into 2.x Sep 21, 2015
@benjchristensen benjchristensen deleted the v2-design-subject branch September 21, 2015 16:53
4 participants