# airflow-google-cloud-run-plugin

Airflow plugin for orchestrating Google Cloud Run jobs.
## Features

- Easier-to-use alternative to `KubernetesPodOperator`
- Securely use sensitive data stored in Google Cloud Secrets Manager
- Create tasks with isolated dependencies
- Enables polyglot workflows
## Resources

### Operators

- `CloudRunJobCreateOperator`
- `CloudRunJobDeleteOperator`
- `CloudRunJobRunOperator`

### Hooks

- `CloudRunJobHook`
## Usage

### Basic Usage
```python
from airflow import DAG

from airflow_google_cloud_run_plugin.operators.cloud_run import (
    CloudRunJobCreateOperator,
    CloudRunJobDeleteOperator,
    CloudRunJobRunOperator,
)

with DAG(dag_id="example_dag") as dag:
    create_job = CloudRunJobCreateOperator(
        task_id="create",
        name="example-job",
        location="us-central1",
        project_id="example-project",
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
        command=["echo"],
        cpu="1000m",
        memory="512Mi",
    )

    run_job = CloudRunJobRunOperator(
        task_id="run",
        name="example-job",
        location="us-central1",
        project_id="example-project",
    )

    delete_job = CloudRunJobDeleteOperator(
        task_id="delete",
        name="example-job",
        location="us-central1",
        project_id="example-project",
    )

    create_job >> run_job >> delete_job
```
### Using Environment Variables
```python
from airflow import DAG

from airflow_google_cloud_run_plugin.operators.cloud_run import (
    CloudRunJobCreateOperator,
    CloudRunJobDeleteOperator,
    CloudRunJobRunOperator,
)

# Simple environment variable
env1 = {
    "name": "FOO",
    "value": "not_so_secret_value_123",
}

# Environment variable from Secret Manager
env2 = {
    "name": "BAR",
    "valueFrom": {
        "secretKeyRef": {
            "name": "super_secret_password",
            "key": "1",  # or "latest" for latest secret version
        }
    },
}

with DAG(dag_id="example_dag_with_secrets") as dag:
    create_job = CloudRunJobCreateOperator(
        task_id="create",
        name="example-job",
        location="us-central1",
        project_id="example-project",
        image="gcr.io/gcp-runtimes/ubuntu_18_0_4",
        command=["echo"],
        env_vars=[env1, env2],
        cpu="1000m",
        memory="512Mi",
    )

    run_job = CloudRunJobRunOperator(
        task_id="run",
        name="example-job",
        location="us-central1",
        project_id="example-project",
    )

    delete_job = CloudRunJobDeleteOperator(
        task_id="delete",
        name="example-job",
        location="us-central1",
        project_id="example-project",
    )

    create_job >> run_job >> delete_job
```
## Improvement Suggestions

- Nicer user experience for defining args and commands
- Add the ability for the user to specify "create if not exists" and "destroy on exit" so the full job lifecycle can be contained in one operator
- Use the approach from other GCP operators once googleapis/python-run#64 is resolved
- Add operators for all CRUD operations
- Add a run sensor
- Enable volume mounts (see `TaskSpec`)
- Allow the user to configure resource requirement `requests` (see `ResourceRequirements`)
- Add remaining container options (see `Container`)
- Provide a job generator helper similar to the Dataproc cluster config generator
- Allow non-default credentials and let the user specify a service account
- Allow a failure threshold: if more than one task is specified, the user should be allowed to specify the number of failures permitted
- Add custom links for log URIs
- Add a wrapper class for easier environment variable definition, similar to `Secret` from the Kubernetes provider
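The environment-variable wrapper suggested above might look something like the following sketch. Note this is illustrative only: the `EnvVar` and `SecretEnvVar` names and their fields are hypothetical, not part of the plugin's current API.

```python
from dataclasses import dataclass


@dataclass
class EnvVar:
    """Hypothetical helper for a plain environment variable."""
    name: str
    value: str

    def to_dict(self) -> dict:
        # Matches the "simple environment variable" shape used above
        return {"name": self.name, "value": self.value}


@dataclass
class SecretEnvVar:
    """Hypothetical helper for a Secret Manager-backed variable."""
    name: str
    secret: str
    version: str = "latest"  # or a specific secret version like "1"

    def to_dict(self) -> dict:
        # Matches the "valueFrom" / "secretKeyRef" shape used above
        return {
            "name": self.name,
            "valueFrom": {
                "secretKeyRef": {"name": self.secret, "key": self.version}
            },
        }
```

With such helpers, `SecretEnvVar("BAR", "super_secret_password", "1").to_dict()` would produce the same dict as the hand-written `env2` example, so `env_vars=[e.to_dict() for e in ...]` could replace the raw dicts.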