Thank you for reading this post, don't forget to subscribe!
Среди множества всевозможных операторов в Apache Airflow
есть оператор под названием KubernetesPodOperator
, который используется для запуска задач в подах кластера Kubernetes
. Это позволяет нам запускать распределенные вычисления или длительные операции (терминология Google AIP - long-running operations) в кластере и практически не заботясь о их масштабировании и расположении - Kubernetes
сделает это за нас. Такой подход позволяет существенно сэкономить средства, ведь после выполнения задачи под может быть удален, а кластер, при правильно настроенном масштабировании, уменьшен в размерах.
Некоторые параметры при настройке KubernetesPodOperator
могут использоваться как шаблоны (полный список), другие же могут сравнительно легко быть “шаблонизированы” (пример для параметра ‘namespace’ рассмотрим ниже) - именно эту возможность мы будем использовать для демонстрации запуска и передачи параметров в DAG с помощью HTTP-запроса.
Простейший пример, в котором будет использоваться в качестве шаблона параметр ‘image’, выглядит следующим образом:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
import airflow from airflow.models import DAG from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from kubernetes.client import models as k8s default_args = { 'owner': 'ealebed', 'depends_on_past': False, 'start_date': airflow.utils.dates.days_ago(0), } dag = DAG( dag_id='000_templated_task', default_args=default_args, schedule_interval=None, tags=['test'] ) resources = k8s.V1ResourceRequirements( requests={ 'memory': '128Mi', 'cpu': 0.5, 'ephemeral-storage': '1Gi' }, limits={ 'memory': '128Mi', 'cpu': 0.5, 'nvidia.com/gpu': None, 'ephemeral-storage': '1Gi' } ) tt0 = KubernetesPodOperator( namespace="airflow", image="openjdk:{{ dag_run.conf.image_tag }}", cmds=["java", "--version"], name="templated-task", labels={"app": "000-templated-task"}, task_id="templated-task-id", service_account_name="airflow", resources=resources, startup_timeout_seconds=30, get_logs=True, is_delete_operator_pod=False, in_cluster=True, do_xcom_push=False, dag=dag ) |
Запустить DAG через HTTP и передать в качестве параметра тег образа можно с помощью следующего запроса (также приведен ответ от API):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
curl -X POST \ http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \ -H 'Content-Type: application/json' \ -H 'Cache-Control: no-cache' \ -d '{"conf":{"image_tag":"15"}}' { "conf": { "image_tag": "15" }, "dag_id": "000_templated_task", "dag_run_id": "manual__2021-05-22T07:52:17.440555+00:00", "end_date": null, "execution_date": "2021-05-22T07:52:17.440555+00:00", "external_trigger": true, "start_date": "2021-05-22T07:52:17.443583+00:00", "state": "running" } |
после выполнения задачи в логе можно будет увидеть:
openjdk 15.0.2 2021-01-19
OpenJDK Runtime Environment (build 15.0.2+7-27)
OpenJDK 64-Bit Server VM (build 15.0.2+7-27, mixed mode, sharing)
Запустим тот же DAG, но с другим значением параметра (ответ от API о запущенной задаче опущен):
1 2 3 4 5 |
curl -X POST \ http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \ -H 'Content-Type: application/json' \ -H 'Cache-Control: no-cache' \ -d '{"conf":{"image_tag":"16"}}' |
лог:
1 2 3 4 |
openjdk 16.0.1 2021-04-20 OpenJDK Runtime Environment (build 16.0.1+9-24) OpenJDK 64-Bit Server VM (build 16.0.1+9-24, mixed mode, sharing) |
Естесственно, Apache Airflow
поддерживает несколько вариантов аутентификации и даже позволяет внедрять свои варианты. В следующем примере рассмотрим самый простой из предложенных - ‘basic_auth’. Проверить какой именно вариант используется на вашем инстансе Airflow
можно с помощью команды:
1 2 3 |
$ airflow config get-value api auth_backend airflow.api.auth.backend.basic_auth |
С включенной ‘basic_auth’ запрос должен выглядеть так:
1 2 3 4 5 6 |
curl -X POST \ --user "admin:admin" \ http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \ -H 'Content-Type: application/json' \ -H 'Cache-Control: no-cache' \ -d '{"conf":{"image_tag":"16"}}' |
или так:
1 2 3 4 5 6 |
curl -X POST \ http://localhost:8080/api/v1/dags/000_templated_task/dagRuns \ -H "Authorization: Basic YWRtaW46YWRtaW4=" \ -H 'Content-Type: application/json' \ -H 'Cache-Control: no-cache' \ -d '{"conf":{"image_tag":"16"}}' |
Более интересный пример из реальной жизни, в котором параметры используются для передачи переменых окружения внутрь запускаемого пода, выглядит так:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
import airflow from airflow.models import DAG from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator from kubernetes.client import models as k8s default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': airflow.utils.dates.days_ago(0), } dag = DAG( dag_id='manual_tracer', default_args=default_args, schedule_interval=None, tags=['manual'] ) resources = k8s.V1ResourceRequirements( requests={ 'memory': '128Mi', 'cpu': 0.5, 'ephemeral-storage': '1Gi' }, limits={ 'memory': '128Mi', 'cpu': 0.5, 'nvidia.com/gpu': None, 'ephemeral-storage': '1Gi' } ) manual_tracer = KubernetesPodOperator( namespace="airflow", image="openjdk:15", cmds=["bin/bash", "-c", "sleep 5; seq ${COUNT} | xargs -I@ -n1 curl -s -i -X POST ${URL} -d ${JSON}"], name="manual-tracer-task", labels={"app": "manual-tracer-task"}, task_id="manual-tracer-task-id", env_vars={ 'COUNT': '{{ dag_run.conf.count }}', 'URL': '{{ dag_run.conf.url }}', 'JSON': '{{ dag_run.conf.json }}', }, service_account_name="airflow", resources=resources, startup_timeout_seconds=30, get_logs=True, is_delete_operator_pod=False, in_cluster=True, do_xcom_push=False, dag=dag ) |
Запускаем его такой командой:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
curl -X POST \ http://localhost:8080/api/v1/dags/manual_tracer/dagRuns \ -H "Authorization: Basic YWRtaW46YWRtaW4=" \ -H 'Content-Type: application/json' \ -H 'Cache-Control: no-cache' \ -d '{"conf":{"count":"10", "url":"http://ads-api-rtb.default.svc.cluster.local:8080/services/trace", "json":"{\"factor\":4,\"ssp\":\"inneractive\",\"duration\":10}"}}' { "conf": { "count": "10", "json": "{\"factor\":4,\"ssp\":\"inneractive\",\"duration\":10}", "url": "http://ads-api-rtb.default.svc.cluster.local:8080/services/trace" }, "dag_id": "manual_tracer", "dag_run_id": "manual__2021-05-22T07:51:16.727115+00:00", "end_date": null, "execution_date": "2021-05-22T07:51:16.727115+00:00", "external_trigger": true, "start_date": "2021-05-22T07:51:16.730199+00:00", "state": "running" } |
Обещанный пример с “шаблонизацией” параметра ‘namespace’ может выглядеть примерно так:
1 2 3 4 5 6 7 8 9 10 |
... # Create my own operator with the same behavior that adds namespace to template_fields class MyKubernetesPodOperator(KubernetesPodOperator): template_fields = KubernetesPodOperator.template_fields +('namespace',) ... # Use code as with DAG('test_ns', default_args=default_args, schedule_interval='@once') as dag: ns = """ {{ dag_run.conf.ns }} """ example_task = MyKubernetesPodOperator(namespace=ns,...) ... |