2.x Design: Creation/Destruction #3350

Merged
merged 1 commit into from
Sep 17, 2015

Conversation

benjchristensen
Member

How to create and destroy streams.

The wording of this can become much better once agreement occurs in https://github.com/ReactiveX/RxJava/pull/3348/files

The intent of this is:

  1. Specify how to create streams (Observable/Flowable) of the various flavors, and account for flow control.
  2. Evolve towards what the creation/generation APIs are
  3. Specify that it is not the responsibility of the final Subscriber to call unsubscribe/cancel upstream

@benjchristensen benjchristensen added this to the 2.0 milestone Sep 16, 2015
@davidmoten
Collaborator

Just read the Hot and Cold stuff. I threw down some cogitations on Hot and Cold in a gist just in the last couple of days. It's here. My thoughts on the matter have not really matured yet, but the vagueness about Hot and Cold Observables was something I wanted to clarify for myself. One characteristic of Hot observables that doesn't seem to get mentioned is that multiple concurrent subscribers will see the same stream at some point.

@akarnokd
Member

I see a few cases when one wants to "create" an Observable:

  • There is a hot source in some other library (such as mouse events) and one wants to bring it into the Observable world. The safest way to do this is to add in a Subject.
  • There is a cold source (such as a JDBC query) that would be consumed in a pull-fashion anyway and one wants to bring it into the Observable world too. That case could use a generator such as AbstractOnSubscribe or SyncOnSubscribe.
  • Other cases should be (already) covered by static factory methods such as just, fromCallable, etc.
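
A minimal sketch of the second case (a cold, pull-style source), assuming the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for the RxJava types. `IteratorPublisher` and `demo` are hypothetical names used only for illustration; this is not `AbstractOnSubscribe` or `SyncOnSubscribe`, and it is single-threaded only.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Supplier;

/** Hypothetical cold source: every subscribe() opens a fresh iterator (the per-subscriber side effect). */
final class IteratorPublisher<T> implements Flow.Publisher<T> {
    private final Supplier<Iterator<T>> factory;

    IteratorPublisher(Supplier<Iterator<T>> factory) { this.factory = factory; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Iterator<T> it = factory.get();
        subscriber.onSubscribe(new Flow.Subscription() {
            private long requested;   // outstanding demand
            private boolean draining; // guards against re-entrant request() from onNext
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                if (n <= 0) {
                    done = true;
                    subscriber.onError(new IllegalArgumentException("request must be positive"));
                    return;
                }
                requested = (requested + n < 0) ? Long.MAX_VALUE : requested + n; // cap on overflow
                if (draining) return;
                draining = true;
                try {
                    while (!done) {
                        if (!it.hasNext()) { done = true; subscriber.onComplete(); }
                        else if (requested == 0) break; // nothing is pulled without demand
                        else { requested--; subscriber.onNext(it.next()); }
                    }
                } finally {
                    draining = false;
                }
            }

            @Override
            public void cancel() { done = true; }
        });
    }

    /** Pulls two items, then one more, and records what the subscriber saw. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Subscription[] held = new Flow.Subscription[1];
        new IteratorPublisher<Integer>(() -> List.of(1, 2, 3).iterator())
            .subscribe(new Flow.Subscriber<Integer>() {
                public void onSubscribe(Flow.Subscription s) { held[0] = s; s.request(2); }
                public void onNext(Integer item) { log.add("next " + item); }
                public void onError(Throwable t) { log.add("error"); }
                public void onComplete() { log.add("complete"); }
            });
        log.add("requested more");
        held[0].request(1);
        return log;
    }
}
```

The third item is only read from the iterator once the subscriber asks for it, which is the pull behaviour a JDBC-style source would want.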

@benjchristensen
Member Author

@davidmoten The definition of hot/cold as I've put in this document comes from @headinthebox. It is about side-effects.

It is also only assured for multiple concurrent subscribers to see the same stream at some point if they all subscribe at the exact same time, which is only assured if ConnectableObservable.connect() is used.

Here is Reactive Cocoa 3.0 on hot/cold: https://github.com/ReactiveCocoa/ReactiveCocoa/blob/v3.0.0/CHANGELOG.md#replacements They have separated the types into Signal (hot) and SignalProducer (cold).

The temperature of an Observable does not prevent flow control from being applied. Thus this is not true:

once subscribed to if no requests are made that eventually available storage (memory/disk) will exceed any given bound (due to events being buffered)

If publish() is being used, then the flow control prior to publish() would be equivalent for all consumers. If each consumer chooses to apply their own flow control, then they are choosing to not see "the same stream" as their siblings as they have created different streams from the same source.

And nothing about being "hot" requires everything to be buffered. That is purely a choice of flow control. Note that an operating system already is doing flow control before it sends mouse events. It is absolutely not sending every event. It is sending sampled events, and that sampling happens at the hardware and software levels before it ever hits us anyways. Flow control is everywhere.
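
A minimal plain-Java sketch of the point that buffering is a flow-control choice rather than a property of being hot. `HotRelay` and its `Strategy` enum are hypothetical names, not RxJava API; the relay sits between an always-on source and a consumer that signals demand through `request`.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical relay between a hot source and a slow consumer. */
final class HotRelay {
    enum Strategy { BUFFER, DROP }

    final Deque<Integer> buffer = new ArrayDeque<>();  // undelivered events (BUFFER only)
    final List<Integer> delivered = new ArrayList<>(); // what the consumer actually received
    private final Strategy strategy;
    private long requested;

    HotRelay(Strategy strategy) { this.strategy = strategy; }

    /** Consumer signals that it can accept n more events. */
    void request(long n) { requested += n; drain(); }

    /** Called by the hot source whether or not anyone has demand. */
    void onEvent(int event) {
        if (strategy == Strategy.BUFFER) {
            buffer.addLast(event); // backlog grows while demand is zero
            drain();
        } else if (requested > 0) {
            requested--;
            delivered.add(event);
        }
        // DROP with no demand: the event is discarded, so memory stays bounded
    }

    private void drain() {
        while (requested > 0 && !buffer.isEmpty()) {
            requested--;
            delivered.add(buffer.pollFirst());
        }
    }
}
```

With one item requested and 1000 events fired, the `BUFFER` relay holds 999 undelivered events while the `DROP` relay holds none. The source is equally hot in both cases.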

@benjchristensen
Member Author

I see a few cases when one wants to "create" an Observable:

The approaches you gave all work. A few comments:

  • a Subject is not the only way (or the safest ... not sure what safety has to do with this) to do this: Observable.create, which starts listening on subscription and stops listening on cancellation, is equally effective, and can be better than an always-on Subject if the use of the stream is not constant.
  • missing is the ability to do async batched fetching, such as AsyncOnSubscribe, which is needed for more advanced cold sources over network boundaries (for example, fetching 5000 items 100 at a time)
  • the static factory methods today are legit, but I think we should revisit their naming, and anything from a cold pullable source should return a Flowable, not an Observable, if we agree upon that distinction in 2.x Design: Flowable/Observable #3348
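
The first point, listening on subscription and stopping on cancellation, can be sketched in plain Java. `MouseEvents` stands in for a listener API from some other library and `subscribe` for a create-style factory. Both are hypothetical and are not the `Observable.create` signature.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;

final class ListenerBridge {
    /** Hypothetical hot event source owned by another library. */
    static final class MouseEvents {
        private final Set<Consumer<String>> listeners = new LinkedHashSet<>();
        void addListener(Consumer<String> l) { listeners.add(l); }
        void removeListener(Consumer<String> l) { listeners.remove(l); }
        int listenerCount() { return listeners.size(); }
        void fire(String event) {
            // iterate over a copy so listeners may unregister while being notified
            for (Consumer<String> l : new ArrayList<>(listeners)) l.accept(event);
        }
    }

    /**
     * Registers the listener only when someone subscribes.
     * The returned Runnable plays the role of cancel/unsubscribe.
     */
    static Runnable subscribe(MouseEvents source, Consumer<String> onNext) {
        source.addListener(onNext);
        return () -> source.removeListener(onNext);
    }
}
```

Unlike an always-on Subject, nothing is attached to the underlying source while there are no subscribers.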

@benjchristensen
Member Author

Do you all agree that we do not need an unsubscribe/cancel sent from a SafeSubscriber wrapper after every onError/onComplete? I find it wasteful, unnecessary, and costly for performance (in cases such as a stream of 1, where the extra unsubscribe is noticeable because the stream itself emitted only 1 item).

@akarnokd
Member

I agree, there is no need to force a cancel call on a final Subscriber anymore. Upstream will cancel the subscription for you (maybe before calling you, maybe after they called you: see observeOn).
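
A minimal sketch of that lifecycle, assuming the JDK's `java.util.concurrent.Flow` interfaces in place of the RxJava types. `SingleItemSource` and its `released` and `cancelCalls` fields are hypothetical, added only to make the behaviour observable: the source cleans up around its own terminal event and the final subscriber never calls `cancel`.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stream of one item that releases its own resources on completion. */
final class SingleItemSource<T> implements Flow.Publisher<T> {
    final AtomicInteger cancelCalls = new AtomicInteger(); // how often downstream cancelled
    final AtomicBoolean released = new AtomicBoolean();    // stands in for "resources freed"
    private final T value;

    SingleItemSource(T value) { this.value = value; }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;
            private boolean cancelled;

            @Override
            public void request(long n) {
                if (done || n <= 0) return; // sketch only: invalid requests are ignored, not signalled
                done = true;
                subscriber.onNext(value);
                if (cancelled) return;      // downstream cancelled from inside onNext
                released.set(true);         // upstream cleans up by itself...
                subscriber.onComplete();    // ...around the terminal event
            }

            @Override
            public void cancel() {
                cancelCalls.incrementAndGet();
                cancelled = true;
                done = true;
                released.set(true);         // early cancellation still releases resources
            }
        });
    }
}
```

A subscriber that requests one item and simply returns from `onComplete` leaves `released` true and `cancelCalls` at zero, so a forced cancel after the terminal event would be pure overhead here.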

@davidmoten
Collaborator

@benjchristensen re Hot/Cold

It is also only assured for multiple concurrent subscribers to see the same stream at some point if they all subscribe at the exact same time, which is only assured if ConnectableObservable.connect() is used.

This is how to make this precise and not demand mention of ConnectableObservable (not part of the reactive streams spec for instance):

  • once two subscribers are subscribed to the source, there exists a finite non-negative integer n such that events1.skip(n) is equal to events2 or events1 is equal to events2.skip(n). That is, eventually the streams have the same elements until termination.
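
For two recorded, terminated streams this property can be checked directly. A minimal sketch, assuming each subscriber's events were collected into a `List`; `StreamSuffix.eventuallySame` is a hypothetical helper name.

```java
import java.util.List;

final class StreamSuffix {
    /**
     * True if some n >= 0 exists with events1.skip(n) == events2 or events1 == events2.skip(n).
     * For finite lists the only candidate n is the difference in length.
     */
    static <T> boolean eventuallySame(List<T> events1, List<T> events2) {
        List<T> longer = events1.size() >= events2.size() ? events1 : events2;
        List<T> shorter = (longer == events1) ? events2 : events1;
        int n = longer.size() - shorter.size();
        return longer.subList(n, longer.size()).equals(shorter);
    }
}
```

For example, a subscriber that saw `[1, 2, 3, 4]` and a later one that saw `[3, 4]` satisfy the property with n = 2, while `[1, 2]` and `[1, 3]` do not.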

Your point about sampling doesn't change the fact that an infinite, backpressure-supporting hot observable like mouse events, if subscribed to and, say, asked for only 1 element and then left waiting forever, will bring about unbounded buffer growth. It's irrelevant whether the OS gives me one mouse event a millisecond or one per year, because I'm not talking about a practical measuring test but a theoretical one. It's certainly the case that any consumer could apply flow control measures to such an observable, but I'm trying to arrive at a way of deciding that an observable is Hot or Cold by just observing it (having no knowledge of its internals). I guess once flow control has been applied to a Hot Observable then the resultant Observable is by my definition no longer Hot. This may be a confusing distinction, and I might have to come up with a new name for it, say Red Hot observable. Just semantics I guess, but I think it is interesting to explore a bit.

@benjchristensen
Member Author

not part of the reactive streams spec for instance

The Reactive Streams spec does not talk about hot, cold, or multicast, nor is RxJava going to be 100% about Reactive Streams. That spec represents a type, but it certainly doesn't address everything. One example of that is shown in #3349.

Thus, the contract requirements of Reactive Streams are only applicable to types that implement the Reactive Streams interfaces.

This is how to make this precise

I don't understand what precision you are aiming for. We cannot redefine what "hot" and "cold" mean, since those are beyond RxJava. The definitions (https://github.com/ReactiveX/RxJava/blob/2.x/DESIGN.md#hot) that I have included in the document come from @headinthebox and exist in other ReactiveX implementations and documentation (such as here: http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#HotAndCold and here: https://github.com/ReactiveCocoa/ReactiveCocoa/blob/master/CHANGELOG.md#hot-signals-are-now-signals).

So what specifically are you suggesting to make more precise than what is already accepted as the definition of "hot" and "cold"?

infinite backpressure supporting hot observable like mouse events if subscribed to and say have only 1 element requested of it then wait forever will bring about unbounded buffer growth

I don't understand this. What does being "hot" have to do with having an unbounded buffer?

I'm trying to arrive at a way of deciding that an observable is Hot or Cold by just observing it

Since "hot" and "cold" is about side-effects, and nothing to do with the data flowing through it, then you would need to look at whether there are side-effects when a subscribe happens.
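
A minimal plain-Java sketch of that distinction. `ColdSource` and `HotSource` are hypothetical classes, not RxJava or ReactiveCocoa types: subscribing to the cold one triggers work per subscriber, while subscribing to the hot one only attaches a listener to events that happen regardless.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class HotColdSketch {
    /** Cold: the act of subscribing starts the work (the side effect). */
    static final class ColdSource {
        final AtomicInteger queriesRun = new AtomicInteger();

        void subscribe(Consumer<Integer> onNext) {
            queriesRun.incrementAndGet(); // e.g. running a query, once per subscriber
            for (int i = 1; i <= 3; i++) onNext.accept(i);
        }
    }

    /** Hot: events are emitted whether or not anyone listens; subscribing has no side effect upstream. */
    static final class HotSource {
        private final List<Consumer<Integer>> listeners = new ArrayList<>();

        void subscribe(Consumer<Integer> onNext) { listeners.add(onNext); }

        void emit(int value) {
            for (Consumer<Integer> l : new ArrayList<>(listeners)) l.accept(value);
        }
    }
}
```

Two subscribers to `ColdSource` each receive 1, 2, 3 and `queriesRun` ends at 2. Subscribers to `HotSource` only see what is emitted after they attach, so a late subscriber misses earlier values.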

A type system can attempt to try and communicate when a source represents a hot or cold source, such as Reactive Cocoa does with Signal (hot) and SignalProducer (cold): https://github.com/ReactiveCocoa/ReactiveCocoa/blob/master/CHANGELOG.md#hot-signals-are-now-signals Determining whether we should use types to represent these was the point of the discussion in #2785

I guess once flow control has been applied to a Hot Observable then the resultant Observable is by my definition no longer Hot.

I don't see how the application of flow control is related to whether a source is "hot" or "cold" since flow control does not impact the side-effects or lack of side-effects of subscription.

@benjchristensen
Member Author

Is this PR good for merge, or are there things that need to change?

Any further additions (such as more precision if needed) can come through followup PRs. We don't need to get everything done in this PR, as long as this PR is correct thus far.

If this PR is okay, I'd like to merge so we can move forward in defining the design and allow maturation of the picture for v2.

cc @ReactiveX/rxjava-committers for vote

@benjchristensen
Member Author

Ping @ReactiveX/rxjava-committers @akarnokd @davidmoten

Creation of a stream falls into the following use cases, all of which should be catered to in API design.

- async, hot, push (ie. system or user events)
- async, cold, push (ie. events resulting from remote system via network connection)

Some network messages have to get off the OS buffer into application memory or result in data loss. There could be a variable amount of work (ranging from trivial to laborious) involved in parsing from bytes to the application domain, which means that they should be called "cold" to cover the extreme cases. However, because of the potentially lossy nature of OS buffers, it may be required to subscribe and parse it. Should we differentiate between persistent (non-lossy) network buffering and volatile network buffering?

Member Author

I don't understand your point. Rx does not involve itself in network behaviors. What are you suggesting should be changed in the text?

I think that events resulting from remote system via network connection should be considered hot. Then that leaves the question of... what async push events are actually cold?

The confusion is cleared up. My comment here had more to do with the choice of whether a network connection is wrapped in an Observable (hot) or a Flowable (cold). As most network connections are potentially lossy (TCP and UDP overflowing the OS buffer, for instance), the choice would be made according to the developer's confidence in the ability to consume data fast enough to prevent loss (Observable -> Flowable with a backpressure strategy). Anyhow, I'll defer my comments to later clarifications of wording rather than request any change here in this PR.

@akarnokd
Member

👍

@benjchristensen
Member Author

Thanks @akarnokd

@stealthcode Let me know what you need changed in this before you're okay.

@stealthcode

👍 all good. thanks!

benjchristensen added a commit that referenced this pull request Sep 17, 2015
@benjchristensen benjchristensen merged commit 1e36ad4 into 2.x Sep 17, 2015
@benjchristensen
Member Author

Thank you. Merged.

@benjchristensen benjchristensen deleted the v2-design-lifecycle branch September 17, 2015 20:43