plmbr
Reusable pipes for data stream processing.
from itertools import zip_longest
import json
from plmbr.pipe import pipe
from plmbr.pipes import *
class validate(Pipe):
def __init__(self, *vals):
self.vals = vals
def pipe(self, items: Iterator) -> Iterator:
for expected, actual in zip_longest(self.vals, items):
print(f'expecting {expected} got {actual}')
assert actual == expected
yield actual
def test_null(): (
range(3)
- null()
> validate(0, 1, 2))
def test_json_loads():
items = [{'a': 2}, {'b': 4}]
(
(json.dumps(i) for i in items)
- json_loads()
> validate(*items))
def test_json_dumps():
items = [{'a': 2}, {'b': 4}]
(
items
- json_dumps()
> validate(*[json.dumps(i) for i in items]))
def test_batch():
(
range(3)
- batch(batch_size=2)
> validate([0, 1], [2]))
(
[0, 1, 2]
- batch(batch_size=2)
> validate([0, 1], [2]))
def test_unbatch(): (
[range(2), range(3)]
- unbatch()
> validate(0, 1, 0, 1, 2))
def test_to(): (
range(3)
- to(lambda i: i + 1)
> validate(1, 2, 3))
def test_keep(): (
range(3)
- keep(lambda i: i > 0)
> validate(1, 2))
def test_drop_fields(): (
({'a': i, 'b': i, 'c': i} for i in range(3))
- drop_fields('b', 'c')
> validate({'a': 0}, {'a': 1}, {'a': 2}))
def test_uniq(): (
({'a': 0, 'b': i // 2, 'c': i} for i in range(3))
- uniq('a', 'b')
> validate(
{'a': 0, 'b': 0, 'c': 0},
{'a': 0, 'b': 1, 'c': 2}))
def test_sample(): (
range(10)
- sample(prob=.5)
> validate(1, 4, 8, 9))
def test_window():
(
range(4)
- window(size=2)
> validate((0, 1), (1, 2), (2, 3)))
(
[0, 1, 2, 3]
- window(size=2)
> validate((0, 1), (1, 2), (2, 3)))
def test_append():
res = [8]
(
range(4)
- log()
> append(res)
)
assert res == [8, 0, 1, 2, 3]
def test_tee(): (
[1, 2, 3]
- tee(
keep(lambda i: i < 3)
- to(lambda i: i * 2),
to(lambda i: i * 10))
> validate(2, 4, 10, 20, 30))
def test_catch():
def bad_func(items):
for i in items:
if i % 2:
raise Exception(i)
yield i
err = []
(
range(3)
- pipe(bad_func)
- validate(0, 2)
> catch(lambda e: err.append(e.args[0])))
print(err)
assert err == [1]