Skip to content

Commit b6cbd79

Browse files
committed
Add takeWhile, takeWhileInclusive
1 parent f0e27ce commit b6cbd79

File tree

3 files changed

+85
-0
lines changed

3 files changed

+85
-0
lines changed

src/FSharp.Control.TaskSeq/TaskSeq.fs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,10 @@ module TaskSeq =
253253
let chooseAsync chooser source = Internal.choose (TryPickAsync chooser) source
254254
let filter predicate source = Internal.filter (Predicate predicate) source
255255
let filterAsync predicate source = Internal.filter (PredicateAsync predicate) source
256+
let takeWhile predicate source = Internal.takeWhile (Predicate predicate) source
257+
let takeWhileAsync predicate source = Internal.takeWhile (PredicateAsync predicate) source
258+
let takeWhileInclusive predicate source = Internal.takeWhileInclusive (Predicate predicate) source
259+
let takeWhileInclusiveAsync predicate source = Internal.takeWhileInclusive (PredicateAsync predicate) source
256260
let tryPick chooser source = Internal.tryPick (TryPick chooser) source
257261
let tryPickAsync chooser source = Internal.tryPick (TryPickAsync chooser) source
258262
let tryFind predicate source = Internal.tryFind (Predicate predicate) source

src/FSharp.Control.TaskSeq/TaskSeq.fsi

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,36 @@ module TaskSeq =
365365
/// </summary>
366366
val filter: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>
367367

368+
/// <summary>
369+
/// Yields items from the source while the <paramref name="predicate" /> function returns <see cref="true" />.
370+
/// The first <see cref="false" /> result concludes consumption of the source.
371+
/// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.takeWhileAsync" />.
372+
/// </summary>
373+
val takeWhile: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>
374+
375+
/// <summary>
376+
/// Yields items from the source while the <paramref name="predicate" /> asynchronous function returns <see cref="true" />.
377+
/// The first <see cref="false" /> result concludes consumption of the source.
378+
/// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.takeWhile" />.
379+
/// </summary>
380+
val takeWhileAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> taskSeq<'T>
381+
382+
/// <summary>
383+
/// Yields items from the source while the <paramref name="predicate" /> function returns <see cref="true" />.
384+
/// The first <see cref="false" /> result concludes consumption of the source, but is included in the result.
385+
/// If <paramref name="predicate" /> is asynchronous, consider using <see cref="TaskSeq.takeWhileInclusiveAsync" />.
386+
/// If the final item is not desired, consider using <see cref="TaskSeq.takeWhile" />.
387+
/// </summary>
388+
val takeWhileInclusive: predicate: ('T -> bool) -> source: taskSeq<'T> -> taskSeq<'T>
389+
390+
/// <summary>
391+
/// Yields items from the source while the <paramref name="predicate" /> asynchronous function returns <see cref="true" />.
392+
/// The first <see cref="false" /> result concludes consumption of the source, but is included in the result.
393+
/// If <paramref name="predicate" /> does not need to be asynchronous, consider using <see cref="TaskSeq.takeWhileInclusive" />.
394+
/// If the final item is not desired, consider using <see cref="TaskSeq.takeWhileAsync" />.
395+
/// </summary>
396+
val takeWhileInclusiveAsync: predicate: ('T -> #Task<bool>) -> source: taskSeq<'T> -> taskSeq<'T>
397+
368398
/// <summary>
369399
/// Returns a new collection containing only the elements of the collection
370400
/// for which the given asynchronous function <paramref name="predicate" /> returns <see cref="true" />.

src/FSharp.Control.TaskSeq/TaskSeqInternal.fs

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,57 @@ module internal TaskSeqInternal =
531531
| true -> yield item
532532
| false -> ()
533533
}
534+
535+
let takeWhile predicate (source: taskSeq<_>) = taskSeq {
536+
use e = source.GetAsyncEnumerator(CancellationToken())
537+
let! step = e.MoveNextAsync()
538+
let mutable go = step
539+
540+
match predicate with
541+
| Predicate predicate ->
542+
while go do
543+
let value = e.Current
544+
if predicate value then
545+
yield value
546+
let! more = e.MoveNextAsync()
547+
go <- more
548+
else go <- false
549+
| PredicateAsync predicate ->
550+
while go do
551+
let value = e.Current
552+
match! predicate value with
553+
| true ->
554+
yield value
555+
let! more = e.MoveNextAsync()
556+
go <- more
557+
| false -> go <- false
558+
}
559+
560+
let takeWhileInclusive predicate (source: taskSeq<_>) = taskSeq {
561+
use e = source.GetAsyncEnumerator(CancellationToken())
562+
let! step = e.MoveNextAsync()
563+
let mutable go = step
564+
565+
match predicate with
566+
| Predicate predicate ->
567+
while go do
568+
let value = e.Current
569+
yield value
570+
if predicate value then
571+
let! more = e.MoveNextAsync()
572+
go <- more
573+
else go <- false
574+
| PredicateAsync predicate ->
575+
while go do
576+
let value = e.Current
577+
yield value
578+
match! predicate value with
579+
| true ->
580+
let! more = e.MoveNextAsync()
581+
go <- more
582+
| false -> go <- false
583+
}
584+
534585
// Consider turning using an F# version of this instead?
535586
// https://github.com/i3arnon/ConcurrentHashSet
536587
type ConcurrentHashSet<'T when 'T: equality>(ct) =

0 commit comments

Comments
 (0)