Kubernetes. Развертывание Apache Airflow

Thank you for reading this post, don't forget to subscribe!

Airflow - это инстру­мент для раз­ра­бот­ки, пла­ни­ро­ва­ния и мони­то­рин­га batch-про­цес­сов обра­бот­ки дан­ных. Как мини­мум на Airflow сто­ит обра­тить вни­ма­ние при выбо­ре пла­ни­ров­щи­ка для ваших ETL/ELT-про­цес­сов.

Суще­ству­ет несколь­ко сце­на­ри­ев исполь­зо­ва­ния связ­ки Airflow и Kubernetes:

  • запуск само­го Airflow в Kubernetes кластере
  • исполь­зо­ва­ние Airflow для запус­ка задач (jobs) в Kubernetes кластере
  • оба преды­ду­щих вари­ан­та одновременно

Рас­смот­рим эти сце­на­рии по поряд­ку. Airflow состо­ит из отдель­ных ком­по­нен­тов, основ­ные из кото­рых пла­ни­ров­щик (scheduler), веб­сер­вер (webserver) и вор­ке­ры (workers). В зави­си­мо­сти от кон­крет­но­го слу­чая, тре­бо­ва­ни­ям к отка­зо­устой­чи­во­сти и коли­че­ства одно­вре­мен­но выпол­ня­ю­щих­ся задач, эти ком­по­нен­ты мож­но запус­кать по отдель­но­сти или все вместе.

Преж­де все­го, для запус­ка Airflow в Kubernetes кла­сте­ре нам пона­до­бит­ся docker-образ и yaml-мани­фе­сты (или Helm чарты).

При­ме­ча­ние. Суще­ству­ет так­же вари­ант запус­ка с исполь­зо­ва­ни­ем Airflow Kubernetes Operator от Google, но на момент напи­са­ния ста­тьи он нахо­дит­ся в alpha-вер­сии и мной не тестировался.

Зача­стую для запус­ка Airflow исполь­зу­ют docker-образ puckel/docker-airflow. Это надеж­ный образ, кото­рый соби­ра­ет­ся авто­ма­ти­че­ски и содер­жит entrypoint скрипт, поз­во­ля­ю­щий docker-кон­тей­не­ру рабо­тать в роли планировщика/вебсервера/воркера и т. д. Одна­ко, вер­сии обра­за несколь­ко отста­ют от релиз­ных вер­сий Airflow, поэто­му есть смысл создать docker-образ для production окру­же­ния и обнов­лять его самостоятельно

В нашем слу­чае Dockerfile выгля­дит сле­ду­ю­щим образом:

Так­же видо­из­ме­нил­ся Entrypoint скрипт:


 

Для деп­лоя в Kubernetes кла­стер мож­но вос­поль­зо­вать­ся одним из несколь­ких Helm чартов:

или само­сто­я­тель­но напи­сан­ным мани­фе­стом (наш случай).

Основ­ным поня­ти­ем в Airflow явля­ет­ся Directed Acyclic Graph (далее DAG) - смыс­ло­вое объ­еди­не­ние одной несколь­ких задач, кото­рые всле­ду­ет выпол­нять в стро­го опре­де­лен­ной после­до­ва­тель­но­сти по опре­де­лен­но­му расписанию.

При напи­са­нии DAG’а раз­ра­бот­чик опре­де­ля­ет набор опе­ра­то­ров, на кото­рых будут постро­е­ны зада­чи внут­ри DAG’а. Airflow Operator - это еще к одна важ­ная сущ­ность, на осно­ва­нии кото­рой созда­ют­ся экзем­пля­ры зада­ний, где опи­сы­ва­ет­ся, что имен­но будет про­ис­хо­дить во вре­мя выпол­не­ния экзем­пля­ра зада­ния. Суще­ству­ет огром­ное мно­же­ство как офи­ци­аль­ных так и создан­ных сооб­ще­ством Airflow опе­ра­то­ров, кро­ме того, ори­ен­ти­ру­ясь на свои потребности/способности мож­но созда­вать свои операторы.

Суще­ству­ет несколь­ко спо­со­бов “достав­ки” напи­сан­ных DAG’ов в инстанс Airflow, раз­вер­ну­тый в Kubernetes кластере:

  • добав­ле­ние в docker-образ при сборке
  • исполь­зо­ва­ние Persistent Volume (PV)
  • git-sync

Пер­вый вари­ант пред­по­ла­га­ет добав­ле­ние DAG’ов в docker-образ в момент сбор­ки, и в боль­шин­стве слу­ча­ев явля­ет­ся непри­ем­ли­мым - пере­со­би­рать обра­зы при каж­дом малей­шем изме­не­ния DAG’а не самая луч­шая идея.

При вто­ром вари­ан­те - DAG-фай­лы хра­нят­ся на неко­то­ром внеш­нем томе и мон­ти­ру­ют­ся в соот­вет­ству­ю­щие поды (scheduler, webserver, worker) при запус­ке. Для кор­рект­ной рабо­ты PV с несколь­ки­ми пода­ми, режим досту­па (accessMode) тома дол­жен быть ‘ReadOnlyMany’ или, на худой конец, ‘ReadWriteMany’.

Тре­тий, опти­маль­ный вари­ант, пред­по­ла­га­ет исполь­зо­ва­ние еще одно­го сайдкар (sidecar) кон­тей­не­ра git-sync а поде (Pod) для пери­о­ди­че­ской син­хро­ни­за­ции DAG-фай­лов с ука­зан­но­го git-репо­зи­то­рия без рестар­та само­го Airflow.

С уче­том git-sync и RBAC, в нашем слу­чае Kubernetes мани­фест будет выгля­деть так:


 

Для теста созда­дим зада­чу, кото­рая с помо­щью KubernetesPodOperator запус­ка­ет в Kubernetes кла­сте­ре под (Pod) и выпол­ня­ет внут­ри него команду

java --version
Тесто­вый при­мер DAG’а выгля­дит так:

При­ме­ча­ние. Соглас­но доку­мен­та­ции, для опи­са­ния KubernetesPodOperator мини­маль­но необ­хо­ди­мы толь­ко четы­ре поля (namenamespaceimagetask_id) но, в нашем слу­чае, при исполь­зо­ва­нии Airflow вер­сии 1.10.7 при­шлось так­же в обя­за­тель­ном поряд­ке доба­вить in_cluster=True и do_xcom_push=False.

Весь спи­сок доступ­ных пара­мет­ров и их воз­мож­ных зна­че­ний нахо­дит­ся здесь,