Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR introduces a scheduling frontier which is a frontier that tracks the dataflows that can be scheduled in the cluster. This is accomplished using a timestamp that can be one of two variants:
Installed(id)
variant, which represents the fact thatid
might be scheduled.Future(lower)
variant, which represents the fact that dataflows withid >= lower
might be scheduled.Each worker initializes a local
MutableAntichain
withnum_worker
copies ofFuture(0)
elements (the minimum) which represents the fact that any worker might schedule any id. A channel is allocated through the allocator so that changes to that local view of the frontier can be communicated between all the workers.When a dataflow is installed in a worker
Installed(id)
andFuture(id+1)
elements are inserted into the frontier and aFuture(id)
element is retracted.When a dataflow naturally terminates an
Installed(id)
element is retracted from the frontier.When a dataflow is dropped through
drop_dataflow
is is first moved into a frozen dataflow list but all its resources are maintained. ThenInstalled(id)
is retracted from the frontier to let the other workers know that it will not be scheduled anymore.Is is ensured that only one of the two paths above are taken to ensure that we only retract a
Installed(id)
element once.Eventually all workers will retract their copies of a
Dataflow(id)
, either because it was frozen or because it naturally terminated. When this happensid
stops being beyond the global scheduling frontier and at that moment each worker is free to drop the dataflow and its associated resources since no worker will scheduled it again.