Skip to content

2.x Design: Flowable/Observable #3348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 21, 2015
Merged

2.x Design: Flowable/Observable #3348

merged 1 commit into from
Sep 21, 2015

Conversation

benjchristensen
Copy link
Member

Proposed separation of Observable and Flowable along with description of each and their characteristics.

Proposed separation of `Observable` and `Flowable` along with description of each and their characteristics.
@benjchristensen benjchristensen added this to the 2.0 milestone Sep 16, 2015
@benjchristensen
Copy link
Member Author

Related to discussion in #2787 (comment)

@benjchristensen benjchristensen changed the title Flowable/Observable 2.x Design: Flowable/Observable Sep 16, 2015

Flow control support:

- buffering, sampling, throttling, windowing, dropping, etc
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that sampling by itself still can overflow a client if said client doesn't request fast enough or big enough. Same is true for throttling and non-count based buffering (such as with boundary Publisher). Window is a bit both since the outer sequence may overflow the client but the inners can't because UnicastSubject buffers values (see subscription gap problem) and replays them with as the client requests.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, but they are still approaches to flow control.

@benjchristensen
Copy link
Member Author

@ReactiveX/rxjava-committers Is there agreement to support Observable and Flowable as independent types as described by this PR?

If there are questions needing answering before you can respond, please ask the questions now so we can move forward.

I have posted lengthy explanations of reasoning here:

Another example supporting the need of the two types is that Subject can not implement Publisher: #3349

@benjchristensen
Copy link
Member Author

Here are some ideas on API design and comparisons between the types:

Push

Observable.create(s -> {
   s.onNext(t)
   ...
   s.onNext(t)
})

would be equivalent to this if we chose to have these APIs:

Flowable.createPush(s -> {
   s.onNext(t)
   ...
   s.onNext(t)
}, BackpressureStrategy.BUFFER)

Pull-Push/Async Pull

Observable does not support pull.

With Flowable there is synchronous variety:

Flowable.createSync(... SyncOnSubscribe ... )
// or
Flowable.from(Iterable)
// or
Flowable.just(T...)
// or
Flowable.range(0, 10000000)

and an asynchronous variety:

Flowable.createAsync(... AsyncOnSubscribe ... )

Conversion

From Flowable to Observable, it is easy since it asks for no request(n) flow control:

Flowable f = ...
Observable<T> o = f.toObservable(); // this will call request(Long.MAX_VALUE) up when subscribed to

From Observable to Flowable, it must provide a backpressure strategy:

Observable o = ...
Flowable<T> f = o.toFlowable(BackpressureStrategy.*)
// for example
Flowable<T> f = o.toFlowable(BackpressureStrategy.BUFFER)
// or 
Flowable<T> f = o.toFlowable(BackpressureStrategy.DROP)
// or
Flowable<T> f = o.toFlowable(BackpressureStrategy.FAIL)
// or 
Flowable<T> f = o.toFlowable(BackpressureStrategy.create(...)) // like lifting an operator into a stream, except for a backpressure strategy

@akarnokd
Copy link
Member

I think instead of toFlowable, we could move the onBackpressureXXX method into the Observable and they now allow direct customization instead of an enum/interface-like construct you are proposing.

I don't particulary like the name swap because users now have learned about the Observable being the one where one doesn't have to worry about memory overflow so much. The change, I think, will create a lot of confusion.

I don't particularly understand the need for this non-backpressure version again (even though I was skeptical about the addition of backpressure back then). Is it the overhead of the backpressure management? Is it the latency caused by the scattering effect when crossing a thread boundary? Or is it that one wants to manually emit onNext event "mindlessly"?

@benjchristensen
Copy link
Member Author

I think instead of toFlowable, we could move the onBackpressureXXX method into the Observable and they now allow direct customization instead of an enum/interface-like construct you are proposing.

This doesn't make sense to me, since Observable would not have request(n) signals for onBackpressureXXX to work.

I don't particulary like the name swap because users now have learned about the Observable being the one where one doesn't have to worry about memory overflow so much. The change, I think, will create a lot of confusion.

The rest of the ReactiveX community argues the opposite that RxJava has caused confusion by adding backpressure to Observable.

The argument also is that adding backpressure has made usage of Observable more complicated for the push cases. I think this is a fault of our API design in that we just exposed the raw Observable.create without better creation APIs.

The change, I think, will create a lot of confusion.

From 0.20 to 1.0 it evolved. The community dealt with it. Now I hear that combining the two has confused people. Separating the types is intended to clarify. And to stop confusing when compared against RxJS, Rx.Net, etc.

I don't particularly understand the need for this non-backpressure version again

The issue with Subject is representative. A Subject can not be a Publisher, as a Subject is "hot", "push", and can't obey request(n) backpressure. For use case that are truly push, particularly "hot" sources, the Observable type without backpressure is more honest and communicative. Thus, if we want to have a Subject, that is "hot" and controlled by the producer, we actually need the Observable type for Subject to extend/implement.

Additionally, there is overhead as you mention, though honestly, this is rarely applicable to most use cases.

Or is it that one wants to manually emit onNext event "mindlessly"?

This is the key, but "mindlessly" is dismissive. All "hot", push, use cases are like this.

I can easily argue that Flowable can solve this use case just fine, like this:

Flowable.createPush(s -> {
   s.onNext(t)
   ...
   s.onNext(t)
}, BackpressureStrategy.BUFFER)

But the type itself does nothing to communicate that this is push. Flowable really represents an interactive, "async pull" or "pull-push" type. Modeling a push source with it is possible, and I actually prefer doing so, as it makes me consider backpressure, but there are many who like to just have a push case.

Today, the default v1 Observable is equivalent to this:

Flowable.createPush(s -> {
   s.onNext(t)
   ...
   s.onNext(t)
}, BackpressureStrategy.FAIL)

I think that has confused people.

My conclusion of this debate is that it is actually quite subjective, but that alignment with the broader ReactiveX community is worth separating the types so that Observable is "push", not a "pull-push"/"async pull"/"push with flow control" type. The mental model is cleaner. Then, because @headinthebox is liberal in Rx not being just one thing, we extend RxJava to have not only Observable, but also Flowable and Single.

@akarnokd
Copy link
Member

Fine.

@akarnokd
Copy link
Member

To be precise, I'm not against having Single, Observable and Flowable in the library and if the community can deal with name- and feature-"shuffle" then I accept the decision.

This doesn't make sense to me, since Observable would not have request(n) signals for onBackpressureXXX to work.

In greater detail, I proposed that instead of having Observable.toFlowable where Observable is the non-backpressure, synchronously cancellable push-stream, I'd rather have Observable.onBackpressureXXX methods since, for example, onBackpressureDrop can run on its own or with a callback. Such extra parameter would be difficult to pass in with an enumeration such as the BackpressureStrategy.DROP. For example, the following general signature would be necessary:

Observable.toFlowable(BackpressureStrategy, Object... strategyParams)

The drawback is the lack of compile-time validation of strategyParams.

@benjchristensen
Copy link
Member Author

An Observable.onBackpressureDrop() is meaningless, as it would never know when to drop anything.

Sorry for not being clear on BackpressureStrategy.DROP. I did not intend it as an enum, but a type with static defaults, and factory methods for custom ones. You'll see that in the example above showing this:

Flowable<T> f = o.toFlowable(BackpressureStrategy.create(...)) // like lifting an operator into a stream, except for a backpressure strategy

@benjchristensen
Copy link
Member Author

Despite the debate over API design (which we can continue later), should we merge this PR?

@akarnokd
Copy link
Member

The text is okay 👍

@benjchristensen
Copy link
Member Author

Thanks @akarnokd

Anyone else have opinions on this? Reasons to not proceed?

@davidmoten
Copy link
Collaborator

The name shuffle is a thumbs down from me just for the loss of continuity
with existing code and documentation. I must say I'm also pretty sick of
typing the full Observable name as well and if we could rename Flowable to
Flow that would be nice for typing and nice for reading. Happy for you to
defer discussion of this stuff to another issue if you like.

On 18 September 2015 at 06:42, Ben Christensen [email protected]
wrote:

Thanks @akarnokd https://github.com/akarnokd

Anyone else have opinions on this? Reasons to not proceed?


Reply to this email directly or view it on GitHub
#3348 (comment).

@LalitMaganti
Copy link
Contributor

I supported the initial change and still support it now. The text is fine by me 👍

@benjchristensen
Copy link
Member Author

Flow

Flow is the interface name in Java 9. Hence Flowable which implements Flow. We can't use Flow without colliding with the j.u.c.Flow interface name.

loss of continuity with existing code and documentation

The only thing that would not apply to the new Observable is the backpressure portion.

The loss of continuity and alignment with the broader ReactiveX community is equally bad, just look at polyglot docs at reactivex.io to see how the RxJava Observable stands out as different.

@benjchristensen
Copy link
Member Author

Thank you @tilal6991 for your review and weighing in.

@stealthcode
Copy link

I would support a type called rx.Rx that had static constructors which returned the appropriate type (Obs vs Flw).

@benjchristensen
Copy link
Member Author

I would support a type called rx.Rx that had static constructors which returned the appropriate type (Obs vs Flw).

I'm okay with us exploring that type of thing. However, let's figure out the design contract of the core types first.

@benjchristensen
Copy link
Member Author

Anyone else have 👍 or 👎 on this before I proceed to merge it?

@stevegury
Copy link
Member

👍

@benjchristensen
Copy link
Member Author

Merging so we can move forward on Design.md.

benjchristensen added a commit that referenced this pull request Sep 21, 2015
@benjchristensen benjchristensen merged commit f7a5804 into 2.x Sep 21, 2015
@benjchristensen benjchristensen deleted the v2-design-flowable branch September 21, 2015 16:54
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants