Thank you for reading this post, don't forget to subscribe!
Airflow
- это инструмент для разработки, планирования и мониторинга batch-процессов обработки данных. Как минимум на Airflow
стоит обратить внимание при выборе планировщика для ваших ETL/ELT-процессов.
Существует несколько сценариев использования связки Airflow
и Kubernetes
:
- запуск самого
Airflow
в Kubernetes кластере - использование
Airflow
для запуска задач (jobs) в Kubernetes кластере - оба предыдущих варианта одновременно
Рассмотрим эти сценарии по порядку. Airflow
состоит из отдельных компонентов, основные из которых планировщик (scheduler), вебсервер (webserver) и воркеры (workers). В зависимости от конкретного случая, требованиям к отказоустойчивости и количества одновременно выполняющихся задач, эти компоненты можно запускать по отдельности или все вместе.
Прежде всего, для запуска Airflow
в Kubernetes
кластере нам понадобится docker-образ и yaml-манифесты (или Helm чарты).
Примечание. Существует также вариант запуска с использованием Airflow Kubernetes Operator от Google, но на момент написания статьи он находится в alpha-версии и мной не тестировался.
Зачастую для запуска Airflow
используют docker-образ puckel/docker-airflow. Это надежный образ, который собирается автоматически и содержит entrypoint скрипт, позволяющий docker-контейнеру работать в роли планировщика/вебсервера/воркера и т. д. Однако, версии образа несколько отстают от релизных версий Airflow
, поэтому есть смысл создать docker-образ для production окружения и обновлять его самостоятельно
В нашем случае Dockerfile выглядит следующим образом:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 |
FROM python:3.7-slim-stretch LABEL maintainer="Yevhen Lebid <yevhen.lebid@loopme.com>" # Never prompts the user for choices on installation/configuration of packages ENV DEBIAN_FRONTEND=noninteractive \ TERM=linux # Airflow ARG AIRFLOW_VERSION=1.10.7 ARG AIRFLOW_USER_HOME=/usr/local/airflow ENV AIRFLOW_HOME=${AIRFLOW_USER_HOME} # Define en_US. ENV LANGUAGE=en_US.UTF-8 \ LANG=en_US.UTF-8 \ LC_ALL=en_US.UTF-8 \ LC_CTYPE=en_US.UTF-8 \ LC_MESSAGES=en_US.UTF-8 RUN set -ex \ && buildDeps=" \ freetds-dev \ libkrb5-dev \ libsasl2-dev \ libssl-dev \ libffi-dev \ libpq-dev \ git \ " \ && pipDeps=" \ pytz \ pyOpenSSL \ ndg-httpsclient \ pyasn1 \ psycopg2-binary \ apache-airflow[crypto,postgres,jdbc,kubernetes,password,elasticsearch,slack]==${AIRFLOW_VERSION} \ " \ && apt-get update -yqq \ && apt-get upgrade -yqq \ && apt-get install -yqq --no-install-recommends \ $buildDeps \ freetds-bin \ build-essential \ apt-utils \ curl \ rsync \ netcat \ locales \ && sed -i 's/^# en_US.UTF-8 UTF-8$/en_US.UTF-8 UTF-8/g' /etc/locale.gen \ && locale-gen \ && update-locale LANG=en_US.UTF-8 LC_ALL=en_US.UTF-8 \ && useradd -ms /bin/bash -d ${AIRFLOW_USER_HOME} airflow \ && pip install -U pip setuptools wheel \ && pip install $pipDeps \ && apt-get purge --auto-remove -yqq $buildDeps \ && apt-get autoremove -yqq --purge \ && apt-get clean \ && rm -rf \ /var/lib/apt/lists/* \ /tmp/* \ /var/tmp/* \ /usr/share/man \ /usr/share/doc \ /usr/share/doc-base COPY entrypoint.sh /entrypoint.sh COPY airflow.cfg ${AIRFLOW_USER_HOME}/airflow.cfg RUN chown -R airflow: ${AIRFLOW_USER_HOME} EXPOSE 8080 USER airflow WORKDIR ${AIRFLOW_USER_HOME} ENTRYPOINT ["/entrypoint.sh"] CMD ["webserver"] |
Также видоизменился Entrypoint скрипт:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 |
#!/usr/bin/env bash declare -a DEFAULT_CONNS=( "airflow_db" "slack" "cassandra_default" "azure_container_instances_default" "azure_cosmos_default" "azure_data_lake_default" "segment_default" "dingding_default" "qubole_default" "databricks_default" "emr_default" "sqoop_default" "redis_default" "druid_ingest_default" "druid_broker_default" "spark_default" "aws_default" "fs_default" "sftp_default" "ssh_default" "webhdfs_default" "wasb_default" "vertica_default" "local_mysql" "mssql_default" "http_default" "sqlite_default" "postgres_default" "mysql_default" "mongo_default" "metastore_default" "hiveserver2_default" "hive_cli_default" "opsgenie_default" "google_cloud_default" "presto_default" "bigquery_default" "beeline_default" "pig_cli_default" ) case "$1" in webserver) airflow initdb airflow create_user \ --role Admin \ --username ${AIRFLOW_ADMIN_USER} \ --password ${AIRFLOW_ADMIN_PASSWORD} \ --firstname Air \ --lastname Flow \ --email air.flow@examle.com for CONN in "${DEFAULT_CONNS[@]}" do airflow connections --delete --conn_id ${CONN} done airflow connections \ --add \ --conn_id postgres_default \ --conn_uri ${AIRFLOW_CONN_POSTGRES_DEFAULT} airflow connections \ --add \ --conn_id slack \ --conn_type http \ --conn_host https://hooks.slack.com/services \ --conn_password ${AIRFLOW_SLACK_WEBHOOK_URL} if [ "$AIRFLOW__CORE__EXECUTOR" = "KubernetesExecutor" ]; then # With the "KubernetesExecutor" executors it should all run in one container. airflow scheduler & fi if [ "$AIRFLOW__CORE__EXECUTOR" = "LocalExecutor" ]; then # With the "Local" executor it should all run in one container. airflow scheduler & fi exec airflow worker & exec airflow webserver ;; worker|scheduler) # To give the webserver time to run initdb. sleep 10 exec airflow "$@" ;; version) exec airflow "$@" ;; *) # The command is something like bash, not an airflow subcommand. Just run it in the right environment. exec "$@" ;; esac |
Для деплоя в Kubernetes
кластер можно воспользоваться одним из нескольких Helm чартов:
или самостоятельно написанным манифестом (наш случай).
Основным понятием в Airflow
является Directed Acyclic Graph (далее DAG) - смысловое объединение одной нескольких задач, которые вследует выполнять в строго определенной последовательности по определенному расписанию.
При написании DAG’а разработчик определяет набор операторов, на которых будут построены задачи внутри DAG’а. Airflow Operator - это еще к одна важная сущность, на основании которой создаются экземпляры заданий, где описывается, что именно будет происходить во время выполнения экземпляра задания. Существует огромное множество как официальных так и созданных сообществом Airflow
операторов, кроме того, ориентируясь на свои потребности/способности можно создавать свои операторы.
Существует несколько способов “доставки” написанных DAG’ов в инстанс Airflow, развернутый в Kubernetes
кластере:
- добавление в docker-образ при сборке
- использование Persistent Volume (PV)
- git-sync
Первый вариант предполагает добавление DAG’ов в docker-образ в момент сборки, и в большинстве случаев является неприемлимым - пересобирать образы при каждом малейшем изменения DAG’а не самая лучшая идея.
При втором варианте - DAG-файлы хранятся на некотором внешнем томе и монтируются в соответствующие поды (scheduler, webserver, worker) при запуске. Для корректной работы PV с несколькими подами, режим доступа (accessMode) тома должен быть ‘ReadOnlyMany’ или, на худой конец, ‘ReadWriteMany’.
Третий, оптимальный вариант, предполагает использование еще одного сайдкар (sidecar) контейнера git-sync а поде (Pod
) для периодической синхронизации DAG-файлов с указанного git-репозитория без рестарта самого Airflow
.
С учетом git-sync и RBAC, в нашем случае Kubernetes
манифест будет выглядеть так:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: airflow-role-admin rules: - apiGroups: - "" resources: - pods - pods/log verbs: - get - watch - list - create - delete --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: airflow-role-binding roleRef: apiGroup: rbac.authorization.k8s.io kind: Role name: airflow-role-admin subjects: - kind: ServiceAccount name: default --- apiVersion: v1 data: airflow.cfg: | [core] dags_folder = /usr/local/airflow/dags base_log_folder = /usr/local/airflow/logs remote_logging = False remote_log_conn_id = remote_base_log_folder = encrypt_s3_logs = False # The executor class that airflow should use. Choices include # SequentialExecutor, LocalExecutor, CeleryExecutor, DaskExecutor executor = LocalExecutor # sql_alchemy_conn = AIRFLOW__CORE__SQL_ALCHEMY_CONN from manifest sql_alchemy_conn = load_examples = False # fernet_key = AIRFLOW__CORE__FERNET_KEY fernet_key = [cli] api_client = airflow.api.client.local_client endpoint_url = http://my-airflow.example.cool [api] auth_backend = airflow.api.auth.backend.default [webserver] base_url = http://my-airflow.example.cool web_server_host = 0.0.0.0 web_server_port = 8080 # Set to true to turn on authentication: # https://airflow.apache.org/security.html#web-authentication authenticate = True auth_backend = airflow.contrib.auth.backends.password_auth # Use FAB-based webserver with RBAC feature rbac = True expose_config = True [scheduler] job_heartbeat_sec = 5 scheduler_heartbeat_sec = 5 run_duration = -1 min_file_process_interval = 5 dag_dir_list_interval = 300 print_stats_interval = 30 scheduler_health_check_threshold = 30 child_process_log_directory = /usr/local/airflow/logs/scheduler scheduler_zombie_task_threshold = 300 catchup_by_default = True max_tis_per_query = 512 max_threads = 2 authenticate = False use_job_schedule = True [admin] hide_sensitive_variable_fields = True known_hosts: | github.com ssh-rsa AAAAB3NzaC1yc2EAAAABIwAAAQEAq2A7hRGmdnm9tUDbO9IDSwBK6TbQa+PXYPCPy6rbTrTtw7PHkccKrpp0yVhp5HdEIcKr6pLlVDBfOLX9QUsyCOV0wzfjIJNlGEYsdlLJizHhbn2mUjvSAHQqZETYP81eFzLQNnPHt4EVVUh7VfDESU84KezmD5QlWpXLmvU31/yMf+Se8xhHTvKSCZIFImWwoG6mbUoWf9nzpIoaSjB+weqqUUmpaaasXVal72J+UX2B+2RPW3RcT0eOzQgqlJL3RKrTJvdsjE3JEAvGq3lGHSZXy28G3skua2SmVi/w4yCE6gbODqnTWlg7+wC604ydGXA8VJiS5ap43JXiUFFAaQ== kind: ConfigMap metadata: name: airflow-configmap --- apiVersion: v1 data: gitSshKey: <real_ssh_key_in_base64> kind: Secret metadata: name: airflow-secrets type: Opaque --- apiVersion: v1 kind: Service metadata: name: airflow spec: clusterIP: None ports: - name: http port: 8080 selector: app: airflow --- apiVersion: apps/v1 kind: Deployment metadata: name: airflow spec: replicas: 1 revisionHistoryLimit: 1 selector: matchLabels: app: airflow strategy: type: Recreate template: metadata: labels: app: airflow spec: containers: - env: - name: GIT_SYNC_REPO value: git@github.com:ealebed/airflow.git - name: GIT_SYNC_BRANCH value: master - name: GIT_SYNC_ROOT value: /git - name: GIT_SYNC_DEST value: repo - name: GIT_SYNC_SSH value: "true" image: k8s.gcr.io/git-sync:v3.1.4 name: git-sync securityContext: runAsUser: 65533 volumeMounts: - mountPath: /git name: airflow-dags - mountPath: /etc/git-secret/ssh name: airflow-secrets subPath: ssh - mountPath: /etc/git-secret/known_hosts name: airflow-configmap subPath: known_hosts - env: - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN value: postgresql+psycopg2://airflow:airflow@postgreshost:5432/airflow - name: AIRFLOW_CONN_POSTGRES_DEFAULT value: postgres://airflow:airflow@postgreshost:5432/airflow - name: AIRFLOW_SLACK_WEBHOOK_URL value: T02H6C.….…..q3QPW0m - name: AIRFLOW_ADMIN_USER value: airflow - name: AIRFLOW_ADMIN_PASSWORD value: airflow - name: AIRFLOW__CORE__FERNET_KEY value: tsJjtESQbN_24ADlMX2HISyIVwfj7pW1nEfYDkcPYMY= - name: AIRFLOW__CORE__EXECUTOR value: LocalExecutor image: index.docker.io/ealebed/airflow:1.10.7 livenessProbe: failureThreshold: 5 httpGet: path: /health port: 8080 initialDelaySeconds: 60 timeoutSeconds: 5 name: airflow ports: - containerPort: 8080 name: http readinessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 60 periodSeconds: 5 timeoutSeconds: 5 resources: limits: cpu: "2" memory: 4Gi requests: cpu: "2" memory: 4Gi volumeMounts: - mountPath: /usr/local/airflow/airflow.cfg name: airflow-configmap subPath: airflow.cfg - mountPath: /usr/local/airflow/dags name: airflow-dags securityContext: fsGroup: 1000 volumes: - emptyDir: {} name: airflow-dags - configMap: name: airflow-configmap name: airflow-configmap - name: airflow-secrets secret: defaultMode: 288 items: - key: gitSshKey mode: 288 path: ssh secretName: airflow-secrets |
Для теста создадим задачу, которая с помощью KubernetesPodOperator запускает в Kubernetes кластере под (Pod) и выполняет внутри него команду
java --version
Тестовый пример DAG’а выглядит так:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
from functools import partial from datetime import datetime, timedelta from airflow.models import DAG from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator from repo.dags.notifications import task_fail_slack_alert, task_success_slack_alert from repo.dags.kubernetes_commons import my_affinity, my_tolerations, my_resources default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime.strptime('2020.01.28', '%Y.%m.%d'), 'retry_delay': timedelta(minutes=5), 'on_failure_callback': partial(task_fail_slack_alert, usr="ealebed"), 'on_success_callback': partial(task_success_slack_alert, usr="ealebed"), } dag = DAG( dag_id='test_creatives_task', default_args=default_args, max_active_runs=1, schedule_interval="27,57 * * * *" ) task = KubernetesPodOperator( namespace="default", image="ealebed/java:11", cmds=["java", "--version"], name="test-task", labels={"app": "test-creatives-task"}, task_id="id-task", affinity=my_affinity, resources=my_resources, tolerations=my_tolerations, # Timeout to start up the Pod, default is 120. startup_timeout_seconds=30, get_logs=True, is_delete_operator_pod=False, hostnetwork=False, in_cluster=True, do_xcom_push=False, dag=dag ) |
Примечание. Согласно документации, для описания KubernetesPodOperator
минимально необходимы только четыре поля (name
, namespace
, image
, task_id
) но, в нашем случае, при использовании Airflow
версии 1.10.7 пришлось также в обязательном порядке добавить in_cluster=True
и do_xcom_push=False
.
Весь список доступных параметров и их возможных значений находится здесь,