Skip to content

Add let! and do! support for F# async / Async<'T> #114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ See [release notes.txt](release-notes.txt) for the version history of `TaskSeq`.

## Overview

The `IAsyncEnumerable` interface was added to .NET in `.NET Core 3.0` and is part of `.NET Standard 2.1`. The main use-case was for iterative asynchronous enumeration over some resource. For instance, an event stream or a REST API interface with pagination, asynchronous reading over a list of files and accumulating the results, where each action can be modeled as a [`MoveNextAsync`][4] call on the [`IAsyncEnumerator<'T>`][5] given by a call to [`GetAsyncEnumerator()`][6].
The `IAsyncEnumerable` interface was added to .NET in `.NET Core 3.0` and is part of `.NET Standard 2.1`. The main use-case was for iterative asynchronous enumeration over some resource. For instance, an event stream or a REST API interface with pagination, asynchronous reading over a list of files and accumulating the results, where each action can be modeled as a [`MoveNextAsync`][4] call on the [`IAsyncEnumerator<'T>`][3] given by a call to [`GetAsyncEnumerator()`][6].

Since the introduction of `task` in F# the call for a native implementation of _task sequences_ has grown, in particular because proper iteration over an `IAsyncEnumerable` has proven challenging, especially if one wants to avoid mutable variables. This library is an answer to that call and applies the same _resumable state machine_ approach with `taskSeq`.

Expand Down Expand Up @@ -177,23 +177,28 @@ There are more differences:
| | `TaskSeq` | `AsyncSeq` |
|----------------------------|---------------------------------------------------------------------------------|----------------------------------------------------------------------|
| **Frameworks** | .NET 5.0+, NetStandard 2.1 | .NET 5.0+, NetStandard 2.0 and 2.1, .NET Framework 4.6.1+ |
| **Underlying type** | `System.Collections.Generic.IAsyncEnumerable<'T>` | Its own type, also called `IAsyncEnumerable<'T>`, but not compatible |
| **F# concept of** | `task` | `async` |
| **Underlying type** | [`Generic.IAsyncEnumerable<'T>`][3] <sup>[note #1](#tsnote1 "Full name System.Collections.Generic.IAsyncEnumerable&lt;'T>.")</sup>| Its own type, also called `IAsyncEnumerable<'T>`<sup>[note #1](#tsnote1 "Full name FSharp.Control.IAsyncEnumerable&lt;'T>.")</sup> |
| **Implementation** | State machine (statically compiled) | No state machine, continuation style |
| **Semantics** | `seq`-like: on-demand | `seq`-like: on-demand |
| **Disposability** | Asynchronous, through [`IAsyncDisposable`][7] | Synchronous, through `IDisposable` |
| **Support `let!`** | All `task`-like: `Async<'T>`, `Task<'T>`, `ValueTask<'T>` or any `GetAwaiter()` | `Async<'T>` only |
| **Support `do!`** | `Async<unit>`, `Task<unit>` and `Task`, `ValueTask<unit>` and `ValueTask` | `Async<unit>` only |
| **Support `yield!`** | `IAsyncEnumerable<'T>`, `AsyncSeq`, any sequence | `AsyncSeq` |
| **Support `for`** | `IAsyncEnumerable<'T>`, `AsyncSeq`, any sequence | `AsyncSeq`, any sequence |
| **Support `yield!`** | [`IAsyncEnumerable<'T>`][3] (= `TaskSeq`), `AsyncSeq`, any sequence | `AsyncSeq` |
| **Support `for`** | [`IAsyncEnumerable<'T>`][3] (= `TaskSeq`), `AsyncSeq`, any sequence | `AsyncSeq`, any sequence |
| **Behavior with `yield`** | Zero allocations; no `Task` or even `ValueTask` created | Allocates an F# `Async` wrapped in a singleton `AsyncSeq` |
| **Conversion to other** | `TaskSeq.toAsyncSeq` | `AsyncSeq.toAsyncEnum` |
| **Conversion from other** | Implicit (`yield!`) or `TaskSeq.ofAsyncSeq` | `AsyncSeq.ofAsyncEnum` |
| **Conversion to other** | `TaskSeq.toAsyncSeq` | [`AsyncSeq.toAsyncEnum`][22] |
| **Conversion from other** | Implicit (`yield!`) or `TaskSeq.ofAsyncSeq` | [`AsyncSeq.ofAsyncEnum`][23] |
| **Recursion in `yield!`** | **No** (requires F# support, upcoming) | Yes |
| **Based on F# concept of** | `task` | `async` |
| **`MoveNextAsync`** impl | `ValueTask<bool>` | `Async<'T option>` |
| **Cancellation** | Implicit token governs iteration | Implicit token flows to all subtasks per `async` semantics |
| **Performance** | Very high, negligible allocations | Slower, more allocations, due to using `async` |
| **Iteration semantics** | [Two operations][6], 'Next' is a value task, 'Current' must be called separately| One operation, 'Next' is `Async`, returns `option` with 'Current' |
| **`MoveNextAsync`** | [Returns `ValueTask<bool>`][4] | Returns `Async<'T option>` |
| **[`Current`][5]** | [Returns `'T`][5] | n/a |
| **Cancellation** | See [#133][], until 0.3.0: use `GetAsyncEnumerator(cancelToken)` | Implicit token flows to all subtasks per `async` semantics |
| **Performance** | Very high, negligible allocations | Slower, more allocations, due to using `async` and cont style |
| **Parallelism** | Possible with ChildTask; support will follow | Supported explicitly |

<sup>¹⁾ <a id="tsnote1"></a>_Both `AsyncSeq` and `TaskSeq` use a type called `IAsyncEnumerable<'T>`, but only `TaskSeq` uses the type from the BCL Generic Collections. `AsyncSeq` supports .NET Framework 4.6.x and NetStandard 2.0 as well, which do not have this type in the BCL._</sup>

## Status & planning

This project has stable features currently, but before we go full "version one", we'd like to complete the surface area. This section covers the status of that, with a full list of implemented functions below. Here's the shortlist:
Expand All @@ -207,7 +212,7 @@ This project has stable features currently, but before we go full "version one",

### Implementation progress

As of 9 November 2022: [Nuget package available][21]. In this phase, we will frequently update the package. Current:
As of 9 November 2022: [Nuget package available][21]. In this phase, we will frequently update the package, see [release notes.txt](release-notes.txt). Current version:

[![Nuget](https://img.shields.io/nuget/vpre/FSharp.Control.TaskSeq)](https://www.nuget.org/packages/FSharp.Control.TaskSeq/)

Expand Down Expand Up @@ -553,11 +558,11 @@ module TaskSeq =

[1]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/25
[2]: https://github.com/xunit/xunit/issues/2587
[3]: https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerable-1?view=net-7.0
[4]: https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerator-1.movenextasync?view=net-7.0
[5]: https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerator-1?view=net-7.0
[6]: https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerable-1.getasyncenumerator?view=net-7.0
[7]: https://learn.microsoft.com/en-us/dotnet/api/system.iasyncdisposable?view=net-7.0
[3]: https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerable-1?view=net-6.0
[4]: https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerator-1.movenextasync?view=net-6.0
[5]: https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerator-1.current?view=net-6.0
[6]: https://learn.microsoft.com/en-us/dotnet/api/system.collections.generic.iasyncenumerable-1.getasyncenumerator?view=net-6.0
[7]: https://learn.microsoft.com/en-us/dotnet/api/system.iasyncdisposable?view=net-6.0
[8]: https://stu.dev/iasyncenumerable-introduction/
[9]: https://learn.microsoft.com/en-us/archive/msdn-magazine/2019/november/csharp-iterating-with-async-enumerables-in-csharp-8
[10]: https://gist.github.com/akhansari/d88812b742aa6be1c35b4f46bd9f8532
Expand All @@ -572,6 +577,8 @@ module TaskSeq =
[19]: https://fsharpforfunandprofit.com/series/computation-expressions/
[20]: https://github.com/dotnet/fsharp/blob/d5312aae8aad650f0043f055bb14c3aa8117e12e/tests/benchmarks/CompiledCodeBenchmarks/TaskPerf/TaskPerf/taskSeq.fs
[21]: https://www.nuget.org/packages/FSharp.Control.TaskSeq#versions-body-tab
[22]: https://fsprojects.github.io/FSharp.Control.AsyncSeq/reference/fsharp-control-asyncseq.html#toAsyncEnum
[23]: https://fsprojects.github.io/FSharp.Control.AsyncSeq/reference/fsharp-control-asyncseq.html#fromAsyncEnum

[#2]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/2
[#11]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/11
Expand All @@ -587,6 +594,7 @@ module TaskSeq =
[#83]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/83
[#90]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/90
[#126]: https://github.com/fsprojects/FSharp.Control.TaskSeq/pull/126
[#133]: https://github.com/fsprojects/FSharp.Control.TaskSeq/issues/133

[issues]: https://github.com/fsprojects/FSharp.Control.TaskSeq/issues
[nuget]: https://www.nuget.org/packages/FSharp.Control.TaskSeq/
1 change: 1 addition & 0 deletions release-notes.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@

Release notes:
0.4.x (unreleased)
- adds `let!` and `do!` support for F#'s Async<'T>
- adds TaskSeq.takeWhile, takeWhileAsync, takeWhileInclusive, takeWhileInclusiveAsync, #126 (by @bartelink)
- adds AsyncSeq vs TaskSeq comparison chart, #131

Expand Down
46 changes: 45 additions & 1 deletion src/FSharp.Control.TaskSeq.Test/TaskSeq.Do.Tests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ let ``CE taskSeq: use 'do!' with a non-generic valuetask`` () =
let ``CE taskSeq: use 'do!' with a non-generic task`` () =
let mutable value = 0

taskSeq { do! (task { do value <- value + 1 }) |> Task.ignore }
taskSeq { do! task { do value <- value + 1 } |> Task.ignore }
|> verifyEmpty
|> Task.map (fun _ -> value |> should equal 1)

Expand All @@ -56,3 +56,47 @@ let ``CE taskSeq: use 'do!' with a task-delay`` () =
}
|> verifyEmpty
|> Task.map (fun _ -> value |> should equal 2)

[<Fact>]
let ``CE taskSeq: use 'do!' with Async`` () =
let mutable value = 0

taskSeq {
do value <- value + 1
do! Async.Sleep 50
do value <- value + 1
}
|> verifyEmpty
|> Task.map (fun _ -> value |> should equal 2)

[<Fact>]
let ``CE taskSeq: use 'do!' with Async - mutables`` () =
let mutable value = 0

taskSeq {
do! async { value <- value + 1 }
do! Async.Sleep 50
do! async { value <- value + 1 }
}
|> verifyEmpty
|> Task.map (fun _ -> value |> should equal 2)

[<Fact>]
let ``CE taskSeq: use 'do!' with all kinds of overloads at once`` () =
let mutable value = 0

// this test should be expanded in case any new overload is added
// that is supported by `do!`, to ensure the high/low priority
// overloads still work properly
taskSeq {
do! task { do value <- value + 1 } |> Task.ignore
do! ValueTask <| task { do value <- value + 1 }
do! ValueTask.ofTask (task { do value <- value + 1 })
do! ValueTask<_>(()) // unit valuetask that completes immediately
do! Task.fromResult (()) // unit Task that completes immediately
do! Task.Delay 0
do! Async.Sleep 0
do! async { value <- value + 1 } // eq 4
}
|> verifyEmpty
|> Task.map (fun _ -> value |> should equal 4)
70 changes: 70 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.Let.Tests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,73 @@ let ``CE taskSeq: use 'let!' with a non-generic task`` () =
}
|> verifyEmpty
|> Task.map (fun _ -> value |> should equal 1)

[<Fact>]
let ``CE taskSeq: use 'let!' with Async`` () =
let mutable value = 0

taskSeq {
do value <- value + 1
let! _ = Async.Sleep 50
do value <- value + 1
}
|> verifyEmpty
|> Task.map (fun _ -> value |> should equal 2)

[<Fact>]
let ``CE taskSeq: use 'let!' with Async - mutables`` () =
let mutable value = 0

taskSeq {
do! async { value <- value + 1 }
do value |> should equal 1
let! x = async { return value + 1 }
do x |> should equal 2
do! Async.Sleep 50
do! async { value <- value + 1 }
do value |> should equal 2
let! ret = async { return value + 1 }
do value |> should equal 2
do ret |> should equal 3
yield x + ret // eq 5
}
|> TaskSeq.exactlyOne
|> Task.map (should equal 5)

[<Fact>]
let ``CE taskSeq: use 'let!' with all kinds of overloads at once`` () =
let mutable value = 0

// this test should be expanded in case any new overload is added
// that is supported by `let!`, to ensure the high/low priority
// overloads still work properly
taskSeq {
let! a = task { // eq 1
do! Task.Delay 10
do value <- value + 1
return value
}

let! b = // eq 2
task {
do! Task.Delay 50
do value <- value + 1
return value
}
|> ValueTask<int>

let! c = ValueTask<_>(4) // valuetask that completes immediately
let! _ = Task.Factory.StartNew(fun () -> value <- value + 1) // non-generic Task with side effect
let! d = Task.fromResult 99 // normal Task that completes immediately
let! _ = Async.Sleep 0 // unit Async

let! e = async {
do! Async.Sleep 40
do value <- value + 1 // eq 4 now
return value
}

yield! [ a; b; c; d; e ]
}
|> TaskSeq.toListAsync
|> Task.map (should equal [ 1; 2; 4; 99; 4 ])
36 changes: 36 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeqBuilder.fs
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,42 @@ module HighPriority =
sm.Data.current <- ValueNone
false)

member inline _.Bind
(
asyncSource: Async<'TResult1>,
continuation: ('TResult1 -> ResumableTSC<'T>)
) : ResumableTSC<'T> =
ResumableTSC<'T>(fun sm ->
let mutable awaiter =
Async
.StartAsTask(asyncSource, cancellationToken = sm.Data.cancellationToken)
.GetAwaiter()

let mutable __stack_fin = true

Debug.logInfo "at Bind"

if not awaiter.IsCompleted then
// This will yield with __stack_fin2 = false
// This will resume with __stack_fin2 = true
let __stack_fin2 = ResumableCode.Yield().Invoke(&sm)
__stack_fin <- __stack_fin2

Debug.logInfo ("at Bind: with __stack_fin = ", __stack_fin)
Debug.logInfo ("at Bind: this.completed = ", sm.Data.completed)

if __stack_fin then
Debug.logInfo "at Bind: finished awaiting, calling continuation"
let result = awaiter.GetResult()
(continuation result).Invoke(&sm)

else
Debug.logInfo "at Bind: await further"

sm.Data.awaiter <- awaiter
sm.Data.current <- ValueNone
false)

[<AutoOpen>]
module TaskSeqBuilder =
/// Builds an asynchronous task sequence based on IAsyncEnumerable<'T> using computation expression syntax.
Expand Down
2 changes: 2 additions & 0 deletions src/FSharp.Control.TaskSeq/TaskSeqBuilder.fsi
Original file line number Diff line number Diff line change
Expand Up @@ -191,3 +191,5 @@ module HighPriority =
type TaskSeqBuilder with

member inline Bind: task: Task<'TResult1> * continuation: ('TResult1 -> ResumableTSC<'T>) -> ResumableTSC<'T>
member inline Bind:
asyncSource: Async<'TResult1> * continuation: ('TResult1 -> ResumableTSC<'T>) -> ResumableTSC<'T>