streamable: fluent iteration

1. install

pip install streamable

2. import

from streamable import Stream

3. init

integers: Stream[int] = Stream(lambda: range(10))

Instantiate a Stream[T] by providing a function that returns a fresh Iterable[T] (the data source).
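The reason for passing a function rather than an iterable can be illustrated in plain Python: a bare iterator is exhausted after one pass, whereas a factory provides a fresh one for every iteration. The sketch below does not use streamable; `Reiterable` is a hypothetical helper written only for this illustration.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


class Reiterable(Iterable[T]):
    """Hypothetical helper (not part of streamable): iterable backed by a source factory."""

    def __init__(self, source: Callable[[], Iterable[T]]) -> None:
        self._source = source

    def __iter__(self) -> Iterator[T]:
        # Call the factory on every iteration to get a fresh iterable.
        return iter(self._source())


# A bare iterator can only be consumed once.
one_shot = iter(range(3))
first_pass = list(one_shot)   # [0, 1, 2]
second_pass = list(one_shot)  # [] because the iterator is exhausted

# A factory-backed iterable can be consumed as many times as needed.
fresh = Reiterable(lambda: iter(range(3)))
fresh_first_pass = list(fresh)   # [0, 1, 2]
fresh_second_pass = list(fresh)  # [0, 1, 2]
```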

4. operate

odd_integer_strings: Stream[str] = (
    integers.filter(lambda n: n % 2)
    .map(str)
)

Stream instances are immutable: operations return a new stream.

Operations are lazy: they do not iterate over the source.
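Laziness can be observed with the built-in `map`, which behaves similarly in this respect: declaring the transformation calls nothing, and the function only runs once something iterates. This is a plain-Python analogy, not streamable code; `record` and `calls` are names made up for the example.

```python
from typing import List

calls: List[int] = []


def record(n: int) -> int:
    # Side effect that reveals when the function is actually applied.
    calls.append(n)
    return n * 2


doubled = map(record, range(3))       # nothing is computed yet
calls_before_iteration = list(calls)  # snapshot: still empty

results = list(doubled)               # iterating triggers the calls
calls_after_iteration = list(calls)   # snapshot: one call per element
```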

5. iterate

Stream[T] extends Iterable[T], so a stream can be used wherever an iterable is expected:

>>> list(odd_integer_strings)
['1', '3', '5', '7', '9']
>>> set(odd_integer_strings)
{'9', '1', '5', '3', '7'}
>>> from functools import reduce
>>> from operator import mul
>>> reduce(mul, integers)
0
>>> for odd_integer_string in odd_integer_strings: ...

📒 Operations


.map

Applies a function to each element.

integer_strings: Stream[str] = integers.map(str)

It has an optional concurrency: int parameter to execute the function concurrently while preserving the order (threads).
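What "concurrently while preserving the order" means can be illustrated with the standard library: `ThreadPoolExecutor.map` runs calls in worker threads but returns results in input order. This sketches the idea only and makes no claim about how streamable implements it; `slow_square` is made up for the example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(n: int) -> int:
    # Earlier inputs sleep longer, so later inputs tend to finish first.
    time.sleep(0.01 * (5 - n))
    return n * n


with ThreadPoolExecutor(max_workers=3) as executor:
    # Results come back in input order, regardless of completion order.
    squares = list(executor.map(slow_square, range(5)))
```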


.foreach

Applies a function to each element like .map, but yields the elements themselves instead of the results.

printed_integers: Stream[int] = integers.foreach(print)

It has an optional concurrency: int parameter to execute the function concurrently while preserving the order (threads).
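The difference from .map can be sketched with a plain generator: the function is called for its side effect and the original element is passed through. `foreach_sketch` is a hypothetical stand-in that ignores the concurrency parameter; it is not the library's implementation.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def foreach_sketch(elements: Iterable[T], func: Callable[[T], object]) -> Iterator[T]:
    for element in elements:
        func(element)  # the result is discarded
        yield element  # the element itself is passed downstream


seen: List[int] = []
passed_through = list(foreach_sketch(range(3), seen.append))  # elements unchanged
mapped = list(map(str, range(3)))                             # results of the function
```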


.filter

Keeps only the elements for which a predicate function returns a truthy value.

pair_integers: Stream[int] = integers.filter(lambda n: n % 2 == 0)


.group

Groups elements into lists.

from typing import List

parity_groups: Stream[List[int]] = integers.group(size=100, seconds=4, by=lambda i: i % 2)

A group is a list of size elements for which by returns the same value, but it may contain fewer elements in these cases:

  • seconds have elapsed since the last yield of a group
  • upstream is exhausted
  • upstream raises an exception

All the parameters are optional.
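The size and by rules above can be sketched in plain Python. `group_sketch` is a hypothetical function that leaves out the seconds timeout and the exception case, and the order in which partial groups are flushed at exhaustion is an assumption of this sketch, not documented library behavior.

```python
from collections import defaultdict
from typing import Callable, Dict, Hashable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def group_sketch(
    elements: Iterable[T], size: int, by: Callable[[T], Hashable]
) -> Iterator[List[T]]:
    # One buffer per distinct value returned by `by`.
    buffers: Dict[Hashable, List[T]] = defaultdict(list)
    for element in elements:
        key = by(element)
        buffers[key].append(element)
        if len(buffers[key]) == size:
            # A full group is yielded and its buffer is reset.
            yield buffers.pop(key)
    # Upstream is exhausted: flush the remaining, smaller groups.
    yield from buffers.values()


groups = list(group_sketch(range(10), size=3, by=lambda i: i % 2))
```

With ten integers and size=3, the two full groups come first and the two leftover groups of two elements are flushed at the end.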


.flatten

Ungroups elements, assuming that they are Iterables.

integers: Stream[int] = parity_groups.flatten()

It has an optional concurrency parameter to flatten several iterables concurrently (threads).


.slow

Limits the rate at which elements are yielded to a maximum frequency (elements per second).

slow_integers: Stream[int] = integers.slow(frequency=2)
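Rate limiting of this kind can be sketched with a generator that sleeps until at least 1 / frequency seconds have passed since the previous yield. `slow_sketch` is a hypothetical plain-Python illustration, not the library's implementation.

```python
import time
from typing import Iterable, Iterator, Optional, TypeVar

T = TypeVar("T")


def slow_sketch(elements: Iterable[T], frequency: float) -> Iterator[T]:
    period = 1 / frequency  # minimum delay between two yields, in seconds
    last_yield: Optional[float] = None
    for element in elements:
        if last_yield is not None:
            remaining = period - (time.monotonic() - last_yield)
            if remaining > 0:
                time.sleep(remaining)
        last_yield = time.monotonic()
        yield element


start = time.monotonic()
slowed = list(slow_sketch(range(3), frequency=20))
elapsed = time.monotonic() - start  # about two periods: the first element is not delayed
```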


.catch

Catches exceptions that satisfy a predicate function.

safe_inverse_floats: Stream[float] = (
    integers
    .map(lambda n: 1 / n)
    .catch(lambda ex: isinstance(ex, ZeroDivisionError))
)

It has an optional raise_at_exhaustion parameter to raise the first caught exception when iteration ends.
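The basic behavior (skip the failing element, keep iterating) can be sketched in plain Python by wrapping each call to `next`. `catch_sketch` is a hypothetical function that omits raise_at_exhaustion; it only works when the upstream iterator survives an exception, as the built-in `map` does.

```python
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def catch_sketch(
    elements: Iterable[T], predicate: Callable[[Exception], bool]
) -> Iterator[T]:
    iterator = iter(elements)
    while True:
        try:
            element = next(iterator)
        except StopIteration:
            return
        except Exception as error:
            if not predicate(error):
                raise  # exceptions that do not match still propagate
            continue   # matching exceptions are swallowed
        yield element


inverses = map(lambda n: 1 / n, range(3))  # 1 / 0 raises ZeroDivisionError
safe_inverses = list(
    catch_sketch(inverses, lambda ex: isinstance(ex, ZeroDivisionError))
)
```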


.observe

Logs the progress of iterations over this stream.


observed_slow_integers: Stream[int] = slow_integers.observe(what="integers from 0 to 9")

Iterating over it should produce logs like:

INFO: iteration over 'integers from 0 to 9' will be observed.
INFO: after 0:00:00.000283, 0 error and 1 `integers from 0 to 9` yielded.
INFO: after 0:00:00.501373, 0 error and 2 `integers from 0 to 9` yielded.
INFO: after 0:00:01.501346, 0 error and 4 `integers from 0 to 9` yielded.
INFO: after 0:00:03.500864, 0 error and 8 `integers from 0 to 9` yielded.
INFO: after 0:00:04.500547, 0 error and 10 `integers from 0 to 9` yielded.

The number of logs stays small because they are emitted at exponentially spaced counts (after 1, 2, 4, 8, ... elements): e.g. the 11th log is produced when the iteration reaches the 1024th element.
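The logging schedule described above can be sketched with a generator that logs whenever the count of yielded elements reaches the next power of two. `observe_sketch` and its message format are hypothetical; timing, error counts, and the final log at exhaustion are left out.

```python
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def observe_sketch(
    elements: Iterable[T], what: str, log: Callable[[str], None] = print
) -> Iterator[T]:
    yielded = 0
    next_log_at = 1
    for element in elements:
        yielded += 1
        if yielded == next_log_at:
            log(f"{yielded} `{what}` yielded.")
            next_log_at *= 2  # the next log happens at twice the count
        yield element


messages: List[str] = []
for _ in observe_sketch(range(1024), "integers", log=messages.append):
    pass
# 1024 elements produce 11 messages, at counts 1, 2, 4, ..., 1024.
```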


.limit

Limits the number of elements yielded.

five_first_integers: Stream[int] = integers.limit(count=5)

📦 Notes Box


One can leverage this module to write readable custom ETL jobs, especially those dealing with third-party APIs.

Check the README dedicated to ETL.


This module is typed: you can check code that uses it with mypy.

supported Python versions

Compatible with Python 3.7 or newer (unit-tested for: 3.7.17, 3.8.18, 3.9.18, 3.10.13, 3.11.7, 3.12.1).

Tip: enclose operations in parentheses to avoid trailing backslashes \.

stream: Stream[str] = (
    Stream(lambda: range(10))
    .map(str)
)


The Stream's methods are also exposed as functions:

from typing import Iterator

from streamable.functions import slow

iterator: Iterator[int] = ...
slow_iterator: Iterator[int] = slow(iterator)

set logging level

import logging
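A minimal sketch of adjusting the level with the standard `logging` module follows. The logger name "streamable" is an assumption (libraries conventionally name their logger after the package) and is not confirmed by this document.

```python
import logging

# Assumption: the library logs through a logger named "streamable".
streamable_logger = logging.getLogger("streamable")

# Only WARNING and above will pass; INFO progress logs are silenced.
streamable_logger.setLevel(logging.WARNING)
```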


visitor pattern

The Stream class exposes an .accept method and you can implement a visitor by extending the streamable.visitor.Visitor class:

from streamable.visitor import Visitor

class DepthVisitor(Visitor[int]):
    def visit_stream(self, stream: Stream) -> int:
        if not stream.upstream:
            return 1
        return 1 + stream.upstream.accept(self)

def stream_depth(stream: Stream) -> int:
    return stream.accept(DepthVisitor())

>>> stream_depth(odd_integer_strings)