Skip to content

Add AdaptiveScheduler with priority-based dispatch for pipeline sync ops#1390

Open
justinvjoseph wants to merge 2 commits into
facebookresearch:mainfrom
justinvjoseph:export-D99935461
Open

Add AdaptiveScheduler with priority-based dispatch for pipeline sync ops#1390
justinvjoseph wants to merge 2 commits into
facebookresearch:mainfrom
justinvjoseph:export-D99935461

Conversation

@justinvjoseph
Copy link
Copy Markdown

Summary:
Introduces an opt-in priority scheduler that replaces the ThreadPoolExecutor's
FIFO dispatch with a PriorityQueue. Deeper pipeline stages (closer to sink)
are dispatched first, reducing pipeline bubble time and WIP.

Key changes:

  • New _scheduler.py with AdaptiveScheduler class
  • _PipeArgs gains nice and _depth fields for priority control
  • convert_to_async() routes through scheduler when provided
  • _build_node() registers stages and intercepts sync ops
  • build_pipeline() gains use_scheduler=True flag
  • PipelineBuilder.pipe() gains nice parameter

When use_scheduler=False (default), behavior is identical to today.

Differential Revision: D99935461

@meta-cla meta-cla Bot added the CLA Signed This label is managed by the Meta Open Source bot. label Apr 30, 2026
@meta-codesync
Copy link
Copy Markdown
Contributor

meta-codesync Bot commented Apr 30, 2026

@justinvjoseph has exported this pull request. If you are a Meta employee, you can view the originating Diff in D99935461.

…earch#1389)

Summary:

Add `ResizableSemaphore` — an asyncio-compatible semaphore whose max permit count can be adjusted at runtime. This is the foundational primitive for dynamic concurrency control in SPDL pipelines.

Key features:
- `resize(new_max)` adjusts permits at runtime; resize-up wakes blocked waiters immediately, resize-down drains gracefully (no preemption)
- `acquire()`/`release()` semantics match `asyncio.Semaphore`
- `max_value` and `active` properties for observability
- Thread-safe within asyncio's single-threaded model
- Comprehensive error handling for invalid values

This is Diff 1 of a 5-diff series implementing a unified adaptive scheduler for SPDL pipelines (T262755626).

Differential Revision: D99920401
Summary:
Introduces an opt-in priority scheduler that replaces the ThreadPoolExecutor's
FIFO dispatch with a PriorityQueue. Deeper pipeline stages (closer to sink)
are dispatched first, reducing pipeline bubble time and WIP.

Key changes:
- New `_scheduler.py` with `AdaptiveScheduler` class
- `_PipeArgs` gains `nice` and `_depth` fields for priority control
- `convert_to_async()` routes through scheduler when provided
- `_build_node()` registers stages and intercepts sync ops
- `build_pipeline()` gains `use_scheduler=True` flag
- `PipelineBuilder.pipe()` gains `nice` parameter

When `use_scheduler=False` (default), behavior is identical to today.

Differential Revision: D99935461
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

CLA Signed This label is managed by the Meta Open Source bot. fb-exported meta-exported

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant