airflow-kubernetes-job-operator-latest

An Airflow job operator that executes a task as a Kubernetes job on a cluster, given a job YAML configuration or an image URI.


License
MIT-feh
Install
pip install airflow-kubernetes-job-operator-latest==0.0.7

Documentation

Airflow KubernetesJobOperator

An Airflow operator that executes a task in a Kubernetes cluster, given a YAML configuration or an image URI.

If you like it, star it, so other people can find and use it too.

Contributions are welcome; see the Contribution section below.

Supports

  1. Running tasks as Kubernetes Jobs.
  2. Running tasks as Kubernetes Pods.
  3. Running tasks as multiple resources (see the example below; similar to kubectl apply).
  4. Running tasks as custom resources.
  5. Auto-detection of the Kubernetes namespace and config.
  6. Streaming of pod and participating resource logs to Airflow.
  7. Full Kubernetes error logs on failure.
  8. Operator defaults via the Airflow config (see Configuration below).
  9. Integrated Jinja2 support for file templates (enabled via a flag).
  10. Tested and working on Google Cloud Composer.

Two operator classes are available

  1. KubernetesJobOperator - Accepts a Kubernetes configuration (a YAML file, a YAML string, or a list of Python dictionaries) as the body of the task.
  2. KubernetesLegacyJobOperator - Defaults to a Kubernetes Job definition and supports the same arguments as the KubernetesPodOperator; i.e. it can be dropped in to replace the KubernetesPodOperator for legacy support.

Install

To install using pip (see https://pypi.org/project/airflow-kubernetes-job-operator),

pip install airflow_kubernetes_job_operator

To install from the master branch,

pip install git+https://github.com/LamaAni/KubernetesJobOperator.git@master

To install from a release (tag),

pip install git+https://github.com/LamaAni/KubernetesJobOperator.git@[tag]

TL;DR

Example Airflow DAG

from airflow import DAG
from airflow_kubernetes_job_operator.kubernetes_job_operator import KubernetesJobOperator
from airflow_kubernetes_job_operator.kubernetes_legacy_job_operator import KubernetesLegacyJobOperator
from airflow.utils.dates import days_ago

default_args = {"owner": "tester", "start_date": days_ago(2), "retries": 0}
dag = DAG("job-tester", default_args=default_args, description="Test base job operator", schedule_interval=None)

job_task = KubernetesJobOperator(
    task_id="from-image",
    dag=dag,
    image="ubuntu",
    command=["bash", "-c", 'echo "all ok"'],
)

body = {"kind": "Pod"}  # The body or a yaml string (must be valid)
job_task_from_body = KubernetesJobOperator(dag=dag, task_id="from-body", body=body)

body_filepath = "./my_yaml_file.yaml" # Can be relative to this file, or abs path.
job_task_from_yaml = KubernetesJobOperator(dag=dag, task_id="from-yaml", body_filepath=body_filepath)

# Legacy compatibility with the KubernetesPodOperator
legacy_job_task = KubernetesLegacyJobOperator(
    task_id="legacy-image-job",
    image="ubuntu",
    cmds=["bash", "-c", 'echo "all ok"'],
    dag=dag,
    is_delete_operator_pod=True,
)

Example (multi-resource) task YAML

NOTE: the success/failure of the task is tracked only on the first resource, regardless of its kind. Native tracking currently exists for Pods and Jobs only, though you can always add a custom resource.

# First resource: this resource will be tracked by the operator. Other resources will not be tracked.
apiVersion: batch/v1
kind: Job
metadata:
  name: test-job # not required; will be used as a prefix to the task name
  finalizers:
    - foregroundDeletion
spec:
  template:
    metadata:
      labels:
        app: test-task-pod
    spec:
      restartPolicy: Never
      containers:
        - name: job-executor
          image: ubuntu
          command:
            - bash
            - -c
            - |
              #!/usr/bin/env bash
              echo "OK"
  backoffLimit: 0
---
apiVersion: v1
kind: Service
metadata:
  name: test-service # not required; will be used as a prefix to the task name.
spec:
  selector:
    app: test-task-pod
  ports:
    - port: 8080
      targetPort: 8080
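
To execute the multi-resource file above as a task, point body_filepath at it. A minimal sketch (the file name is an assumption, and dag is the DAG from the TL;DR example):

multi_resource_task = KubernetesJobOperator(
    task_id="multi-resource",
    dag=dag,
    # Only the first resource in the file (the Job) is tracked for success/failure.
    body_filepath="./multi_resource.yaml",
)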

YAML field augmentation

The operator automatically augments the following fields,

  1. envs are added to the containers of all pods (see the sketch below).
  2. Special envs are added:
    1. KUBERNETES_JOB_OPERATOR_RESOURCES - A list of all the resource names.

All other changes are made on the main container (the first container) of the first resource only.
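
For example, a task can use its own envs together with the auto-added KUBERNETES_JOB_OPERATOR_RESOURCES variable. A minimal sketch (the env name and echoed output are illustrative, and dag is the DAG from the TL;DR example):

env_demo_task = KubernetesJobOperator(
    task_id="env-demo",
    dag=dag,
    image="ubuntu",
    # envs are injected into all pods; KUBERNETES_JOB_OPERATOR_RESOURCES is added automatically.
    envs={"MY_SETTING": "my-value"},
    command=["bash", "-c", 'echo "$MY_SETTING"; echo "$KUBERNETES_JOB_OPERATOR_RESOURCES"'],
)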

Configuration

Airflow config extra sections,

[kubernetes_job_operator]
# The task kube resources delete policy. Can be: Never, Always, IfFailed, IfSucceeded
delete_policy=IfSucceeded
# The default object type to execute with (legacy, or image). Can be: Pod, Job
default_execution_object=Job

# Logs
detect_kubernetes_log_level=True
show_kubernetes_timestamps=False
# Show the runner id in the log (for all runner logs).
show_runner_id=False

# Tasks (Defaults)
# Max seconds to wait for the first connection to kubernetes.
startup_timeout_seconds=120
# If true, parse the body when the DAG is built; otherwise parse only at execution time.
validate_body_on_init=False

# Comma separated list of extra locations to search for the kube config file.
# These are added to the top of the search list, in order.
kube_config_extra_locations=

To set these values through environment variables, follow the Airflow standard,

export AIRFLOW__[SECTION]__[KEY]=value
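
For example, to change the default delete policy (section and key taken from the block above, uppercased per the Airflow convention):

export AIRFLOW__KUBERNETES_JOB_OPERATOR__DELETE_POLICY=Always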

Why would this be better than the KubernetesPodOperator?

The KubernetesJobOperator allows for more execution options than the KubernetesPodOperator, such as multi-resource execution, custom resource execution, creation-event and pod log tracking, proper resource deletion when the task completes, errors, or is cancelled, and more.

Further, the KubernetesPodOperator monitors the worker pod with an internal loop that runs on, and consumes resources of, the Airflow worker. This operator takes an asynchronous, threaded approach that reduces resource consumption on the worker; since log collection and parsing happen outside the main worker thread, the worker is "free" to handle other tasks without interruption.

Finally, with the KubernetesJobOperator you are free to use other resource types, such as the Kubernetes Job, to execute your tasks. These are better designed for run-to-completion workloads in Kubernetes and are "up with the times".

Kubernetes job operator input variables

| argument | default | description |
| --- | --- | --- |
| command | None | A list of commands to send, e.g. ["echo", "ok"]. |
| arguments | None | A list of arguments to send to the docker image. |
| image | None | An image to use (overrides the main pod image). |
| namespace | None | A namespace to use (overrides/adds a namespace to the main resource). |
| envs | None | A dictionary of envs to add to the main job pod. |
| body | None | The body of the job to use. Can be a string or a dictionary. |
| body_filepath | None | A filepath to the YAML config file. Can be a relative filepath. |
| image_pull_policy | Always | The kubernetes image pull policy (str). |
| delete_policy | IfSucceeded | The resources delete policy. One of: Always, Never, IfSucceeded, IfFailed (str). |
| in_cluster | None | Use in-cluster credentials. |
| config_file | None | The kubernetes config file to use. |
| get_logs | True | Retrieve the executing pod logs (for all resources). |
| cluster_context | | The cluster context name to use. |
| startup_timeout_seconds | 10 | The max number of seconds to wait for the job to be created before timing out. |
| validate_body_on_init | False | Parse the body when the operator object is created. Can be set to true only if jinja is disabled. |
| enable_jinja | True | Enable jinja on the body (string or file) and on the following args: command, arguments, image, envs, body, namespace, config_file, cluster_context. |
| jinja_job_args | None | A dictionary or object used in the jinja template to render arguments. The jinja args are loaded under the keyword "job" (see the sketch below). |
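
As an illustration of jinja_job_args (a minimal sketch; the file name and template keys are assumptions), values are exposed to the body template under the "job" keyword, e.g. image: "{{ job.image }}" inside the YAML file:

templated_task = KubernetesJobOperator(
    task_id="templated-job",
    dag=dag,
    # The body template can reference {{ job.image }} and {{ job.message }}.
    body_filepath="./my_templated_job.yaml",
    jinja_job_args={"image": "ubuntu", "message": "all ok"},
)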

Contribution

Contributions are welcome; please post issues or PRs as needed.

Implementations still missing:

Add an issue (or better, submit a PR) if you need these.

  1. XCom
  2. Examples (other than TL;DR)

Licence

Copyright © Zav Shotan and other contributors. It is free software, released under the MIT licence, and may be redistributed under the terms specified in LICENSE.