Python Stream Processing


Keywords
data-engineering, data-processing, data-science, dataflow, machine-learning, python, rust, stream-processing, streaming-data
License
Apache-2.0
Install
pip install bytewax==0.19.0

Documentation

Actions Status PyPI Bytewax User Guide

Bytewax

Python Stateful Stream Processing Framework

Bytewax is a Python framework that simplifies event and stream processing. Because Bytewax couples the stream and event processing capabilities of Flink, Spark, and Kafka Streams with the friendly and familiar interface of Python, you can re-use the Python libraries you already know and love. Connect data sources, run stateful transformations and write to various different downstream systems with built-in connectors or existing Python libraries. Bytewax Dataflow Animation

How it all works

Bytewax is a Python framework and Rust distributed processing engine that uses a dataflow computational model to provide parallelizable stream processing and event processing capabilities similar to Flink, Spark, and Kafka Streams. You can use Bytewax for a variety of workloads from moving data à la Kafka Connect style all the way to advanced online machine learning workloads. Bytewax is not limited to streaming applications but excels anywhere that data can be distributed at the input and output.

Bytewax has an accompanying command line interface, waxctl, which supports the deployment of dataflows on cloud servers or Kubernetes. You can download it here.


Getting Started with Bytewax

pip install bytewax

Install waxctl

Dataflow, Input and Operators

A Bytewax dataflow is Python code that will represent an input, a series of processing steps, and an output. The inputs could range from a Kafka stream to a WebSocket and the outputs could vary from a data lake to a key-value store.

import json

from bytewax import operators as op
from bytewax.connectors.kafka import operators as kop
from bytewax.dataflow import Dataflow

Bytewax has input and output helpers for common input and output data sources but you can also create your own with the Sink and Source API.

At a high-level, the dataflow compute model is one in which a program execution is conceptualized as data flowing through a series of operator-based steps. Operators like map and filter are the processing primitives of Bytewax. Each of them gives you a “shape” of data transformation, and you give them regular Python functions to customize them to a specific task you need. See the documentation for a list of the available operators

BROKERS = ["localhost:19092"]
IN_TOPICS = ["in_topic"]
OUT_TOPIC = "out_topic"
ERR_TOPIC = "errors"


def deserialize(kafka_message):
    return json.loads(kafka_message.value)


def anonymize_email(event_data):
    event_data["email"] = "@".join(["******", event_data["email"].split("@")[-1]])
    return event_data


def remove_bytewax(event_data):
    return "bytewax" not in event_data["email"]


flow = Dataflow("kafka_in_out")
stream = kop.input("inp", flow, brokers=BROKERS, topics=IN_TOPICS)
# we can inspect the stream coming from the kafka topic to view the items within on std out for debugging
op.inspect("inspect-oks", stream.oks)
# we can also inspect kafka errors as a separate stream and raise an exception when one is encountered
errs = op.inspect("errors", stream.errs).then(op.raises, "crash-on-err")
deser_msgs = op.map("deserialize", stream.oks, deserialize)
anon_msgs = op.map("anon", deser_msgs, anonymize_email)
filtered_msgs = op.filter("filter_employees", anon_msgs, remove_bytewax)
processed = op.map("map", anon_msgs, lambda m: KafkaSinkMessage(None, json.dumps(m)))
# and finally output the cleaned data to a new topic
kop.output("out1", processed, brokers=BROKERS, topic=OUT_TOPIC)

Windowing, Reducing and Aggregating

Bytewax is a stateful stream processing framework, which means that some operations remember information across multiple events. Windows and aggregations are also stateful, and can be reconstructed in the event of failure. Bytewax can be configured with different state recovery mechanisms to durably persist state in order to recover from failure.

There are multiple stateful operators available like reduce, stateful_map and fold_window. The complete list can be found in the API documentation for all operators. Below we use the fold_window operator with a tumbling window based on system time to gather events and calculate the number of times events have occurred on a per-user basis.

from datetime import datetime, timedelta, timezone

from bytewax.dataflow import Dataflow
import bytewax.operators.window as win
from bytewax.operators.window import EventClockConfig, TumblingWindow
from bytewax.testing import TestingSource

flow = Dataflow("window_eg")

src = [
    {"user_id": "123", "value": 5, "time": "2023-1-1T00:00:00Z"},
    {"user_id": "123", "value": 7, "time": "2023-1-1T00:00:01Z"},
    {"user_id": "123", "value": 2, "time": "2023-1-1T00:00:07Z"},
]
inp = op.input("inp", flow, TestingSource(src))
keyed_inp = op.key_on("keyed_inp", inp, lambda x: x["user_id"])


# This function instructs the event clock on how to retrieve the
# event's datetime from the input.
# Note that the datetime MUST be UTC. If the datetime is using a different
# representation, we would have to convert it here.
def get_event_time(event):
    return datetime.fromisoformat(event["time"])


# Configure the `fold_window` operator to use the event time.
clock = EventClockConfig(get_event_time, wait_for_system_duration=timedelta(seconds=10))

# And a 5 seconds tumbling window
align_to = datetime(2023, 1, 1, tzinfo=timezone.utc)
windower = TumblingWindow(align_to=align_to, length=timedelta(seconds=5))

five_sec_buckets = win.collect_window("five_sec_buckets", keyed_inp, clock, windower)


def calc_avg(bucket):
    values = [event["value"] for event in bucket]
    if len(values) > 0:
        return sum(values) / len(values)
    else:
        return None


five_sec_avgs = op.map_value("avg_in_bucket", five_sec_buckets, calc_avg)

Merges and Joins

Merging or Joining multiple input streams is a common task for stream processing, Bytewax enables different types of joins to facilitate different patters.

Merging Streams

Merging streams is like concatenating, there is no logic and the resulting stream will potentially include heterogenous records.

from bytewax import operators as op

from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("merge")

src_1 = [
    {"user_id": "123", "name": "Bumble"},
]
inp1 = op.input("inp1", flow, TestingSource(src_1))

src_2 = [
    {"user_id": "123", "email": "bee@bytewax.com"},
    {"user_id": "456", "email": "hive@bytewax.com"},
]
inp2 = op.input("inp2", flow, TestingSource(src_2))
merged_stream = op.merge("merge", inp1, inp2)
op.inspect("debug", merged_stream)
Joining Streams

Joining streams is different than merging because it uses logic to join the records in the streams together. The joins in Bytewax can be running or not. A regular join in streaming is more closely related to an inner join in SQL in that the dataflow will emit data downstream from a join when all of the sides of the join have matched on the key.

from bytewax import operators as op

from bytewax.connectors.stdio import StdOutSink
from bytewax.dataflow import Dataflow
from bytewax.testing import TestingSource

flow = Dataflow("join")

src_1 = [
    {"user_id": "123", "name": "Bumble"},
]
inp1 = op.input("inp1", flow, TestingSource(src_1))
keyed_inp_1 = op.key_on("key_stream_1", inp1, lambda x: x["user_id"])
src_2 = [
    {"user_id": "123", "email": "bee@bytewax.com"},
    {"user_id": "456", "email": "hive@bytewax.com"},
]
inp2 = op.input("inp2", flow, TestingSource(src_2))
keyed_inp_2 = op.key_on("key_stream_2", inp2, lambda x: x["user_id"])

merged_stream = op.join("join", keyed_inp_1, keyed_inp_2)
op.inspect("debug", merged_stream)

Output

Output in Bytewax is described as a sink and these are grouped into connectors. There are a number of basic connectors in the bytewax repo to help you during development. In addition to the built-in connectors, it is possible to use the input and output API to build a custom sink and source. There is also a hub for connectors built by the community, partners and Bytewax. Below is an example of a custom connector for Postgres using the psycopg2 library.

% skip: next

import psycopg2

from bytewax import operators as op
from bytewax.outputs import FixedPartitionedSink, StatefulSinkPartition


class PsqlSink(StatefulSinkPartition):
    def __init__(self):
        self.conn = psycopg2.connect("dbname=website user=bytewax")
        self.conn.set_session(autocommit=True)
        self.cur = self.conn.cursor()

    def write_batch(self, values):
        query_string = """
            INSERT INTO events (user_id, data)
            VALUES (%s, %s)
            ON CONFLICT (user_id)
            DO UPDATE SET data = %s;
        """
        self.cur.execute_values(query_string, values)

    def snapshot(self):
        pass

    def close(self):
        self.conn.close()


class PsqlOutput(FixedPartitionedSink):
    def list_parts(self):
        return ["single"]

    def build_part(step_id, for_part, resume_state):
        return PsqlSink()

Execution

Bytewax dataflows can be executed in a single Python process, or on multiple processes on multiple hosts with multiple worker threads. When processing data in a distributed fashion, Bytewax uses routing keys to ensure your state is updated in a correct way automatically.

# a single worker locally
python -m bytewax.run my_dataflow:flow

# Start two worker threads in a single process.
python -m bytewax.run my_dataflow -w 2

# Start a process on two separate machines to form a Bytewax cluster.
# Start the first process with two worker threads on `machine_one`.
machine_one$ python -m bytewax.run my_dataflow -w 2 -i0 -a "machine_one:2101;machine_two:2101"

# Then start the second process with three worker threads on `machine_two`.
machine_two$ python -m bytewax.run my_dataflow -w 3 -i1 -a "machine_one:2101;machine_two:2101"

It can also be run in a Docker container as described further in the documentation.

Kubernetes

The recommended way to run dataflows at scale is to leverage the kubernetes ecosystem. To help manage deployment, we built waxctl, which allows you to easily deploy dataflows that will run at huge scale across multiple compute nodes.

waxctl df deploy my_dataflow.py --name my-dataflow

Why Bytewax?

At a high level, Bytewax provides a few major benefits:

  • The operators in Bytewax are largely “data-parallel”, meaning they can operate on independent parts of the data concurrently.
  • Bytewax offers the ability to express higher-level control constructs, like iteration.
  • Bytewax allows you to develop and run your code locally, and then easily scale that code to multiple workers or processes without changes.
  • Bytewax can be used in both a streaming and batch context
  • Ability to leverage the Python ecosystem directly

Community

Slack Is the main forum for communication and discussion.

GitHub Issues is reserved only for actual issues. Please use the slack community for discussions.

Code of Conduct

Usage

Install the latest release with pip:

pip install bytewax

Building From Source

To build a specific branch, you will need to use Maturin and have Rust installed on your machine. Once those have been installed run

maturin develop -E dev

Important: If you are testing with a maturin built version from source, you should use maturin build --release since maturin develop will be slower.

More Examples

For a more complete example, and documentation on the available operators, check out the User Guide and the /examples folder.

License

Bytewax is licensed under the Apache-2.0 license.

Contributing

Contributions are welcome! This community and project would not be what it is without the contributors. All contributions, from bug reports to new features, are welcome and encouraged. Please view the contribution guidelines before getting started.



With ❤️ Bytewax