An Apache Beam Sink Library for Databases and other Sinks.


License
MIT
Install
pip install beam-sink==1.0.1

Documentation

BeamSink

🐘 Supports MySQL (Postgres and Elasticsearch support coming soon)

Installation

pip install beam_sink

Usage

MySQL

import apache_beam as beam
from beam_sink import ReadMySQL, WriteToMySQL, MySQLConfig
import json

# First, initialise a DB config object, which can validate that we're providing the right connection details
config = MySQLConfig(host="localhost", username="lenny", password="karl", database="springfield")

# Then we write a query 
query = "select * from thrillhouse"

# Initialise your Beam Pipeline the way you normally would
with beam.Pipeline() as p:
    (
        p 
        | 'ReadTable' >> ReadMySQL(config, query)
        | 'PrintResult' >> beam.Map(print)
    )

# We can also insert data into a table
table = "thrillhouse"
columns = ["id", "description", "amount"]

with beam.Pipeline() as p:
    (
        p
        | 'ReadJson' >> beam.io.ReadFromText("tests/.data/test.jsonl")
        | 'Parse' >> beam.Map(lambda x: [json.loads(x)])
        | 'WriteData' >> WriteToMySQL(config, table, columns)
    )
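
The 'Parse' step above turns each line of a JSON Lines file into a Python dict and wraps it in a single-element list. The sketch below reproduces that step outside of Beam so the record shape is easy to inspect. The sample lines are hypothetical (the real contents of tests/.data/test.jsonl are not shown here), and the assumption that each record's keys should match the `columns` list is ours, not something the library documents.

```python
import json

# Hypothetical JSON Lines input: one JSON object per line, with keys
# chosen to match the `columns` list from the example above.
sample_lines = [
    '{"id": 1, "description": "first row", "amount": 10.5}',
    '{"id": 2, "description": "second row", "amount": 3}',
]

columns = ["id", "description", "amount"]


def parse_line(line):
    """Mirror the 'Parse' step: decode one line and wrap the record in a list."""
    return [json.loads(line)]


parsed = [parse_line(line) for line in sample_lines]

# Check that every record carries each of the expected columns.
for batch in parsed:
    for record in batch:
        missing = [c for c in columns if c not in record]
        assert not missing, f"record is missing columns: {missing}"

print(parsed[0])
```

Running a check like this on a few input lines before starting the pipeline can catch malformed JSON or missing keys early, without needing a database connection.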

Other operations, such as Upsert or DB commands, are coming soon.
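
The MySQL example above hardcodes its credentials for brevity. One possible alternative, sketched here, is to collect the same keyword arguments from environment variables and pass them on to `MySQLConfig`. The helper `mysql_settings_from_env` and the `MYSQL_*` variable names are hypothetical and not part of beam_sink; only the keyword names (`host`, `username`, `password`, `database`) come from the example above.

```python
import os


def mysql_settings_from_env(environ=os.environ):
    """Collect MySQLConfig keyword arguments from a mapping of environment variables.

    The MYSQL_* names are hypothetical; substitute whatever your deployment provides.
    A missing required variable raises KeyError.
    """
    return {
        "host": environ.get("MYSQL_HOST", "localhost"),
        "username": environ["MYSQL_USER"],
        "password": environ["MYSQL_PASSWORD"],
        "database": environ["MYSQL_DATABASE"],
    }


# An explicit mapping stands in for the real environment in this sketch.
settings = mysql_settings_from_env(
    {
        "MYSQL_USER": "lenny",
        "MYSQL_PASSWORD": "karl",
        "MYSQL_DATABASE": "springfield",
    }
)

# With beam_sink installed, the settings could then be unpacked as:
# config = MySQLConfig(**settings)
```

Passing the mapping as a parameter keeps the helper easy to test; in a real run it can be called with no arguments so that it reads from `os.environ`.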