Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

channels: add Distribute pact #493

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

petrosagg
Copy link
Contributor

This PR adds the Distribute pact that aims to evenly distribute data among all workers by routing each container to a randomly selected worker.

Traditionally this "defensive distribution" could be implemented using an Exchange pact whose key function round-robined the records or randomly distributed them in some other way. While this works it has couple of downsides:

  • The key function is calculated once per record, instead of once per container
  • Each record must be copied out of the original container into a per-worker container depending on the key function

The Distribute pact streamlines this pattern by avoiding copying each record to a separate container and immediately pushing each container to a random worker.

Future work

A potential future improvement is to circulate in-band statistics over the channel about how many messages each workers has seen. This would allow each worker to estimate the current skew and only leap into action once things are bad enough.

This PR adds the `Distribute` pact that aims to evenly distribute data
among all workers by routing each container to a randomly selected
worker.

Traditionally this "defensive distribution" could be implemented using
an `Exchange` pact whose key function round-robined the records or
randomly distributed them in some other way. While this works it has
couple of downsides:
* The key function is calculated once per record, instead of once per
  container
* Each record must be copied out of the original container into a
  per-worker container depending on the key function

The `Distribute` pact streamlines this pattern by avoiding copying each
record to a separate container and immediately pushing each container to
a random worker.

== Future work ==

A potential future improvement is to circulate in-band statistics over
the channel about how many messages each workers has seen. This would
allow each worker to estimate the current skew and only leap into action
once things are bad enough.

Signed-off-by: Petros Angelatos <[email protected]>
Copy link
Member

@antiguru antiguru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks useful in some situations. What about adding a closure that can decide on a target for each container instead of hardcoding a random function?

Comment on lines +88 to +92
fn push(&mut self, message: &mut Option<BundleCore<T, C>>) {
let mut state: fnv::FnvHasher = Default::default();
std::time::Instant::now().hash(&mut state);
let worker_idx = (state.finish() as usize) % self.pushers.len();
self.pushers[worker_idx].push(message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we could make this more generic by accepting a closure that takes a message and returns a usize to route the whole container. Probably also rename the struct to ContainerExchange or something like this?

impl<P> DistributePusher<P> {
/// Allocates a new `DistributePusher` from a supplied set of pushers
pub fn new(pushers: Vec<P>) -> DistributePusher<P> {
DistributePusher {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
DistributePusher {
Self {

@antiguru
Copy link
Member

antiguru commented Jan 5, 2023

An alternative could be the implement PushPartitioned for a wrapper type, but then one has to deal with non-vector streams, which can be difficult.

@petrosagg
Copy link
Contributor Author

This looks useful in some situations. What about adding a closure that can decide on a target for each container instead of hardcoding a random function?

The idea for this pact came from the office hours where we were discussing the fact that when an operator introduces skew to its output the skew persists along Pipeline edges and it would be nice to have an edge that behaves like Pipeline but can also detect when there is skew and do something about it.

For this reason I think we shouldn't expose any hook for users to plug any logic and instead state that the intent of this pact is to do this "smart routing". The current implementation is not particularly sophisticated, but the idea is that users use it as-is for the stated benefit and then we can improve the implementation independently.

I initially set out to integrate a work stealing queue by adding a new Allocate method but I think I want to explore the statistics based approach first that could potentially work with even Tcp channels.

The ideal case would be if this pact's performance is almost identical to Pipelines under balanced loads in which case we could replace the pact of all the timely/dd operators that don't care about distribution (map, filter, etc) from Pipeline to Distribute which would make them more robust to skewed workloads.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants