Keywords
cloud, data, processing, extraction, transformation, ingestion, ETL, BigQuery, Google, pipeline
License
MIT
Install
pip install cloudsdp==0.1.11

Documentation

codecov test

CloudSDP Library

The CloudSDP library is designed to simplify the creation and management of serverless data pipelines between Google Cloud Run and Google BigQuery. It provides a developer-friendly interface to extract data from various sources, transform it, and seamlessly load it into BigQuery tables, all while leveraging the power of serverless architecture.

Features

WIP:

  • Data Extraction and Ingestion: Extract data from various sources, convert it into a common format, and ingest it into BigQuery tables.

TODO:

  • Data Transformation: Perform data transformations, such as cleaning, enrichment, and normalization, before loading into BigQuery.
  • Scheduled Jobs and Triggers: Schedule data pipeline jobs based on time triggers using Cloud Scheduler.
  • Data Pipeline Workflow: Define and orchestrate data pipeline workflows with configurable execution order and dependencies.
  • Conflict Resolution and Error Handling: Implement conflict resolution strategies and error handling mechanisms for reliable data processing.
  • Monitoring and Logging: Monitor job progress, resource utilization, and performance metrics using integrated logging and monitoring tools.
  • Documentation and Examples: Comprehensive documentation and code examples to guide developers in using the library effectively.

Installation

Install the library using pip:

pip install cloudsdp

Or, install the library using poetry:

poetry add cloudsdp

QuickStart

Data Ingestion

Create dataset, ingest data and cleanup

Ingest data from a pandas dataframe:

import os
import pandas as pd

from cloudsdp.api.bigquery import BigQuery, WriteDisposition


PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data = {
        "name": [ f"Name{str(el)}" for el in range(0, 10000)],
        "score": [ num for num in range(0, 10000)]
    }
    df = pd.DataFrame(data)
    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "score", "field_type": "NUMERIC", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)
    bq.create_table(table_name, data_schema, dataset_name)

    bq.ingest_from_dataframe(df, dataset_name, table_name, write_disposition=WriteDisposition.WRITE_IF_TABLE_EMPTY)

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)

From a list of python dicts:

import os

from cloudsdp.api.bigquery import BigQuery

PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data = [{"name": "Someone", "age": 29}, {"name": "Something", "age": 22}]

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)

    bq.create_table(table_name, data_schema, dataset_name)

    errors = bq.ingest_rows_json(data, dataset_name, table_name)
    if errors:
        print("Errors", ";".join(errors))

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)

From csv files stored in GCS:

import os

from cloudsdp.api.bigquery import BigQuery


PROJECT_NAME = "project_name"


def main():
    bq = BigQuery(PROJECT_NAME)
    dataset_name = "dataset_1"
    table_name = "table_1"

    data_schema = [
        {"name": "name", "field_type": "STRING", "mode": "REQUIRED"},
        {"name": "age", "field_type": "INTEGER", "mode": "REQUIRED"},
    ]

    bq.create_dataset(dataset_name)

    bq.create_table(table_name, data_schema, dataset_name)

    csv_uris = ["gs://mybucket/name_age_data_1.csv", "gs://mybucket/name_age_data_2.csv"]

    result = bq.ingest_csvs_from_cloud_bucket(
        csv_uris, dataset_name, table_name, skip_leading_rows=1, autodetect_schema=False, timeout=120
    )
    print(result)

    bq.delete_dataset(dataset_name, delete_contents=True, not_found_ok=True)