Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track scheduling frontier #521

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
173 changes: 154 additions & 19 deletions timely/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::rc::Rc;
use std::cell::{RefCell, RefMut};
use std::cmp::Ordering;
use std::any::Any;
use std::str::FromStr;
use std::time::{Instant, Duration};
Expand All @@ -13,10 +14,14 @@ use crate::communication::{Allocate, Data, Push, Pull};
use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::scheduling::{Schedule, Scheduler, Activations};
use crate::progress::timestamp::{Refines};
use crate::progress::SubgraphBuilder;
use crate::progress::{ChangeBatch, SubgraphBuilder};
use crate::progress::operate::Operate;
use crate::progress::frontier::{AntichainRef, MutableAntichain};
use crate::dataflow::scopes::Child;
use crate::logging::TimelyLogger;
use crate::order::PartialOrder;

/// Channel identifier reserved for exchanging scheduling frontier updates.
///
/// `SchedulingFrontier::new` allocates its pushers and puller on this channel, and
/// `Worker::step_or_park` treats an event on this channel as a signal to step the
/// scheduling frontier instead of looking up an operator path.
/// `Worker::new` seeds the channel identifier counter with `SCHEDULING_CHANNEL + 1`;
/// presumably that keeps regular channels from reusing this value (assumes the counter
/// only ever increases -- confirm against the identifier allocation code).
const SCHEDULING_CHANNEL: usize = 0;

/// Different ways in which timely's progress tracking can work.
///
Expand Down Expand Up @@ -216,12 +221,16 @@ pub struct Worker<A: Allocate> {
identifiers: Rc<RefCell<usize>>,
// dataflows: Rc<RefCell<Vec<Wrapper>>>,
dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
frozen_dataflows: Rc<RefCell<HashMap<usize, Wrapper>>>,
scheduling_frontier: Rc<RefCell<SchedulingFrontier>>,

dataflow_counter: Rc<RefCell<usize>>,
logging: Rc<RefCell<crate::logging_core::Registry<crate::logging::WorkerIdentifier>>>,

activations: Rc<RefCell<Activations>>,
active_dataflows: Vec<usize>,


// Temporary storage for channel identifiers during dataflow construction.
// These are then associated with a dataflow once constructed.
temp_channel_ids: Rc<RefCell<Vec<usize>>>,
Expand Down Expand Up @@ -260,16 +269,21 @@ impl<A: Allocate> Scheduler for Worker<A> {

impl<A: Allocate> Worker<A> {
/// Allocates a new `Worker` bound to a channel allocator.
pub fn new(config: Config, c: A) -> Worker<A> {
pub fn new(config: Config, mut c: A) -> Worker<A> {
let now = Instant::now();
let index = c.index();

let scheduling_frontier = SchedulingFrontier::new(&mut c);

Worker {
config,
timer: now,
paths: Default::default(),
allocator: Rc::new(RefCell::new(c)),
identifiers: Default::default(),
identifiers: Rc::new(RefCell::new(SCHEDULING_CHANNEL + 1)),
dataflows: Default::default(),
frozen_dataflows: Default::default(),
scheduling_frontier: Rc::new(RefCell::new(scheduling_frontier)),
dataflow_counter: Default::default(),
logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index))),
activations: Rc::new(RefCell::new(Activations::new(now))),
Expand Down Expand Up @@ -332,6 +346,7 @@ impl<A: Allocate> Worker<A> {
/// ```
pub fn step_or_park(&mut self, duration: Option<Duration>) -> bool {

let mut activate_scheduling_frontier = false;
{ // Process channel events. Activate responders.
let mut allocator = self.allocator.borrow_mut();
allocator.receive();
Expand All @@ -345,13 +360,18 @@ impl<A: Allocate> Worker<A> {
// on the basis of non-empty channels.
// TODO: This is a sloppy way to deal
// with channels that may not be alloc'd.
if let Some(path) = paths.get(&channel) {
if channel == SCHEDULING_CHANNEL {
activate_scheduling_frontier = true;
} else if let Some(path) = paths.get(&channel) {
self.activations
.borrow_mut()
.activate(&path[..]);
}
}
}
if activate_scheduling_frontier {
self.scheduling_frontier.borrow_mut().step();
}

// Organize activations.
self.activations
Expand Down Expand Up @@ -400,15 +420,35 @@ impl<A: Allocate> Worker<A> {
paths.remove(&channel);
}
entry.remove_entry();
self.scheduling_frontier.borrow_mut().update([(DataflowId::Installed(index), -1)]);
}
}
}
}

// Drop all frozen dataflows that are not beyond the scheduling frontier
{
let mut paths = self.paths.borrow_mut();
let mut frozen_dataflows = self.frozen_dataflows.borrow_mut();
let scheduling_frontier = self.scheduling_frontier.borrow();
frozen_dataflows.retain(|id, dataflow| {
if !scheduling_frontier.frontier().less_equal(&DataflowId::Installed(*id)) {
// Garbage collect channel_id to path information.
for channel in dataflow.channel_ids.drain(..) {
paths.remove(&channel);
}
false
} else {
true
}
});
}


// Clean up, indicate if dataflows remain.
self.logging.borrow_mut().flush();
self.allocator.borrow_mut().release();
!self.dataflows.borrow().is_empty()
!self.dataflows.borrow().is_empty() || !self.frozen_dataflows.borrow().is_empty()
}

/// Calls `self.step()` as long as `func` evaluates to true.
Expand Down Expand Up @@ -671,17 +711,17 @@ impl<A: Allocate> Worker<A> {

/// Drops an identified dataflow.
///
/// This method removes the identified dataflow, which will no longer be scheduled.
/// Various other resources will be cleaned up, though the method is currently in
/// public beta rather than expected to work. Please report all crashes and unmet
/// expectations!
pub fn drop_dataflow(&mut self, dataflow_identifier: usize) {
if let Some(mut entry) = self.dataflows.borrow_mut().remove(&dataflow_identifier) {
// Garbage collect channel_id to path information.
let mut paths = self.paths.borrow_mut();
for channel in entry.channel_ids.drain(..) {
paths.remove(&channel);
}
/// This method immediately stops scheduling the identified dataflow. Once all other
/// workers also stop scheduling the identified dataflow, due to an explicit drop or due to
/// graceful termination, the worker will proceed with dropping any resources held by this
/// dataflow.
///
/// If the identified dataflow is in the process of being constructed, this function is a
/// no-op.
pub fn drop_dataflow(&mut self, id: usize) {
if let Some(entry) = self.dataflows.borrow_mut().remove(&id) {
self.frozen_dataflows.borrow_mut().insert(id, entry);
self.scheduling_frontier.borrow_mut().update([(DataflowId::Installed(id), -1)]);
}
}

Expand All @@ -693,20 +733,27 @@ impl<A: Allocate> Worker<A> {
*self.dataflow_counter.borrow()
}

/// List the current dataflow indices.
/// List the current dataflow indices that may be scheduled.
pub fn installed_dataflows(&self) -> Vec<usize> {
self.dataflows.borrow().keys().cloned().collect()
}

/// True if there is at least one dataflow under management.
pub fn has_dataflows(&self) -> bool {
!self.dataflows.borrow().is_empty()
!self.dataflows.borrow().is_empty() || !self.frozen_dataflows.borrow().is_empty()
}

// Acquire a new distinct dataflow identifier.
fn allocate_dataflow_index(&mut self) -> usize {
*self.dataflow_counter.borrow_mut() += 1;
*self.dataflow_counter.borrow() - 1
let new_id = *self.dataflow_counter.borrow() - 1;

self.scheduling_frontier.borrow_mut().update([
(DataflowId::Installed(new_id), 1),
(DataflowId::Future(new_id + 1), 1),
(DataflowId::Future(new_id), -1)
]);
new_id
}
}

Expand All @@ -721,6 +768,8 @@ impl<A: Allocate> Clone for Worker<A> {
allocator: self.allocator.clone(),
identifiers: self.identifiers.clone(),
dataflows: self.dataflows.clone(),
frozen_dataflows: self.frozen_dataflows.clone(),
scheduling_frontier: self.scheduling_frontier.clone(),
dataflow_counter: self.dataflow_counter.clone(),
logging: self.logging.clone(),
activations: self.activations.clone(),
Expand Down Expand Up @@ -776,3 +825,89 @@ impl Drop for Wrapper {
self.resources = None;
}
}


#[derive(Debug, Eq, PartialEq, Clone, Abomonation, Serialize, Deserialize)]
/// A partial order representing the scheduling frontier.
///
/// The total order (`Ord`) and the partial order (`PartialOrder`) on this type are
/// both implemented by hand rather than derived.
enum DataflowId {
/// A lower bound on identifiers of future dataflows.
///
/// Under `PartialOrder`, `Future(n)` is less-or-equal to `Future(m)` and to
/// `Installed(m)` exactly when `n <= m`.
Future(usize),
/// An installed dataflow identifier.
///
/// Under `PartialOrder`, an `Installed` value is less-or-equal only to an `Installed`
/// value carrying the same identifier, and never to a `Future` value.
Installed(usize),
}

// `Ord` is written by hand (not derived) so that it stays compatible with the
// `PartialOrder` implementation for this type.
impl Ord for DataflowId {
    /// Totally orders identifiers: every `Future` sorts before every `Installed`, and two
    /// values of the same variant are ordered by the identifier they carry.
    fn cmp(&self, other: &Self) -> Ordering {
        // Project each value onto (variant rank, identifier); the lexicographic order on
        // that pair is exactly the order described above.
        let sort_key = |id: &Self| match *id {
            Self::Future(bound) => (0u8, bound),
            Self::Installed(index) => (1u8, index),
        };
        sort_key(self).cmp(&sort_key(other))
    }
}
impl PartialOrd for DataflowId {
    /// Always returns `Some`, deferring to the total order defined by `Ord::cmp`.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(Ord::cmp(self, other))
    }
}

impl Default for DataflowId {
fn default() -> Self {
Self::Future(0)
}
}

impl PartialOrder for DataflowId {
    /// Partial order used by the scheduling frontier.
    ///
    /// * `Future(n)` is less-or-equal to `Future(m)` and to `Installed(m)` when `n <= m`.
    /// * `Installed(a)` is less-or-equal to `Installed(b)` only when `a == b`.
    /// * `Installed(_)` is never less-or-equal to `Future(_)`.
    fn less_equal(&self, other: &Self) -> bool {
        match self {
            // A lower bound covers anything carrying an identifier at or above it,
            // regardless of which variant the other side is.
            Self::Future(bound) => match other {
                Self::Future(id) | Self::Installed(id) => bound <= id,
            },
            // A concrete installed identifier is only comparable to itself.
            Self::Installed(index) => match other {
                Self::Installed(id) => index == id,
                Self::Future(_) => false,
            },
        }
    }
}

/// Keeps track of the scheduling frontier and broadcasts any local updates to it to the other
/// workers.
///
/// Local changes passed to `update` are only sent out through `pushers`; the `frontier`
/// field itself changes solely in `step`, when batches are read back from `puller`.
struct SchedulingFrontier {
/// Accumulated `DataflowId` counts. Seeded in `new` with `DataflowId::default()` at a
/// count of `alloc.peers()`, then updated from every batch received in `step`.
frontier: MutableAntichain<DataflowId>,
/// Sending endpoints obtained from the allocator for `SCHEDULING_CHANNEL`; `update`
/// sends a clone of each change batch to every one of them.
pushers: Vec<Box<dyn Push<Message<ChangeBatch<DataflowId>>>>>,
/// Receiving endpoint for `SCHEDULING_CHANNEL`, drained by `step`.
puller: Box<dyn Pull<Message<ChangeBatch<DataflowId>>>>,
}

impl SchedulingFrontier {
    /// Builds the tracker, allocating its endpoints on `SCHEDULING_CHANNEL`.
    ///
    /// The frontier initially holds `DataflowId::default()` with a count equal to the
    /// number of peers reported by `alloc`.
    fn new<A: Allocate>(alloc: &mut A) -> Self {
        let (pushers, puller) = alloc.allocate(SCHEDULING_CHANNEL);
        let initial = (DataflowId::default(), alloc.peers() as i64);

        let mut frontier = MutableAntichain::new();
        frontier.update_iter(Some(initial));

        Self { frontier, pushers, puller }
    }

    /// Borrows the current scheduling frontier.
    fn frontier(&self) -> AntichainRef<'_, DataflowId> {
        self.frontier.frontier()
    }

    /// Drains every pending message from the puller and folds its changes into the frontier.
    fn step(&mut self) {
        // TODO: reuse allocations
        while let Some(mut message) = self.puller.recv() {
            let changes = message.as_mut().iter().cloned();
            self.frontier.update_iter(changes);
        }
    }

    /// Sends the given changes to every pusher.
    ///
    /// This does not modify `self.frontier` directly; the frontier only changes when
    /// batches are read back in `step`.
    fn update<I: IntoIterator<Item=(DataflowId, i64)>>(&mut self, iter: I) {
        let mut batch = ChangeBatch::new();
        batch.extend(iter.into_iter());
        // TODO: reduce clones and consolidate updates into one push
        for pusher in &mut self.pushers {
            let message = Message::from_typed(batch.clone());
            pusher.send(message);
            pusher.done();
        }
    }
}