Skip to content

Commit 11fe9a6

Browse files
committed
CABI refactor: merge borrow_count and num_async_subtasks counters
1 parent a7c94fc commit 11fe9a6

File tree

3 files changed

+51
-86
lines changed

3 files changed

+51
-86
lines changed

design/mvp/CanonicalABI.md

+37-52
Original file line numberDiff line numberDiff line change
@@ -394,21 +394,19 @@ class Task(CallContext):
394394
caller: Optional[Task]
395395
on_return: Optional[Callable]
396396
on_block: OnBlockCallback
397-
borrow_count: int
397+
need_to_drop: int
398398
events: list[EventCallback]
399399
has_events: asyncio.Event
400-
num_async_subtasks: int
401400

402401
def __init__(self, opts, inst, ft, caller, on_return, on_block):
403402
super().__init__(opts, inst, self)
404403
self.ft = ft
405404
self.caller = caller
406405
self.on_return = on_return
407406
self.on_block = on_block
408-
self.borrow_count = 0
407+
self.need_to_drop = 0
409408
self.events = []
410409
self.has_events = asyncio.Event()
411-
self.num_async_subtasks = 0
412410
```
413411
The fields of `Task` are introduced in groups of related `Task` methods next.
414412
Using a conservative syntactic analysis of the component-level definitions of
@@ -570,34 +568,6 @@ external I/O (as emulated in the Python code by awaiting `sleep(0)`:
570568
await self.wait_on(asyncio.sleep(0))
571569
```
572570

573-
All `Task`s (whether lifted `async` or not) are allowed to call `async`-lowered
574-
imports. Calling an `async`-lowered import stores a `Subtask` (defined below)
575-
in the current component instance's `async_subtasks` table. The current task
576-
tracks the number of live async subtasks and guards this to be in `Task.exit`
577-
(below) to ensure [structured concurrency].
578-
```python
579-
def add_async_subtask(self, subtask):
580-
assert(subtask.task is self and not subtask.notify_supertask)
581-
subtask.notify_supertask = True
582-
self.num_async_subtasks += 1
583-
return self.inst.async_subtasks.add(subtask)
584-
```
585-
The `notify_supertask` flag signals to the methods of `Subtask` (defined below)
586-
to notify this `Task` when the async call makes progress.
587-
588-
The `borrow_count` field is used by the following methods to track the number
589-
of borrowed handles that were passed as parameters to the export that have not
590-
yet been dropped (and thus might dangle if the caller destroys the resource
591-
after this export call finishes):
592-
```python
593-
def create_borrow(self):
594-
self.borrow_count += 1
595-
596-
def drop_borrow(self):
597-
assert(self.borrow_count > 0)
598-
self.borrow_count -= 1
599-
```
600-
601571
The `return_` method is called by either `canon_task_return` or `canon_lift`
602572
(both defined below) to lift and return results to the caller using the
603573
`on_return` callback that was supplied by the caller to `canon_lift`. Using a
@@ -619,15 +589,16 @@ more than once which must be checked by `return_` and `exit`.
619589
```
620590

621591
Lastly, when a task exits, the runtime enforces the guard conditions mentioned
622-
above and allows a pending task to start.
592+
above and allows a pending task to start. The `need_to_drop` counter is
593+
incremented and decremented below as a way of ensuring that a task does
594+
something (like dropping a resource or subtask handle) before the task exits.
623595
```python
624596
def exit(self):
625597
assert(current_task.locked())
626598
assert(not self.events)
627599
assert(self.inst.num_tasks >= 1)
628600
trap_if(self.on_return)
629-
trap_if(self.borrow_count != 0)
630-
trap_if(self.num_async_subtasks != 0)
601+
trap_if(self.need_to_drop != 0)
631602
self.inst.num_tasks -= 1
632603
if self.opts.sync:
633604
assert(not self.inst.interruptible.is_set())
@@ -728,9 +699,9 @@ called).
728699
self.flat_results = lower_flat_values(self, max_flat, vs, ts, self.flat_args)
729700
```
730701

731-
Lastly, when a `Subtask` finishes, it calls `release_lenders` to allow owned
732-
handles passed to this subtask to be dropped. In the synchronous or eager case
733-
this happens immediately before returning to the caller. In the
702+
When a `Subtask` finishes, it calls `release_lenders` to allow owned handles
703+
passed to this subtask to be dropped. In the synchronous or eager case this
704+
happens immediately before returning to the caller. In the
734705
asynchronous+blocking case, this happens right before the `CallState.DONE`
735706
event is delivered to the guest program.
736707
```python
@@ -744,6 +715,14 @@ event is delivered to the guest program.
744715
return self.flat_results
745716
```
746717

718+
Lastly, after a `Subtask` has finished and notified its supertask (thereby
719+
clearing `enqueued`), it may be dropped from the `async_subtasks` table:
720+
```python
721+
def drop(self):
722+
trap_if(self.enqueued)
723+
trap_if(self.state != CallState.DONE)
724+
self.task.need_to_drop -= 1
725+
```
747726

748727
### Despecialization
749728

@@ -1568,7 +1547,9 @@ def pack_flags_into_int(v, labels):
15681547
```
15691548

15701549
Finally, `own` and `borrow` handles are lowered by initializing new handle
1571-
elements in the current component instance's handle table:
1550+
elements in the current component instance's handle table. The increment of
1551+
`need_to_drop` is complemented by a decrement in `canon_resource_drop` and
1552+
ensures that all borrowed handles are dropped before the end of the task.
15721553
```python
15731554
def lower_own(cx, rep, t):
15741555
h = ResourceHandle(rep, own=True)
@@ -1579,7 +1560,7 @@ def lower_borrow(cx, rep, t):
15791560
if cx.inst is t.rt.impl:
15801561
return rep
15811562
h = ResourceHandle(rep, own=False, scope=cx)
1582-
cx.create_borrow()
1563+
cx.need_to_drop += 1
15831564
return cx.inst.resources.add(t.rt, h)
15841565
```
15851566
The special case in `lower_borrow` is an optimization, recognizing that, when
@@ -1588,6 +1569,7 @@ type, the only thing the borrowed handle is good for is calling
15881569
`resource.rep`, so lowering might as well avoid the overhead of creating an
15891570
intermediate borrow handle.
15901571

1572+
15911573
### Flattening
15921574

15931575
With only the definitions above, the Canonical ABI would be forced to place all
@@ -2166,16 +2148,24 @@ async def canon_lower(opts, ft, callee, task, flat_args):
21662148
await callee(task, subtask.on_start, subtask.on_return, on_block)
21672149
[] = subtask.finish()
21682150
if await call_and_handle_blocking(do_call):
2169-
i = task.add_async_subtask(subtask)
2151+
subtask.notify_supertask = True
2152+
task.need_to_drop += 1
2153+
i = task.inst.async_subtasks.add(subtask)
21702154
flat_results = [pack_async_result(i, subtask.state)]
21712155
else:
21722156
flat_results = [0]
21732157
return flat_results
21742158
```
21752159
In the asynchronous case, `Task.call_and_handle_blocking` returns `True` if the
2176-
call to `do_call` blocks. If the `callee` blocks, `on_start` and `on_return`
2177-
may be called after `canon_lower` has returned to the core wasm caller, which
2178-
is signaled via the `subtask.state` packed into the result `i32`:
2160+
call to `do_call` blocks. In this blocking case, the `Subtask` is added to
2161+
stored in an instance-wide table and given an `i32` index that is later
2162+
returned by `task.wait` to indicate that the subtask made progress. The
2163+
`need_to_drop` increment is matched by a decrement in `canon_subtask_drop` and
2164+
ensures that all subtasks of a supertask are allowed to complete before the
2165+
supertask completes. The `notify_supertask` flag is set to tell `Subtask`
2166+
methods (below) to asynchronously notify the supertask of progress. Lastly,
2167+
the current state of the subtask is eagerly returned to the caller, packed
2168+
with the `i32` subtask index:
21792169
```python
21802170
def pack_async_result(i, state):
21812171
assert(0 < i < 2**30)
@@ -2265,7 +2255,7 @@ async def canon_resource_drop(rt, sync, task, i):
22652255
else:
22662256
task.trap_if_on_the_stack(rt.impl)
22672257
else:
2268-
h.scope.drop_borrow()
2258+
h.scope.need_to_drop -= 1
22692259
return flat_results
22702260
```
22712261
In general, the call to a resource's destructor is treated like a
@@ -2431,16 +2421,11 @@ validation specifies:
24312421
* `$f` is given type `(func (param i32))`
24322422

24332423
Calling `$f` removes the indicated subtask from the instance's table, trapping
2434-
if the subtask isn't done or isn't a subtask of the current task. The guard
2435-
on `enqueued` ensures that supertasks can only drop subtasks once they've been
2436-
officially notified of their completion (via `task.wait` or callback).
2424+
if various conditions aren't met in `Subtask.drop()`.
24372425
```python
24382426
async def canon_subtask_drop(task, i):
24392427
trap_if(not task.inst.may_leave)
2440-
subtask = task.inst.async_subtasks.remove(i)
2441-
trap_if(subtask.enqueued)
2442-
trap_if(subtask.state != CallState.DONE)
2443-
subtask.task.num_async_subtasks -= 1
2428+
task.inst.async_subtasks.remove(i).drop()
24442429
return []
24452430
```
24462431

design/mvp/canonical-abi/definitions.py

+14-26
Original file line numberDiff line numberDiff line change
@@ -339,21 +339,19 @@ class Task(CallContext):
339339
caller: Optional[Task]
340340
on_return: Optional[Callable]
341341
on_block: OnBlockCallback
342-
borrow_count: int
342+
need_to_drop: int
343343
events: list[EventCallback]
344344
has_events: asyncio.Event
345-
num_async_subtasks: int
346345

347346
def __init__(self, opts, inst, ft, caller, on_return, on_block):
348347
super().__init__(opts, inst, self)
349348
self.ft = ft
350349
self.caller = caller
351350
self.on_return = on_return
352351
self.on_block = on_block
353-
self.borrow_count = 0
352+
self.need_to_drop = 0
354353
self.events = []
355354
self.has_events = asyncio.Event()
356-
self.num_async_subtasks = 0
357355

358356
def trap_if_on_the_stack(self, inst):
359357
c = self.caller
@@ -433,19 +431,6 @@ async def yield_(self):
433431
self.maybe_start_pending_task()
434432
await self.wait_on(asyncio.sleep(0))
435433

436-
def add_async_subtask(self, subtask):
437-
assert(subtask.task is self and not subtask.notify_supertask)
438-
subtask.notify_supertask = True
439-
self.num_async_subtasks += 1
440-
return self.inst.async_subtasks.add(subtask)
441-
442-
def create_borrow(self):
443-
self.borrow_count += 1
444-
445-
def drop_borrow(self):
446-
assert(self.borrow_count > 0)
447-
self.borrow_count -= 1
448-
449434
def return_(self, flat_results):
450435
trap_if(not self.on_return)
451436
if self.opts.sync and not self.opts.sync_task_return:
@@ -462,8 +447,7 @@ def exit(self):
462447
assert(not self.events)
463448
assert(self.inst.num_tasks >= 1)
464449
trap_if(self.on_return)
465-
trap_if(self.borrow_count != 0)
466-
trap_if(self.num_async_subtasks != 0)
450+
trap_if(self.need_to_drop != 0)
467451
self.inst.num_tasks -= 1
468452
if self.opts.sync:
469453
assert(not self.inst.interruptible.is_set())
@@ -534,6 +518,11 @@ def finish(self):
534518
self.maybe_notify_supertask()
535519
return self.flat_results
536520

521+
def drop(self):
522+
trap_if(self.enqueued)
523+
trap_if(self.state != CallState.DONE)
524+
self.task.need_to_drop -= 1
525+
537526
### Despecialization
538527

539528
def despecialize(t):
@@ -1116,7 +1105,7 @@ def lower_borrow(cx, rep, t):
11161105
if cx.inst is t.rt.impl:
11171106
return rep
11181107
h = ResourceHandle(rep, own=False, scope=cx)
1119-
cx.create_borrow()
1108+
cx.need_to_drop += 1
11201109
return cx.inst.resources.add(t.rt, h)
11211110

11221111
### Flattening
@@ -1466,7 +1455,9 @@ async def do_call(on_block):
14661455
await callee(task, subtask.on_start, subtask.on_return, on_block)
14671456
[] = subtask.finish()
14681457
if await call_and_handle_blocking(do_call):
1469-
i = task.add_async_subtask(subtask)
1458+
subtask.notify_supertask = True
1459+
task.need_to_drop += 1
1460+
i = task.inst.async_subtasks.add(subtask)
14701461
flat_results = [pack_async_result(i, subtask.state)]
14711462
else:
14721463
flat_results = [0]
@@ -1508,7 +1499,7 @@ async def canon_resource_drop(rt, sync, task, i):
15081499
else:
15091500
task.trap_if_on_the_stack(rt.impl)
15101501
else:
1511-
h.scope.drop_borrow()
1502+
h.scope.need_to_drop -= 1
15121503
return flat_results
15131504

15141505
### `canon resource.rep`
@@ -1564,8 +1555,5 @@ async def canon_task_yield(task):
15641555

15651556
async def canon_subtask_drop(task, i):
15661557
trap_if(not task.inst.may_leave)
1567-
subtask = task.inst.async_subtasks.remove(i)
1568-
trap_if(subtask.enqueued)
1569-
trap_if(subtask.state != CallState.DONE)
1570-
subtask.task.num_async_subtasks -= 1
1558+
task.inst.async_subtasks.remove(i).drop()
15711559
return []

design/mvp/canonical-abi/run_tests.py

-8
Original file line numberDiff line numberDiff line change
@@ -560,24 +560,20 @@ async def consumer(task, args):
560560
ptr = consumer_heap.realloc(0, 0, 1, 1)
561561
[ret] = await canon_lower(consumer_opts, eager_ft, eager_callee, task, [0, ptr])
562562
assert(ret == 0)
563-
assert(task.num_async_subtasks == 0)
564563
u8 = consumer_heap.memory[ptr]
565564
assert(u8 == 43)
566565
[ret] = await canon_lower(consumer_opts, toggle_ft, toggle_callee, task, [])
567566
assert(ret == (1 | (CallState.STARTED << 30)))
568-
assert(task.num_async_subtasks == 1)
569567
retp = ptr
570568
consumer_heap.memory[retp] = 13
571569
[ret] = await canon_lower(consumer_opts, blocking_ft, blocking_callee, task, [83, retp])
572570
assert(ret == (2 | (CallState.STARTING << 30)))
573-
assert(task.num_async_subtasks == 2)
574571
assert(consumer_heap.memory[retp] == 13)
575572
fut1.set_result(None)
576573
event, callidx = await task.wait()
577574
assert(event == EventCode.CALL_DONE)
578575
assert(callidx == 1)
579576
[] = await canon_subtask_drop(task, callidx)
580-
assert(task.num_async_subtasks == 1)
581577
event, callidx = await task.wait()
582578
assert(event == EventCode.CALL_STARTED)
583579
assert(callidx == 2)
@@ -588,12 +584,10 @@ async def consumer(task, args):
588584
assert(callidx == 2)
589585
assert(consumer_heap.memory[retp] == 44)
590586
fut3.set_result(None)
591-
assert(task.num_async_subtasks == 1)
592587
event, callidx = await task.wait()
593588
assert(event == EventCode.CALL_DONE)
594589
assert(callidx == 2)
595590
[] = await canon_subtask_drop(task, callidx)
596-
assert(task.num_async_subtasks == 0)
597591

598592
dtor_fut = asyncio.Future()
599593
dtor_value = None
@@ -609,14 +603,12 @@ async def dtor(task, args):
609603
assert(dtor_value is None)
610604
[ret] = await canon_resource_drop(rt, False, task, 1)
611605
assert(ret == (2 | (CallState.STARTED << 30)))
612-
assert(task.num_async_subtasks == 1)
613606
assert(dtor_value is None)
614607
dtor_fut.set_result(None)
615608
event, callidx = await task.wait()
616609
assert(event == CallState.DONE)
617610
assert(callidx == 2)
618611
[] = await canon_subtask_drop(task, callidx)
619-
assert(task.num_async_subtasks == 0)
620612

621613
[] = await canon_task_return(task, CoreFuncType(['i32'],[]), [42])
622614
return []

0 commit comments

Comments
 (0)