- Stream[T] is a decorator for Iterable[T]/AsyncIterable[T]
- lazy operations chaining
- concurrent via threads / processes / coroutines (async)
- battle-tested with Python 3.7 to 3.15 (compatible with PyPy)
- zero dependencies

pip install streamable

from streamable import Stream

Create a Stream[T] decorating an Iterable[T]/AsyncIterable[T]:

integers: Stream[int] = Stream(range(10))

Chain lazy operations (only evaluated during iteration), each returning a new immutable Stream:
inverses: Stream[float] = (
integers
.map(lambda n: round(1 / n, 2))
.catch(ZeroDivisionError, replacement=float("inf"))
)

Iterate over a Stream[T] like any Iterable[T]/AsyncIterable[T]:
>>> list(inverses)
[inf, 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

Elements are processed on-the-fly as the iteration advances.
Let's walk through the Stream's features with an Extract-Transform-Load script:
This toy script fetches Pokémons concurrently from PokéAPI and writes the quadrupeds from the first three generations into a CSV file, in 5-second batches:
import csv
from datetime import timedelta
import itertools
import requests
from streamable import Stream
with open("./quadruped_pokemons.csv", mode="w") as file:
fields = ["id", "name", "is_legendary", "base_happiness", "capture_rate"]
writer = csv.DictWriter(file, fields, extrasaction='ignore')
writer.writeheader()
pipeline: Stream = (
# Infinite Stream[int] of Pokémon ids starting from Pokémon #1: Bulbasaur
Stream(itertools.count(1))
# Limit to 16 requests per second to be friendly to our fellow PokéAPI devs
.throttle(16, per=timedelta(seconds=1))
# GET pokemons concurrently using a pool of 8 threads
.map(lambda poke_id: f"https://pokeapi.co/api/v2/pokemon-species/{poke_id}")
.map(requests.get, concurrency=8)
.foreach(requests.Response.raise_for_status)
.map(requests.Response.json)
# Stop the iteration when reaching the 1st pokemon of the 4th generation
.truncate(when=lambda poke: poke["generation"]["name"] == "generation-iv")
.observe("pokemons")
# Keep only quadruped Pokemons
.filter(lambda poke: poke["shape"]["name"] == "quadruped")
.observe("quadruped pokemons")
# Write a batch of pokemons every 5 seconds to the CSV file
.group(interval=timedelta(seconds=5))
.foreach(writer.writerows)
.flatten()
.observe("written pokemons")
# Catch exceptions and raise the 1st one at the end of the iteration
.catch(Exception, finally_raise=True)
)
# Start a full iteration
pipeline()

Let's write an async version of this script:
- using httpx.AsyncClient together with the .amap operation (the async counterpart of .map)
- instead of calling pipeline() to iterate over it as an Iterable, let's await pipeline to iterate over it as an AsyncIterable
import asyncio
import csv
from datetime import timedelta
import itertools
import httpx
from streamable import Stream
async def main() -> None:
with open("./quadruped_pokemons.csv", mode="w") as file:
fields = ["id", "name", "is_legendary", "base_happiness", "capture_rate"]
writer = csv.DictWriter(file, fields, extrasaction='ignore')
writer.writeheader()
async with httpx.AsyncClient() as http:
pipeline: Stream = (
# Infinite Stream[int] of Pokémon ids starting from Pokémon #1: Bulbasaur
Stream(itertools.count(1))
# Limit to 16 requests per second to be friendly to our fellow PokéAPI devs
.throttle(16, per=timedelta(seconds=1))
# GET pokemons via 8 concurrent coroutines
.map(lambda poke_id: f"https://pokeapi.co/api/v2/pokemon-species/{poke_id}")
.amap(http.get, concurrency=8)
.foreach(httpx.Response.raise_for_status)
.map(httpx.Response.json)
# Stop the iteration when reaching the 1st pokemon of the 4th generation
.truncate(when=lambda poke: poke["generation"]["name"] == "generation-iv")
.observe("pokemons")
# Keep only quadruped Pokemons
.filter(lambda poke: poke["shape"]["name"] == "quadruped")
.observe("quadruped pokemons")
# Write a batch of pokemons every 5 seconds to the CSV file
.group(interval=timedelta(seconds=5))
.foreach(writer.writerows)
.flatten()
.observe("written pokemons")
# Catch exceptions and raise the 1st one at the end of the iteration
.catch(Exception, finally_raise=True)
)
# Start a full async iteration
await pipeline
asyncio.run(main())

- Tryolabs' Top 10 Python libraries of 2024 (LinkedIn, Reddit)
- PyCoder's Weekly x Real Python
- @PythonHub's tweet
- Upvoters on our showcase Reddit post
Tip
If an operation raises an exception while processing an element, you can handle it and continue the iteration:
from contextlib import suppress
from typing import Iterator, List

casted_ints: Iterator[int] = iter(
Stream("0123_56789")
.map(int)
.group(3)
.flatten()
)
collected: List[int] = []
with suppress(ValueError):
collected.extend(casted_ints)
assert collected == [0, 1, 2, 3]
collected.extend(casted_ints)
assert collected == [0, 1, 2, 3, 5, 6, 7, 8, 9]

Declaring a Stream is lazy,

odd_int_strings = Stream(range(1_000_000)).filter(lambda n: n % 2).map(str)

and there is zero overhead during iteration compared to builtins: iter(odd_int_strings) visits the operations' lineage and returns exactly this iterator:

map(str, filter(lambda n: n % 2, range(1_000_000)))

Operations have been implemented with speed in mind. If you have any ideas for improvement, whether performance-related or not, an issue, PR, or discussion would be very much appreciated! (CONTRIBUTING.md)
| operation | description |
| --- | --- |
| .map | transform elements |
| .foreach | call a side effect function on elements |
| .group / .groupby | batch a certain number of elements, by a given key, over a time interval |
| .flatten | explode iterable elements |
| .filter | remove elements |
| .distinct | remove duplicates |
| .truncate | cut the stream |
| .skip | ignore head elements |
| .catch | handle exceptions |
| .throttle | control the rate of iteration |
| .observe | log elements/errors counters |
Important
A Stream exposes a minimalist yet expressive set of operations to manipulate its elements, but creating its source or consuming it is not its responsibility; it's meant to be combined with specialized libraries (csv, json, pyarrow, psycopg2, boto3, requests, httpx, ...).
Note
async counterparts: For each operation that takes a function (such as .map), there is an equivalent that accepts an async function (such as .amap).
You can freely mix synchronous and asynchronous operations within the same Stream. The result can then be consumed either as an Iterable or as an AsyncIterable. When a stream involving async operations is consumed as an Iterable, a temporary asyncio event loop is attached to it.
Applies a transformation on elements:
integer_strings: Stream[str] = integers.map(str)
assert list(integer_strings) == ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']

Applies the transformation via concurrency threads:
import requests
pokemon_names: Stream[str] = (
Stream(range(1, 4))
.map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
.map(requests.get, concurrency=3)
.map(requests.Response.json)
.map(lambda poke: poke["name"])
)
assert list(pokemon_names) == ['bulbasaur', 'ivysaur', 'venusaur']

Note
Memory-efficient: Only concurrency upstream elements are pulled for processing; the next upstream element is pulled only when a result is yielded downstream.
Note
Ordering: it yields results in the upstream order (FIFO), set ordered=False to yield results as they become available (First Done, First Out).
Same, but with via="process":
if __name__ == "__main__":
state: List[int] = []
# integers are mapped
assert integers.map(state.append, concurrency=4, via="process").count() == 10
# but the `state` of the main process is not mutated
assert state == []
.amap can apply an async function concurrently:
import asyncio
import httpx
async def main() -> None:
async with httpx.AsyncClient() as http:
pokemon_names: Stream[str] = (
Stream(range(1, 4))
.map(lambda i: f"https://pokeapi.co/api/v2/pokemon-species/{i}")
.amap(http.get, concurrency=3)
.map(httpx.Response.json)
.map(lambda poke: poke["name"])
)
# consume as an AsyncIterable[str]
assert [name async for name in pokemon_names] == ['bulbasaur', 'ivysaur', 'venusaur']
asyncio.run(main())

The star function decorator transforms a function that takes several positional arguments into a function that takes a tuple:
from streamable import star
zeros: Stream[int] = (
Stream(enumerate(integers))
.map(star(lambda index, integer: index - integer))
)
assert list(zeros) == [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

Applies a side effect on elements:
state: List[int] = []
appending_integers: Stream[int] = integers.foreach(state.append)
assert list(appending_integers) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Similar to .map:
- set the concurrency parameter for thread-based concurrency
- set via="process" for process-based concurrency
- set ordered=False for First Done First Out
- the .aforeach operation can apply an async effect concurrently
Groups elements into Lists, up to a given group size:
integers_by_5: Stream[List[int]] = integers.group(size=5)
assert list(integers_by_5) == [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9]]

... and/or co-groups by a given key:
integers_by_parity: Stream[List[int]] = integers.group(by=lambda n: n % 2)
assert list(integers_by_parity) == [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]

... and/or co-groups the elements yielded by the upstream within a given time interval:
from datetime import timedelta
integers_within_1_sec: Stream[List[int]] = (
integers
.throttle(2, per=timedelta(seconds=1))
.group(interval=timedelta(seconds=0.99))
)
assert list(integers_within_1_sec) == [[0, 1, 2], [3, 4], [5, 6], [7, 8], [9]]

Tip
Combine the size/by/interval parameters:
integers_by_parity_by_2: Stream[List[int]] = (
integers
.group(by=lambda n: n % 2, size=2)
)
assert list(integers_by_parity_by_2) == [[0, 2], [1, 3], [4, 6], [5, 7], [8], [9]]

Like .group, but groups into (key, elements) tuples:
integers_by_parity: Stream[Tuple[str, List[int]]] = (
integers
.groupby(lambda n: "odd" if n % 2 else "even")
)
assert list(integers_by_parity) == [("even", [0, 2, 4, 6, 8]), ("odd", [1, 3, 5, 7, 9])]

Tip
Then "starmap" over the tuples:
from streamable import star
counts_by_parity: Stream[Tuple[str, int]] = (
integers_by_parity
.map(star(lambda parity, ints: (parity, len(ints))))
)
assert list(counts_by_parity) == [("even", 5), ("odd", 5)]

Ungroups elements, assuming that they are Iterables (or AsyncIterables for .aflatten):
even_then_odd_integers: Stream[int] = integers_by_parity.flatten()
assert list(even_then_odd_integers) == [0, 2, 4, 6, 8, 1, 3, 5, 7, 9]

Concurrently flattens concurrency iterables via threads (or via coroutines for .aflatten):
mixed_ones_and_zeros: Stream[int] = (
Stream([[0] * 4, [1] * 4])
.flatten(concurrency=2)
)
assert list(mixed_ones_and_zeros) == [0, 1, 0, 1, 0, 1, 0, 1]

Keeps only the elements that satisfy a condition:
even_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)
assert list(even_integers) == [0, 2, 4, 6, 8]

Removes duplicates:
distinct_chars: Stream[str] = Stream("foobarfooo").distinct()
assert list(distinct_chars) == ["f", "o", "b", "a", "r"]

... specifying a deduplication key:
strings_of_distinct_lengths: Stream[str] = (
Stream(["a", "foo", "bar", "z"])
.distinct(len)
)
assert list(strings_of_distinct_lengths) == ["a", "foo"]

Warning
During iteration, all the distinct elements yielded so far are retained in memory to perform the deduplication. You can however remove only consecutive duplicates, without a memory footprint, by setting consecutive_only=True:
consecutively_distinct_chars: Stream[str] = (
Stream("foobarfooo")
.distinct(consecutive_only=True)
)
assert list(consecutively_distinct_chars) == ["f", "o", "b", "a", "r", "f", "o"]

Ends the iteration once a given number of elements have been yielded:
five_first_integers: Stream[int] = integers.truncate(5)
assert list(five_first_integers) == [0, 1, 2, 3, 4]

... or when a condition is satisfied:
five_first_integers: Stream[int] = integers.truncate(when=lambda n: n == 5)
assert list(five_first_integers) == [0, 1, 2, 3, 4]

If both count and when are set, truncation occurs as soon as either condition is met.
Skips the first specified number of elements:
integers_after_five: Stream[int] = integers.skip(5)
assert list(integers_after_five) == [5, 6, 7, 8, 9]

... or skips elements until a predicate is satisfied:
integers_after_five: Stream[int] = integers.skip(until=lambda n: n >= 5)
assert list(integers_after_five) == [5, 6, 7, 8, 9]

If both count and until are set, skipping stops as soon as either condition is met.
Catches a given type of exception, and optionally yields a replacement value:
inverses: Stream[float] = (
integers
.map(lambda n: round(1 / n, 2))
.catch(ZeroDivisionError, replacement=float("inf"))
)
assert list(inverses) == [float("inf"), 1.0, 0.5, 0.33, 0.25, 0.2, 0.17, 0.14, 0.12, 0.11]

You can specify an additional when condition for the catch:
import requests
from requests.exceptions import ConnectionError
status_codes_ignoring_resolution_errors: Stream[int] = (
Stream(["https://github.com", "https://foo.bar", "https://github.com/foo/bar"])
.map(requests.get, concurrency=2)
.catch(ConnectionError, when=lambda error: "Max retries exceeded with url" in str(error))
.map(lambda response: response.status_code)
)
assert list(status_codes_ignoring_resolution_errors) == [200, 404]

It has an optional finally_raise: bool parameter to raise the first exception caught (if any) when the iteration terminates.
Tip
Leverage when to apply side effects on catch:
errors: List[Exception] = []
def store_error(error: Exception) -> bool:
errors.append(error) # applies effect
return True # signals to catch the error
integers_in_string: Stream[int] = (
Stream("012345foo6789")
.map(int)
.catch(ValueError, when=store_error)
)
assert list(integers_in_string) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
assert len(errors) == len("foo")

Limits the number of yields per time interval:
from datetime import timedelta
three_integers_per_second: Stream[int] = integers.throttle(3, per=timedelta(seconds=1))
# takes 3s: ceil(10 integers / 3 per_second) - 1
assert list(three_integers_per_second) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Logs the progress of iterations:

>>> assert list(integers.throttle(2, per=timedelta(seconds=1)).observe("integers")) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
INFO: [duration=0:00:00.001793 errors=0] 1 integers yielded
INFO: [duration=0:00:00.004388 errors=0] 2 integers yielded
INFO: [duration=0:00:01.003655 errors=0] 4 integers yielded
INFO: [duration=0:00:03.003196 errors=0] 8 integers yielded
INFO: [duration=0:00:04.003852 errors=0] 10 integers yielded
Note
To avoid flooding, logs are emitted only when the number of yielded elements (or errors) reaches powers of 2.
Tip
To mute these logs, set the logging level above INFO:
import logging
logging.getLogger("streamable").setLevel(logging.WARNING)

Concatenates streams:
assert list(integers + integers) == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Use the builtins' zip function:
from streamable import star
cubes: Stream[int] = (
Stream(zip(integers, integers, integers)) # Stream[Tuple[int, int, int]]
.map(star(lambda a, b, c: a * b * c)) # Stream[int]
)
assert list(cubes) == [0, 1, 8, 27, 64, 125, 216, 343, 512, 729]

Although consuming the stream is beyond the scope of this library, it provides two basic shorthands to trigger an iteration:

.count iterates over the stream until exhaustion and returns the number of elements yielded:
assert integers.count() == 10

The .acount (async method) iterates over the stream as an AsyncIterable until exhaustion and returns the number of elements yielded:
assert asyncio.run(integers.acount()) == 10

Calling the stream iterates over it until exhaustion and returns it:
state: List[int] = []
appending_integers: Stream[int] = integers.foreach(state.append)
assert appending_integers() is appending_integers
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Awaiting the stream iterates over it as an AsyncIterable until exhaustion and returns it:
async def test_await() -> None:
state: List[int] = []
appending_integers: Stream[int] = integers.foreach(state.append)
assert appending_integers is await appending_integers
assert state == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
asyncio.run(test_await())

Calls a function, passing the stream as its first argument, followed by *args/**kwargs if any (inspired by the .pipe of pandas and polars):
import pandas as pd
(
integers
.observe("ints")
.pipe(pd.DataFrame, columns=["integer"])
.to_csv("integers.csv", index=False)
)