GlassFlow Python Client SDK


Keywords
data, data-processing, datastreaming, python, real-time, sdk, stream-processing
License
MIT
Install
pip install glassflow==1.0.2

Documentation



License: MIT Chat on Slack Ruff

GlassFlow Python SDK

The GlassFlow Python SDK provides a convenient way to interact with the GlassFlow API in your Python applications. The SDK is used to publish and consume events to your GlassFlow pipelines.

Installation

You can install the GlassFlow Python SDK using pip:

pip install glassflow

Data Operations

publish

Publish a new event into the pipeline

Example Usage

from glassflow import PipelineDataSource

source = PipelineDataSource(pipeline_id="<str value", pipeline_access_token="<str token>")
data = {} # your json event
res = source.publish(request_body=data)

if res.status_code == 200:
    print("Published sucessfully")

consume

Consume the transformed event from the pipeline

Example Usage

from glassflow import PipelineDataSink

sink = PipelineDataSink(pipeline_id="<str value", pipeline_access_token="<str value>")
res = sink.consume()

if res.status_code == 200:
    print(res.json())

consume failed

If the transformation failed for any event, they are available in a failed queue. You can consume those events from the pipeline

Example Usage

from glassflow import PipelineDataSink

sink = PipelineDataSink(pipeline_id="<str value", pipeline_access_token="<str value>")
res = sink.consume_failed()

if res.status_code == 200:
    print(res.json())

validate credentials

Validate pipeline credentials (pipeline_id and pipeline_access_token) from source or sink

Example Usage

from glassflow import PipelineDataSource, errors

try:
    source = PipelineDataSource(pipeline_id="<str value", pipeline_access_token="<str value>")
    source.validate_credentials()
except errors.PipelineNotFoundError as e:
    print("Pipeline ID does not exist!")
    raise e
except errors.PipelineAccessTokenInvalidError as e:
    print("Pipeline Access Token is invalid!")
    raise e

Pipeline Management

In order to manage your pipelines with this SDK, one needs to provide the PERSONAL_ACCESS_TOKEN to the GlassFlow client.

from glassflow import GlassFlowClient

client = GlassFlowClient(personal_access_token="<your personal access token>")

Now you can perform CRUD operations on your pipelines:

  • list_pipelines - Returns the list of pipelines available
  • get_pipeline - Returns a pipeline object from a given pipeline ID
  • create - Create a new pipeline
  • delete - Delete an existing pipeline

list_pipelines

Returns information about the available pipelines. It can be restricted to a specific space by passing the space_id.

Example Usage

from glassflow import GlassFlowClient

client = GlassFlowClient(personal_access_token="<your access token>")
res = client.list_pipelines()

get_pipeline

Gets information about a pipeline from a given pipeline ID. It returns a Pipeline object which can be used manage the Pipeline.

Example Usage

from glassflow import GlassFlowClient

client = GlassFlowClient(personal_access_token="<your access token>")
pipeline = client.get_pipeline(pipeline_id="<your pipeline id>")

print("Name:", pipeline.name)

create

The Pipeline object has a create method that creates a new GlassFlow pipeline.

Example Usage

from glassflow import Pipeline

pipeline = Pipeline(
    name="<your pipeline name>",
    transformation_file="path/to/transformation.py",
    space_id="<your space id>",
    personal_access_token="<your personal access token>"
).create()

In the next example we create a pipeline with Google PubSub source and a webhook sink:

from glassflow import Pipeline

pipeline = Pipeline(
    name="<your pipeline name>",
    transformation_file="path/to/transformation.py",
    space_id="<your space id>",
    personal_access_token="<your personal access token>",
    source_kind="google_pubsub",
    source_config={
        "project_id": "<your gcp project id>",
        "subscription_id": "<your subscription id>",
        "credentials_json": "<your credentials json>"
    },
    sink_kind="webhook",
    sink_config={
        "url": "<webhook url>",
        "method": "<GET | POST | PUT | PATCH | DELETE>",
        "headers": [{"header1": "header1_value"}]
    }
).create()

delete

The Pipeline object has a delete method to delete a pipeline

Example Usage

from glassflow import Pipeline

pipeline = Pipeline(
    name="<your pipeline name>",
    transformation_file="path/to/transformation.py",
    space_id="<your space id>",
    personal_access_token="<your personal access token>"
).create()

pipeline.delete()

Quickstart

Follow the quickstart guide here

Code Samples

GlassFlow Examples

SDK Maturity

Please note that the GlassFlow Python SDK is currently in beta and is subject to potential breaking changes. We recommend keeping an eye on the official documentation and updating your code accordingly to ensure compatibility with future versions of the SDK.

User Guides

For more detailed information on how to use the GlassFlow Python SDK, please refer to the GlassFlow Documentation. The documentation provides comprehensive guides, tutorials, and examples to help you get started with GlassFlow and make the most out of the SDK.

Contributing

Anyone who wishes to contribute to this project, whether documentation, features, bug fixes, code cleanup, testing, or code reviews, is very much encouraged to do so.

  1. Join the Slack channel.

  2. Just raise your hand on the GitHub discussion board.

If you are unfamiliar with how to contribute to GitHub projects, here is a Get Started Guide. A full set of contribution guidelines, along with templates, are in progress.

Troubleshooting

For any questions, comments, or additional help, please reach out to us via email at help@glassflow.dev. Please check out our Q&A to get solutions for common installation problems and other issues.

Raise an issue

To provide feedback or report a bug, please raise an issue on our issue tracker.