Установка Apache Kafka/ резервное копирование, импорт, перемещение данных.

Thank you for reading this post, don't forget to subscribe!

Apache Kafka – это попу­ляр­ный рас­пре­де­лён­ный бро­кер сооб­ще­ний, пред­на­зна­чен­ный для обра­бот­ки боль­ших объ­е­мов дан­ных в режи­ме реаль­но­го вре­ме­ни. Кла­стер Kafka обла­да­ет не толь­ко высо­кой мас­шта­би­ру­е­мо­стью и отка­зо­устой­чи­во­стью, но так­же име­ет гораз­до более высо­кую про­пуск­ную спо­соб­ность по срав­не­нию с дру­ги­ми бро­ке­ра­ми сооб­ще­ний (как ActiveMQ и RabbitMQ). Как пра­ви­ло, Apache Kafka исполь­зу­ет­ся в каче­стве систе­мы обме­на сооб­ще­ни­я­ми pub/sub, одна­ко мно­гие орга­ни­за­ции исполь­зу­ют его для логи­ро­ва­ния, пото­му что он обес­пе­чи­ва­ет посто­ян­ное хра­не­ние опуб­ли­ко­ван­ных сообщений.

Систе­ма обме­на сооб­ще­ни­я­ми pub/sub поз­во­ля­ет пуб­ли­ко­вать сооб­ще­ния без уче­та коли­че­ства под­пис­чи­ков или спо­со­бов их обра­бот­ки. Под­пи­сан­ные кли­ен­ты авто­ма­ти­че­ски уве­дом­ля­ют­ся об обнов­ле­ни­ях и появ­ле­нии новых сооб­ще­ний. Эта систе­ма более эффек­тив­на и мас­шта­би­ру­е­ма, чем те систе­мы, в кото­рых кли­ен­ты про­во­дят пери­о­ди­че­скую про­вер­ку новых сообщений.

Требования

Centos7
4Gb опе­ра­тив­ки Нехват­ка объ­е­ма RAM может при­ве­сти к сбою сер­ве­ра Kafka, при этом Java virtual machine (JVM) выда­ет «Out Of Memory» во вре­мя запуска.
OpenJDK на сер­ве­ре Kafka напи­сан на Java, поэто­му для его рабо­ты тре­бу­ет­ся JVM; одна­ко в его сце­на­рии запус­ка есть ошиб­ка обна­ру­же­ния вер­сий, из-за кото­рой он не рабо­та­ет с вер­си­я­ми JVM выше 8.

  1. Установим openjdk

Что­бы уста­но­вить OpenJDK 7 JRE, вве­ди­те команду:

yum install java-1.7.0-openjdk

Что­бы уста­но­вить OpenJDK 7 JDK, запу­сти­те команду:

yum install java-1.7.0-openjdk-devel

Установка Oracle Java 8 JRE

При­ме­ча­ние: Что­бы уста­но­вить дру­гой релиз Oracle Java 8 JRE, посе­ти­те стра­ни­цу загру­зок Oracle Java 8 JRE, при­ми­те лицен­зию, а затем ско­пи­руй­те ссыл­ку на пакет .rpm. Исполь­зуй­те эту ссыл­ку в коман­де wget.

Открой­те домаш­ний ката­лог и загру­зи­те в него Oracle Java 8 JRE RPM:

cd ~
wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" \

«http://download.oracle.com/otn-pub/java/jdk/8u60-b27/jre-8u60-linux-x64.rpm»

Уста­но­ви­те RPM при помо­щи сле­ду­ю­щей команды:

sudo yum localinstall jre-8u60-linux-x64.rpm

При­ме­ча­ние: Откор­рек­ти­руй­те имя фай­ла, что­бы уста­но­вить дру­гую вер­сию Java.

Теперь Java уста­нов­ле­на в /usr/java/jdk1.8.0_60/jre/bin/java и свя­за­на с /usr/bin/java.

Уда­ли­те архив:

rm ~/jre-8u60-linux-x64.rpm

Установка Oracle Java 8 JDK

При­ме­ча­ние: Что­бы уста­но­вить дру­гой релиз Oracle Java 8 JDK, посе­ти­те стра­ни­цу загру­зок Oracle Java 8 JDK, при­ми­те лицен­зию, ско­пи­руй­те ссыл­ку на пакет .rpm, а затем исполь­зуй­те эту ссыл­ку в коман­де wget.

Перей­ди­те в домаш­ний ката­лог и загру­зи­те Oracle Java 8 JDK RPM:

cd ~
wget --no-cookies --no-check-certificate --header "Cookie: gpw_e24=http%3A%2F%2Fwww.oracle.com%2F; oraclelicense=accept-securebackup-cookie" "http://download.oracle.com/otn-pub/java/jdk/8u60-b27/jdk-8u60-linux-x64.rpm"

Затем уста­но­ви­те пакет при помо­щи сле­ду­ю­щей команды:

sudo yum localinstall jdk-8u60-linux-x64.rpm

При­ме­ча­ние: Изме­ни­те имя фай­ла, что­бы уста­но­вить дру­гую вер­сию Java.

Теперь пакет Java уста­нов­лен в ката­лог /usr/java/jdk1.8.0_60/jre/bin/java и свя­зан с /usr/bin/java.

Теперь мож­но уда­лить архив.

rm ~/jdk-8u60-linux-x64.rpm

2 Создание пользователя для Apache Kafka

Apache Kafka может обра­ба­ты­вать запро­сы по сети, пото­му необ­хо­ди­мо создать для него спе­ци­аль­но­го поль­зо­ва­те­ля. Это сво­дит к мини­му­му воз­мож­ные повре­жде­ния маши­ны CentOS, на кото­рой будет уста­нов­лен сер­вер Kafka.

При­ме­ча­ние: После уста­нов­ки Apache Kafka реко­мен­ду­ет­ся создать ново­го не-root поль­зо­ва­те­ля для рабо­ты на сервере.

Как поль­зо­ва­тель sudo запу­сти­те сле­ду­ю­щую коман­ду, что­бы создать поль­зо­ва­те­ля kafka:

useradd kafka -m

Флаг -m создаст домаш­ний ката­лог поль­зо­ва­те­ля. Этот ката­лог, /home/kafka, будет в даль­ней­шем исполь­зо­вать­ся в каче­стве рабо­че­го про­стран­ства для запус­ка команд.

Уста­но­ви­те пароль:

passwd kafka

Добавь­те это­го поль­зо­ва­те­ля в груп­пу wheel, что­бы иметь воз­мож­ность уста­но­вить все зави­си­мо­сти бро­ке­ра сообщений.

usermod -aG wheel kafka

Поль­зо­ва­тель kafka готов к рабо­те. Перей­ди­те в этот поль­зо­ва­тель­ский аккаунт:

su -l kafka

3. Загрузка и извлечение Apache Kafka

Теперь, когда зави­си­мо­сти уста­нов­ле­ны, мож­но пере­хо­дить к загруз­ке бинар­ных фай­лов Apache Kafka.

Создай­те ката­лог Downloads для хра­не­ния загру­жен­ных пакетов.

mkdir ~/Downloads

Затем исполь­зуй­те curl, что­бы загру­зить бинар­ные фай­лы Apache Kafka.

curl "http://www-eu.apache.org/dist/kafka/1.1.0/kafka_2.12-1.1.0.tgz" -o ~/Downloads/kafka.tgz

Создай­те ката­лог по име­ни kafka и открой­те его. Это базо­вый ката­лог сер­ве­ра Kafka.

mkdir ~/kafka && cd ~/kafka

Извле­ки­те загру­жен­ный архив в этот каталог:

tar -xvzf ~/Downloads/kafka.tgz --strip 1

Флаг —strip 1 извле­ка­ет содер­жи­мое архи­ва в ката­лог ~/kafka/, а не в дру­гой под­ка­та­лог (напри­мер, ~/kafka/kafka_2.12-1.1.0/).

Теперь мож­но перей­ти к настрой­ке Kafka.

4.Настройка сервера Kafka

Kafka по умол­ча­нию не поз­во­ля­ет уда­лять темы, кате­го­рии и груп­пы, в кото­рых могут быть опуб­ли­ко­ва­ны сооб­ще­ния. Что­бы изме­нить это, отре­дак­ти­руй­те  кон­фи­гу­ра­ции. Открой­те server.properties в тек­сто­вом редакторе.

vi ~/kafka/config/server.properties

Что­бы изме­нить стан­дарт­ное пове­де­ние, добавь­те в конец фай­ла такую строку:

delete.topic.enable = true

Когда вы закон­чи­те, нажми­те Esc, что­бы вый­ти из режи­ма встав­ки, и :wq, что­бы запи­сать изме­не­ния в файл и вый­ти. Теперь пора перей­ти к созда­нию юнит-файлов.

5.Создание юнит-файла для Kafka

Теперь нуж­но создать юнит-фай­лы systemd для сер­ви­са Kafka. Это поз­во­лит управ­лять сер­ви­сом Kafka – запус­кать, оста­нав­ли­вать и пере­за­пус­кать его в соот­вет­ствии с дру­ги­ми сер­ви­са­ми Linux.

Zookeeper – это сер­вис, кото­рый Kafka исполь­зу­ет для управ­ле­ния состо­я­ни­ем и кон­фи­гу­ра­ци­ей кла­сте­ра. Он широ­ко исполь­зу­ет­ся как неотъ­ем­ле­мый ком­по­нент во мно­гих рас­пре­де­лен­ных систе­мах. Боль­ше мож­но узнать в офи­ци­аль­ной доку­мен­та­ции Zookeeper.

Создай­те файл:

vi /etc/systemd/system/zookeeper.service

Вставь­те в него такое определение:

[Unit]
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=simple
User=kafka
ExecStart=/home/kafka/kafka/bin/zookeeper-server-start.sh /home/kafka/kafka/config/zookeeper.properties
ExecStop=/home/kafka/kafka/bin/zookeeper-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

В раз­де­ле [Unit] ука­за­но, что Zookeeper нуж­на сеть и фай­ло­вая систе­ма, преж­де чем сер­вис смо­жет начать работу.

Раз­дел [Service] ука­зы­ва­ет, что systemd дол­жен исполь­зо­вать фай­лы обо­лоч­ки zookeeper-server-start.sh и zookeeper-server-stop.sh  для запус­ка и оста­нов­ки сер­ви­са. Он так­же ука­зы­ва­ет, что Zookeeper сле­ду­ет пере­за­пус­кать авто­ма­ти­че­ски, если он выхо­дит из строя.

Теперь создай­те юнит-файл для kafka:

vi /etc/systemd/system/kafka.service

Вставь­те в файл такое объявление:

[Unit]
Requires=zookeeper.service
After=zookeeper.service
[Service]
Type=simple
User=kafka
ExecStart=/bin/sh -c '/home/kafka/kafka/bin/kafka-server-start.sh /home/kafka/kafka/config/server.properties > /home/kafka/kafka/kafka.log 2>&1'
ExecStop=/home/kafka/kafka/bin/kafka-server-stop.sh
Restart=on-abnormal
[Install]
WantedBy=multi-user.target

Раз­дел [Unit] сооб­ща­ет, что этот юнит-файл зави­сит от zookeeper.service. Бла­го­да­ря этой зави­си­мо­сти zookeeper будет все­гда авто­ма­ти­че­ски запус­кать­ся рань­ше, чем kafka.

Раз­дел [Service] ука­зы­ва­ет, что systemd долж­на исполь­зо­вать фай­лы обо­лоч­ки kafka-server-start.sh и kafka-server-stop.sh  для запус­ка и оста­нов­ки сер­ви­са. Он так­же ука­зы­ва­ет, что Kafka сле­ду­ет пере­за­пус­кать авто­ма­ти­че­ски, если он выхо­дит из строя.

Теперь юнит-фай­лы гото­вы. Мож­но запу­стить Kafka:

systemctl start kafka

Что­бы убе­дить­ся, что сер­вис запу­стил­ся успеш­но, про­верь­те логи юнита:

journalctl -u kafka

Вы полу­чи­те:

Jul 17 18:38:59 kafka-ubuntu systemd[1]: Started kafka.service.

Теперь сер­вис Kafka запу­щен и про­слу­ши­ва­ет порт 9092.

Одна­ко пока что этот сер­вис не будет запус­кать­ся авто­ма­ти­че­ски вме­сте с сер­ве­ром. Что­бы доба­вить kafka в авто­за­груз­ку, введите:

systemctl enable kafka

6.Тестирование установки

Что­бы убе­дить­ся, что сер­вер Kafka рабо­та­ет пра­виль­но, попро­буй­те опуб­ли­ко­вать тесто­вое сооб­ще­ние «Hello World».

Для пуб­ли­ка­ции сооб­ще­ний необходимы:

  • Изда­тель (producer), кото­рый поз­во­ля­ет пуб­ли­ко­вать запи­си и дан­ные по темам.
  • Под­пис­чик (consumer), кото­рый чита­ет сообщения.

Создай­те тему TutorialTopic:

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TutorialTopic

Что­бы создать изда­те­ля, исполь­зуй­те в команд­ной стро­ке скрипт kafka-console-producer.sh. Ему нуж­ны аргу­мен­ты – имя хоста Kafka, порт и тема.

Опуб­ли­куй­те стро­ку «Hello, World» в теме TutorialTopic:

echo "Hello, World" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TutorialTopic > /dev/null

Теперь мож­но создать под­пис­чи­ка с помо­щью скрип­та kafka-console-consumer.sh. Ему в каче­стве аргу­мен­тов нуж­ны имя хоста, порт ZooKeeper и тема.

Сле­ду­ю­щая коман­да под­пи­шет­ся на сооб­ще­ния из TutorialTopic. Обра­ти­те вни­ма­ние на флаг —from-begin, кото­рый поз­во­ля­ет читать сооб­ще­ния, кото­рые были опуб­ли­ко­ва­ны до того, как состо­я­лась подписка:

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TutorialTopic --from-beginning

Если в кон­фи­гу­ра­ции нет оши­бок, вы полу­чи­те в тер­ми­на­ле вывод:

Hello, World

Сце­на­рий будет про­дол­жать рабо­тать, ожи­дая пуб­ли­ка­ции новых сооб­ще­ний в этой теме. Вы може­те открыть новый тер­ми­нал и с помо­щью изда­те­ля опуб­ли­ко­вать еще несколь­ко сооб­ще­ний. Вы долж­ны видеть все новые сооб­ще­ния в тер­ми­на­ле подписчика.

Когда вы закон­чи­те тести­ро­ва­ние, нажми­те CTRL + C, что­бы оста­но­вить скрипт подписчика.

7.Установка KafkaT (опционально)

KafkaT – это очень полез­ный инстру­мент, раз­ра­бо­тан­ный Airbnb, кото­рый поз­во­ля­ет про­смат­ри­вать подроб­ную инфор­ма­цию о кла­сте­ре Kafka и выпол­нять неко­то­рые зада­чи по управ­ле­нию из команд­ной стро­ки. Сам инстру­мент явля­ет­ся gem-ом Ruby, пото­му для его рабо­ты нуж­но уста­но­вить Ruby. Так­же пона­до­бит­ся пакет build-essential, поз­во­ля­ю­щий собрать необ­хо­ди­мые gem-ы. Уста­но­ви­те всё это при помо­щи команды:

sudo yum install ruby ruby-devel make gcc patch

Теперь уста­но­ви­те KafkaT при помо­щи коман­ды gem.

sudo gem install kafkat

Кон­фи­гу­ра­ци­он­ный файл .kafkatcfg  исполь­зу­ет­ся KafkaT для опре­де­ле­ния ката­ло­га уста­нов­ки и логов сер­ве­ра Kafka. Так­же он настро­ит вза­и­мо­дей­ствие KafkaT и ZooKeeper. Создай­те файл:

vi ~/.kafkatcfg

Добавь­те в файл сле­ду­ю­щие строки:

{
"kafka_path": "~/kafka",
"log_path": "/tmp/kafka-logs",
"zk_path": "localhost:2181"
}

Теперь инстру­мент KafkaT готов к рабо­те. Что­бы про­смот­реть инфор­ма­цию обо всех раз­де­лах сер­ве­ра Kafka, вве­ди­те команду:

kafkat partitions

Вывод выгля­дит так:

Topic                 Partition   Leader      Replicas        ISRs
TutorialTopic         0             0         [0]             [0]
__consumer_offsets    0             0         [0]                           [0]
...
...

Вы уви­ди­те TutorialTopic, а так­же __consumer_offsets, внут­рен­нюю тему, исполь­зу­е­мую Kafka для хра­не­ния инфор­ма­ции, свя­зан­ной с кли­ен­том. Вы може­те сме­ло игно­ри­ро­вать стро­ки, начи­на­ю­щи­е­ся с __consumer_offsets.

При­ме­ча­ние: Что­бы узнать боль­ше о KafkaT, посе­ти­те GitHub-репо­зи­то­рий про­ек­та.

8.Настройка многоузлового кластера (опционально)

Что­бы создать мно­го­уз­ло­вой кла­стер Kafka, состо­я­щий из несколь­ких машин CentOS 7, повто­ри­те инструк­ции раз­де­лов 1-5 на каж­дой машине. Кро­ме того, на каж­дой машине нуж­но вне­сти в server.properties такие изменения:

  • Зна­че­ние broker.id долж­но быть уни­каль­ным на каж­дом ком­по­нен­те кла­сте­ра (напри­мер, «server1», «server2» и так далее).
  • Зна­че­ние пара­мет­ра zookeeper.connect на каж­дой ноде долж­но ука­зы­вать на один и тот же экзем­пляр ZooKeeper. Это свой­ство сле­ду­ет фор­ма­ту <HOSTNAME/IP_ADDRESS>:<PORT> (напри­мер, «203.0.113.0:2181», «203.0.113.1:2181»).

Что­бы уста­но­вить несколь­ко экзем­пля­ров ZooKeeper для одно­го кла­сте­ра, зна­че­ние zookeeper.connect на каж­дой ноде долж­но содер­жать IP-адре­са и номе­ра пор­тов всех экзем­пля­ров ZooKeeper через запятую.

9. Ограничение прав пользователя Kafka

Теперь, когда уста­нов­ка завер­ше­на, мож­но отнять пра­ва адми­ни­стра­то­ра у поль­зо­ва­те­ля kafka. Но преж­де чем сде­лать это, нуж­но вый­ти из систе­мы и сно­ва вой­ти как любой дру­гой не-root поль­зо­ва­тель с пра­ва­ми sudo.

Что­бы отнять root-пра­ва у поль­зо­ва­те­ля kafka, уда­ли­те его из груп­пы wheel:

gpasswd -d kafka wheel

Что­бы повы­сить без­опас­ность сер­ве­ра Kafka, забло­ки­руй­те пароль поль­зо­ва­те­ля kafka, что­бы никто не мог открыть этот аккаунт.

passwd kafka -l

Теперь толь­ко поль­зо­ва­тель с пра­ва­ми root или sudo может открыть акка­унт kafka при помо­щи команды:

sudo su - kafka

Что­бы раз­бло­ки­ро­вать пароль, исполь­зуй­те команду:

passwd kafka -u

 

==================================================

Резервное копирование

Резерв­ное копи­ро­ва­ние дан­ных Apache Kafka явля­ет­ся важ­ной частью рабо­ты с дан­ны­ми. Регу­ляр­ный бэкап помо­жет вос­ста­но­вить­ся после слу­чай­ной поте­ри дан­ных или устра­нить непра­виль­ные дан­ные, добав­лен­ные в кла­стер по ошиб­ке поль­зо­ва­те­ля. Эффек­тив­ным спо­со­бом резерв­но­го копи­ро­ва­ния и вос­ста­нов­ле­ния явля­ют­ся дам­пы дан­ных кла­сте­ра и разделов.

Импорт и мигра­ция резерв­ных копий дан­ных на отдель­ный сер­вер полез­ны в ситу­а­ци­ях, когда экзем­пляр Kafka ста­но­вит­ся непри­год­ным для рабо­ты из-за аппа­рат­но­го или сете­во­го сбоя, и вам необ­хо­ди­мо создать новый экзем­пляр со ста­ры­ми дан­ны­ми. Так­же эти про­це­ду­ры полез­ны при пере­ме­ще­нии экзем­пля­ра Kafka на дру­гой сер­вер из-за изме­не­ния в исполь­зо­ва­нии ресурсов.

1: Создание тестовой темы и добавление сообщений

Сооб­ще­ние в Kafka – основ­ная еди­ни­ца хра­не­ния дан­ных, это объ­ект, кото­рый мож­но опуб­ли­ко­вать и на кото­рый мож­но под­пи­сать­ся. Тема Kafka – это что-то типа кон­тей­не­ра для груп­пы свя­зан­ных сооб­ще­ний. Когда вы под­пи­сы­ва­е­тесь на опре­де­лен­ную тему, вы буде­те полу­чать толь­ко сооб­ще­ния, кото­рые были опуб­ли­ко­ва­ны в этой кон­крет­ной теме. Сей­час вой­ди­те на сер­вер, дан­ные кото­ро­го вы хоти­те ско­пи­ро­вать (исход­ный сер­вер), и доба­ви­те тему Kafka и сооб­ще­ние, что­бы у вас были какие-то тесто­вые данные.

В этом мануа­ле пред­по­ла­га­ет­ся, что вы уста­но­ви­ли Kafka в домаш­ний ката­лог поль­зо­ва­те­ля kafka (/home/kafka/kafka). Если ваша уста­нов­ка нахо­дит­ся в дру­гом ката­ло­ге, изме­ни­те часть ~/kafka в коман­дах, в кото­рых ука­зан путь к установке.

Под­клю­чи­тесь по SSH:

ssh centos7@source_server_ip

Запу­сти­те эту коман­ду, что­бы вой­ти как поль­зо­ва­тель kafka:

sudo -iu kafka

Создай­те тему BackupTopic, исполь­зуя слу­жеб­ный файл обо­лоч­ки kafka-topics.sh в ката­ло­ге bin вашей уста­нов­ки Kafka:

~/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic BackupTopic

Опуб­ли­куй­те стро­ку «Test Message 1» в теме BackupTopic с помо­щью слу­жеб­но­го сце­на­рия обо­лоч­ки ~/kafka/bin/kafka-console-producer.sh.

Если вы хоти­те доба­вить допол­ни­тель­ные сооб­ще­ния, вы може­те сде­лать это сейчас.

echo "Test Message 1" | ~/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic BackupTopic > /dev/null

Файл ~/kafka/bin/kafka-console-producer.sh поз­во­ля­ет пуб­ли­ко­вать сооб­ще­ния непо­сред­ствен­но из команд­ной стро­ки. Обыч­но сооб­ще­ния пуб­ли­ку­ют­ся через кли­ент­скую биб­лио­те­ку Kafka, внут­ри вашей про­грам­мы, но посколь­ку для это­го тре­бу­ют­ся раз­ные настрой­ки для раз­ных язы­ков про­грам­ми­ро­ва­ния, вы може­те исполь­зо­вать сце­на­рий обо­лоч­ки как неза­ви­си­мый от язы­ка спо­соб пуб­ли­ка­ции сооб­ще­ний во вре­мя тести­ро­ва­ния или выпол­не­ния адми­ни­стра­тив­ных задач. Флаг —topic ука­зы­ва­ет тему, в кото­рой будет опуб­ли­ко­ва­но сообщение.

Затем убе­ди­тесь, что сце­на­рий kafka-console-producer.sh опуб­ли­ко­вал сообщение(я), выпол­нив сле­ду­ю­щую команду:

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning

Сце­на­рий обо­лоч­ки ~/kafka/bin/kafka-console-consumer.sh запус­ка­ет потре­би­те­ля. После запус­ка он будет под­пи­сы­вать­ся на сооб­ще­ния из темы, опуб­ли­ко­ван­ной вами в сооб­ще­нии «Test Message 1» в преды­ду­щей коман­де. Флаг —from-begin в коман­де поз­во­ля­ет полу­чить доступ к сооб­ще­ни­ям, опуб­ли­ко­ван­ным до запус­ка потре­би­те­ля. Если этот фла­жок не уста­нов­лен, будут отоб­ра­жать­ся толь­ко сооб­ще­ния, опуб­ли­ко­ван­ные после запус­ка потре­би­те­ля. Запу­стив коман­ду, вы уви­ди­те сле­ду­ю­щий вывод в терминале:

Test Message 1

Нажми­те CTRL+C, что­бы оста­но­вить потребителя.

Теперь у вас есть тесто­вые дан­ные, и вы може­те полу­чить к ним доступ.

2: Резервное копирование данных ZooKeeper

Преж­де чем при­сту­пить к бэка­пу дан­ных Kafka, нуж­но ско­пи­ро­вать состо­я­ние кла­сте­ра, кото­рое хра­нит ZooKeeper.

ZooKeeper хра­нит свои дан­ные в ката­ло­ге, ука­зан­ном в поле dataDir в фай­ле кон­фи­гу­ра­ции ~/kafka/config/zookeeper.properties. Вам необ­хо­ди­мо про­чи­тать зна­че­ние это­го поля, что­бы опре­де­лить ката­лог для резерв­но­го копи­ро­ва­ния. По умол­ча­нию dataDir ука­зы­ва­ет на ката­лог /tmp/zookeeper . Если в вашей уста­нов­ке зна­че­ние отли­ча­ет­ся, заме­ни­те /tmp/zookeeper этим зна­че­ни­ем в сле­ду­ю­щих командах.

Вот при­мер­ный вывод ~/kafka/config/zookeeper.properties:

...
...
...
# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
...
...
...

Теперь, когда у вас есть путь к ката­ло­гу, вы може­те создать сжа­тый файл его содер­жи­мо­го. Сжа­тые архив­ные фай­лы луч­ше про­стых фай­лов, они эко­но­мят место на дис­ке. Запу­сти­те сле­ду­ю­щую команду:

tar -czf /home/kafka/zookeeper-backup.tar.gz /tmp/zookeeper/*

Вывод коман­ды мож­но сме­ло игнорировать:

tar: Removing leading / from member names

Фла­ги -c и -z созда­ют архив и при­ме­ня­ют к нему сжа­тие gzip. Флаг -f ука­зы­ва­ет имя выход­но­го сжа­то­го архив­но­го фай­ла, в дан­ном слу­чае это zookeeper-backup.tar.gz.

Запу­сти­те ls в теку­щем ката­ло­ге и убе­ди­тесь, что в нем есть zookeeper-backup.tar.gz.

Если файл есть – зна­чит, бэкап про­шел успешно.

3: Резервное копирование тем и сообщений Kafka

В этом раз­де­ле вы созда­ди­те резерв­ную копию ката­ло­га дан­ных Kafka в сжа­тый tar-файл, как вы дела­ли это для ZooKeeper на преды­ду­щем шаге.

Kafka хра­нит темы, сооб­ще­ния и внут­рен­ние фай­лы в ката­ло­ге, ука­зан­ном в поле log.dirs в фай­ле кон­фи­гу­ра­ции ~/kafka/config/server.properties. Вам необ­хо­ди­мо про­чи­тать зна­че­ние это­го поля, что­бы опре­де­лить ката­лог для резерв­но­го копи­ро­ва­ния. По умол­ча­нию в вашей теку­щей уста­нов­ке log.dirs ука­зы­ва­ет на ката­лог /tmp/kafka-logs. Если в вашей уста­нов­ке это не так, ука­жи­те в сле­ду­ю­щих коман­дах пра­виль­ное значение.

Вот так при­мер­но выгля­дит вывод фай­ла ~/kafka/config/server.properties:

...
...
...
############################# Log Basics #############################
# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
...
...
...

Сна­ча­ла оста­но­ви­те сер­вис Kafka, что­бы дан­ные в ката­ло­ге log.dirs нахо­ди­лись в согла­со­ван­ном состо­я­нии при созда­нии архи­ва tar. Что­бы сде­лать это, вер­ни­тесь в сес­сию поль­зо­ва­те­ля без прав root на сер­ве­ре, вве­дя коман­ду exit, а затем выпол­ни­те сле­ду­ю­щую команду:

sudo systemctl stop kafka

После оста­нов­ки Kafka вой­ди­те в систе­му как поль­зо­ва­тель kafka:

sudo -iu kafka

Оста­нав­ли­вать и запус­кать сер­ви­сы Kafka и ZooKeeper нуж­но от име­ни поль­зо­ва­те­ля sudo (не root), посколь­ку (соглас­но ману­а­лу по уста­нов­ке Apache Kafka) вы огра­ни­чи­ли поль­зо­ва­те­ля kafka в каче­стве меры без­опас­но­сти. Это отклю­ча­ет доступ sudo для поль­зо­ва­те­ля kafka, что при­во­дит к невоз­мож­но­сти выпол­не­ния команд.

Теперь создай­те сжа­тый файл содер­жи­мо­го ката­ло­га, выпол­нив сле­ду­ю­щую команду:

tar -czf /home/kafka/kafka-backup.tar.gz /tmp/kafka-logs/*

Сно­ва може­те спо­кой­но игно­ри­ро­вать вывод команды:

tar: Removing leading / from member names

Вы може­те запу­стить ls в теку­щем ката­ло­ге, что­бы уви­деть в нем файл kafka-backup.tar.gz.

Затем мож­но сно­ва запу­стить Kafka – если вы не хоти­те сра­зу вос­ста­нав­ли­вать дан­ные. Вве­ди­те коман­ду exit, что­бы пере­клю­чить­ся на поль­зо­ва­те­ля без пол­но­мо­чий root, и затем запустите:

sudo systemctl start kafka

Вой­ди­те как поль­зо­ва­тель kafka:

sudo -iu kafka

Вы успеш­но созда­ли резерв­ную копию дан­ных Kafka. Теперь вы може­те попро­бо­вать вос­ста­но­вить дан­ные о состо­я­нии кла­сте­ра, хра­ня­щи­е­ся в ZooKeeper.

4: Восстановление данных ZooKeeper

В этом раз­де­ле мы вос­ста­но­вим дан­ные о состо­я­нии кла­сте­ра, кото­рые Kafka созда­ет, когда поль­зо­ва­тель выпол­ня­ет такие опе­ра­ции, как созда­ние раз­де­ла, добавление/удаление допол­ни­тель­ных нод, добав­ле­ние и потреб­ле­ние сооб­ще­ний и так далее. Вы узна­е­те, как вос­ста­но­вить дан­ные в суще­ству­ю­щей исход­ной уста­нов­ке, уда­лив ката­лог дан­ных ZooKeeper и вос­ста­но­вив содер­жи­мое фай­ла zookeeper-backup.tar.gz. Если вы хоти­те вос­ста­но­вить дан­ные на дру­гом сер­ве­ре, см. раз­дел 7.

Сей­час нуж­но оста­но­вить Kafka и ZooKeeper, что­бы сохра­нить целост­ность данных.

Оста­но­ви­те Kafka. Вве­ди­те exit, что­бы пере­клю­чить­ся на поль­зо­ва­те­ля sudo, а затем запустите:

sudo systemctl stop kafka

Затем оста­но­ви­те сер­вис ZooKeeper:

sudo systemctl stop zookeeper

Пере­клю­чи­тесь на поль­зо­ва­те­ля kafka:

sudo -iu kafka

Затем вы може­те без­опас­но уда­лить суще­ству­ю­щий ката­лог дан­ных кла­сте­ра с помо­щью сле­ду­ю­щей команды:

rm -r /tmp/zookeeper/*

Теперь вос­ста­но­ви­те дан­ные, резерв­ные копии кото­рых вы созда­ли в раз­де­ле 2:

tar -C /tmp/zookeeper -xzf /home/kafka/zookeeper-backup.tar.gz --strip-components 2

Флаг -C пере­хо­дит в ката­лог /tmp/zookeeper перед извле­че­ни­ем дан­ных. Флаг —strip 2 извле­чет содер­жи­мое архи­ва в /tmp/zookeeper, а не в дру­гой ката­лог (напри­мер, /tmp/zookeeper/tmp/zookeeper/)  внут­ри него.

Теперь попро­бу­ем вос­ста­но­вить дан­ные Kafka.

5: Восстановление данных Kafka

Теперь попро­буй­те вос­ста­но­вить резерв­ную копию дан­ных Kafka на исход­ном сер­ве­ре (вос­ста­нов­ле­ние на дру­гом сер­ве­ре опи­са­но в раз­де­ле 7). Для это­го нуж­но уда­лить ката­лог дан­ных Kafka и вос­ста­но­вить сжа­тый архив.

Уда­ли­те ката­лог дан­ных Kafka:

rm -r /tmp/kafka-logs/*

Теперь, когда вы уда­ли­ли дан­ные, ваша уста­нов­ка Kafka напо­ми­на­ет све­жую уста­нов­ку, в кото­рой нет тем и сооб­ще­ний. Что­бы вос­ста­но­вить резерв­ные копии дан­ных, рас­па­куй­те файлы:

tar -C /tmp/kafka-logs -xzf /home/kafka/kafka-backup.tar.gz --strip-components 2

Флаг -C пере­хо­дит в ката­лог /tmp/kafka-logs перед извле­че­ни­ем дан­ных. Флаг —strip 2 извле­чет содер­жи­мое архи­ва в /tmp/kafka-logs, а не в дру­гой ката­лог (напри­мер, /tmp/kafka-logs/kafka-logs/) внут­ри него.

Теперь, когда вы успеш­но извлек­ли дан­ные, вы може­те сно­ва запу­стить сер­ви­сы Kafka и ZooKeeper. Вве­ди­те коман­ду exit, что­бы пере­клю­чить­ся на поль­зо­ва­те­ля sudo, а затем запустите:

sudo systemctl start kafka

Что­бы запу­стить ZooKeeper, введите:

sudo systemctl start zookeeper

Вер­ни­тесь к поль­зо­ва­те­лю kafka:

sudo -iu kafka

Вы вос­ста­но­ви­ли дан­ные kafka, а теперь нуж­но про­ве­рить, насколь­ко это было успешно.

6: Проверка восстановления

Что­бы про­ве­рить вос­ста­нов­лен­ные дан­ные Kafka, нуж­но исполь­зо­вать сооб­ще­ния из темы, создан­ной в раз­де­ле 1.

Подо­жди­те несколь­ко минут, пока Kafka запу­стит­ся, а затем выпол­ни­те сле­ду­ю­щую коман­ду, что­бы про­чи­тать сооб­ще­ния из BackupTopic:

~/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic BackupTopic --from-beginning

Если вы полу­чи­ли такое пре­ду­пре­жде­ние, вам нуж­но подо­ждать, пока Kafka пол­но­стью запустится.

[2019-09-13 15:52:45,234] WARN [Consumer clientId=consumer-1, groupId=console-consumer-87747] Connection to node -1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Повто­ри­те преды­ду­щую коман­ду через несколь­ко минут или запустите:

sudo systemctl restart kafka

от име­ни поль­зо­ва­те­ля sudo. Если дан­ные вос­ста­но­ви­лись без про­блем, вы уви­ди­те сле­ду­ю­щий вывод:

Test Message 1

Если вы не полу­чи­ли это сооб­ще­ние, убе­ди­тесь, что вы выпол­ни­ли все коман­ды в преды­ду­щем раз­де­ле правильно.

Про­ве­рив дан­ные Kafka, вы зна­е­те, что успеш­но созда­ли резерв­ную копию и може­те вос­ста­но­вить свои дан­ные в рам­ках одно­го сер­ве­ра Kafka.

7: Перемещение и восстановление резервной копии Kafka между серверами (опционально)

В этом раз­де­ле вы узна­е­те, как пере­не­сти дан­ные Kafka с исход­но­го сер­ве­ра на целе­вой. Для это­го нуж­на коман­да scp – она поз­во­ля­ет загру­зить сжа­тый файл tar.gz в локаль­ную систе­му. После это­го с помо­щью этой же коман­ды scp вы може­те загру­зить фай­лы на целе­вой сер­вер. После того как фай­лы появят­ся на конеч­ном сер­ве­ре, вы смо­же­те вос­ста­но­вить резерв­ную копию и убе­дить­ся, что мигра­ция про­шла успешно.

Фай­лы резерв­ных копий нуж­но сна­ча­ла загру­зить локаль­но, а затем – на целе­вой сер­вер. Ско­пи­ро­вать их непо­сред­ствен­но с исход­но­го на целе­вой сер­вер не полу­чит­ся, посколь­ку у послед­не­го нет SSH-клю­ча ваше­го исход­но­го сер­ве­ра в фай­ле /home/user1/.ssh/authorized_keys и он не может под­клю­чить­ся к нему. Одна­ко локаль­ный ком­пью­тер может под­клю­чать­ся к обо­им сер­ве­рам, что избав­ля­ет вас от допол­ни­тель­ных настро­ек SSH-досту­па меж­ду серверами.

Загру­зи­те файл zookeeper-backup.tar.gz на локаль­ную машину.

scp centos7@source_server_ip:/home/kafka/zookeeper-backup.tar.gz .

Вы полу­чи­те:

zookeeper-backup.tar.gz                                                                                                  100%   68KB 128.0KB/s   00:00

Затем запу­сти­те сле­ду­ю­щую коман­ду, что­бы загру­зить kafka-backup.tar.gz на локаль­ную машину:

scp centos7@source_server_ip:/home/kafka/kafka-backup.tar.gz .

Коман­да вернет:

kafka-backup.tar.gz                                                                                                       100% 1031KB 488.3KB/s   00:02

В теку­щем ката­ло­ге на локаль­ной машине запу­сти­те ls, что­бы про­ве­рить нали­чие этих файлов.

kafka-backup.tar.gz zookeeper.tar.gz

Выпол­ни­те сле­ду­ю­щую коман­ду, что­бы пере­дать файл zookeeper-backup.tar.gz в /home/kafka/ на целе­вом сервере:

scp zookeeper-backup.tar.gz centos7@destination_server_ip:/home/user1/zookeeper-backup.tar.gz

Теперь выпол­ни­те сле­ду­ю­щую коман­ду, что­бы пере­дать файл kafka-backup.tar.gz /home/kafka/ на целе­вом сервере:

scp kafka-backup.tar.gz centos7@destination_server_ip:/home/user1/kafka-backup.tar.gz

Вы успеш­но загру­зи­ли фай­лы резерв­ных копий на целе­вой сер­вер. Посколь­ку фай­лы нахо­дят­ся в ката­ло­ге /home/user1/, а к нему у поль­зо­ва­те­ля kafka нет досту­па, вы може­те пере­ме­стить фай­лы в ката­лог /home/kafka/ и изме­нить пра­ва на них.

Под­клю­чи­тесь к целе­во­му сер­ве­ру по SSH:

ssh centos7@destination_server_ip

Пере­ме­сти­те zookeeper-backup.tar.gz в /home/kafka/:

sudo mv zookeeper-backup.tar.gz /home/user1/zookeeper-backup.tar.gz

Затем пере­ме­сти­те файл kafka-backup.tar.gz в /home/kafka/:

sudo mv kafka-backup.tar.gz /home/kafka/kafka-backup.tar.gz

Пере­дай­те пра­ва на резерв­ные копии поль­зо­ва­те­лю kafka:

sudo chown kafka /home/kafka/zookeeper-backup.tar.gz /home/kafka/kafka-backup.tar.gz

Преды­ду­щие коман­ды mv и chown не будут отоб­ра­жать ника­ких выход­ных данных.

Теперь, когда фай­лы резерв­ных копий нахо­дят­ся на целе­вом сер­ве­ре и в пра­виль­ном ката­ло­ге, сле­дуй­те коман­дам, пере­чис­лен­ным в раз­де­лах 4–6 это­го ману­а­ла, что­бы вос­ста­но­вить и про­ве­рить дан­ные на целе­вом сервере.