airflow-google-cloud-run-plugin

Airflow plugin for Google Cloud Run Jobs


Keywords
airflow, airflow-plugin, cloud-run-jobs, google-cloud, python
License
MIT
Install
pip install airflow-google-cloud-run-plugin==0.3.2

Documentation

Airflow plugin for orchestrating Google Cloud Run jobs.

Features

  1. Easier-to-use alternative to KubernetesPodOperator
  2. Securely use sensitive data stored in Google Cloud Secret Manager
  3. Create tasks with isolated dependencies
  4. Enable polyglot workflows

Resources

Operators

  1. CloudRunJobCreateOperator
  2. CloudRunJobDeleteOperator
  3. CloudRunJobRunOperator

Hooks

  1. CloudRunJobHook
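
The hook can also be used directly from task code. A minimal sketch; the import path and the run_job method name below are assumptions chosen to mirror the operator kwargs, so check the plugin source for the actual signatures:

from airflow.operators.python import PythonOperator

from airflow_google_cloud_run_plugin.hooks.cloud_run import CloudRunJobHook  # import path is an assumption


def _run_job():
    # Method name and arguments are assumptions mirroring CloudRunJobRunOperator.
    hook = CloudRunJobHook()
    hook.run_job(
        name="example-job",
        location="us-central1",
        project_id="example-project",
    )


run_via_hook = PythonOperator(task_id="run_via_hook", python_callable=_run_job)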

Usage

Basic Usage

from airflow import DAG

from airflow_google_cloud_run_plugin.operators.cloud_run import (
    CloudRunJobCreateOperator,
    CloudRunJobDeleteOperator,
    CloudRunJobRunOperator,
)

with DAG(dag_id="example_dag") as dag:
    create_job = CloudRunJobCreateOperator(
        task_id="create",
        name="example-job",
        location="us-central1",
        project_id="example-project",
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
        command=["echo"],
        cpu="1000m",
        memory="512Mi"
    )

    run_job = CloudRunJobRunOperator(
        task_id="run",
        name="example-job",
        location="us-central1",
        project_id="example-project"
    )

    delete_job = CloudRunJobDeleteOperator(
        task_id="delete",
        name="example-job",
        location="us-central1",
        project_id="example-project"
    )

    create_job >> run_job >> delete_job
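
In the pipeline above, delete only runs when run succeeds, so a failed run leaves the job behind. Standard Airflow trigger rules (not plugin-specific) make the cleanup unconditional; a sketch of the same delete task:

from airflow.utils.trigger_rule import TriggerRule

delete_job = CloudRunJobDeleteOperator(
    task_id="delete",
    name="example-job",
    location="us-central1",
    project_id="example-project",
    trigger_rule=TriggerRule.ALL_DONE,  # run even when an upstream task fails
)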

Using Environment Variables

from airflow import DAG

from airflow_google_cloud_run_plugin.operators.cloud_run import (
    CloudRunJobCreateOperator,
    CloudRunJobDeleteOperator,
    CloudRunJobRunOperator,
)

# Simple environment variable
env1 = {
    "name": "FOO",
    "value": "not_so_secret_value_123"
}

# Environment variable from Secret Manager
env2 = {
    "name": "BAR",
    "valueFrom": {
        "secretKeyRef": {
            "name": "super_secret_password",
            "key": "1"  # or "latest" for latest secret version
        }
    }
}

with DAG(dag_id="example_dag_with_secrets") as dag:
    create_job = CloudRunJobCreateOperator(
        task_id="create",
        name="example-job",
        location="us-central1",
        project_id="example-project",
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
        command=["echo"],
        env_vars=[env1, env2],
        cpu="1000m",
        memory="512Mi"
    )

    run_job = CloudRunJobRunOperator(
        task_id="run",
        name="example-job",
        location="us-central1",
        project_id="example-project"
    )

    delete_job = CloudRunJobDeleteOperator(
        task_id="delete",
        name="example-job",
        location="us-central1",
        project_id="example-project"
    )

    create_job >> run_job >> delete_job
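
The secretKeyRef above assumes super_secret_password already exists in Secret Manager in the same project. For completeness, a minimal sketch of creating it with the official google-cloud-secret-manager client (the payload value is a placeholder):

from google.cloud import secretmanager

client = secretmanager.SecretManagerServiceClient()

# Create the secret container, then add the first version; version "1" is
# what the secretKeyRef in the DAG above points at.
secret = client.create_secret(
    request={
        "parent": "projects/example-project",
        "secret_id": "super_secret_password",
        "secret": {"replication": {"automatic": {}}},
    }
)
client.add_secret_version(
    request={"parent": secret.name, "payload": {"data": b"placeholder-password"}}
)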

Improvement Suggestions

  • Nicer user experience for defining args and commands
  • Add ability for user to specify "create if not exists" and "destroy on exit" so the job lifecycle can be contained in one operator
  • Use approach from other GCP operators once this issue is resolved googleapis/python-run#64
  • Add operators for all CRUD operations
  • Add run sensor (see link)
  • Enable volume mounts (see TaskSpec)
  • Allow user to configure resource requests in addition to limits (see ResourceRequirements)
  • Add remaining container options (see Container)
  • Provide a job generator helper similar to Dataproc cluster config generator (see link)
  • Allow non-default credentials and for user to specify service account (see link)
  • Allow a failure threshold: when more than one task is specified, the user should be able to set the number of allowed failures
  • Add custom links for log URIs
  • Add wrapper class for easier environment variable definition, similar to Secret from the Kubernetes provider (see link); a hypothetical sketch follows this list
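
For the last item, a hypothetical sketch of what such a wrapper could look like; EnvVar here is illustrative and not part of the plugin:

from dataclasses import dataclass
from typing import Optional


@dataclass
class EnvVar:
    """Hypothetical helper that renders the env var dicts used in the examples above."""

    name: str
    value: Optional[str] = None
    secret_name: Optional[str] = None
    secret_version: str = "latest"

    def to_dict(self) -> dict:
        # Secret-backed variables render to the valueFrom/secretKeyRef shape,
        # plain variables to the simple name/value shape.
        if self.secret_name is not None:
            return {
                "name": self.name,
                "valueFrom": {
                    "secretKeyRef": {"name": self.secret_name, "key": self.secret_version}
                },
            }
        return {"name": self.name, "value": self.value}


# Usage: env_vars=[EnvVar("FOO", value="123").to_dict(),
#                  EnvVar("BAR", secret_name="super_secret_password").to_dict()]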