Thank you for reading this post, don't forget to subscribe!
Мы создадим кластер из нескольких брокеров, добавим туда разбитый на разделы топик и включим его резервное копирование. При такой конфигурации нужно будет очень постараться, чтобы с кластером случилось что-то неприятное.
Создаём кластер
Думаю, для сегодняшнего эксперимента трёх кафкианских хостов будет достаточно. В идеале, конечно, стоило бы клонировать и ZooKeeper — для надёжности. Но сегодня я почему-то уверен, что ему ничего не грозит, так что обойдёмся и одним.
Чтобы сэмитировать изолированные хосты, я помещу ZooKeeper и всех Кафка-брокеров в собственные Docker контейнеры и запущу их через docker-compose. Последний и упросит конфигурацию, и позаботится о настройке сети.
Dockerfile
Чтобы создать образы контейнеров для Kafka и ZooKeeper нужно всего две вещи: JDK и Kafka инсталлер. Образы будут настолько похожи, что, в принципе, можно обойтись одним Dockerfile и для брокеров, и для координатора.
Брокеров к тому же нужно еще настроить. Например, по-умолчанию Kafka подключается к ZooKeeper через localhost:2181, который внутри контейнера ведёт в никуда. С другой стороны, если дать контейнеру с ZooKeeper внятное название, например — zookeeper, то тогда к нему можно подключаться по zookeeper:2181. Нужно просто немного подредактировать конфигурацию брокера
Второй момент: у каждого брокера внутри кластера должен быть уникальный broker id. Он задаётся в server.properties, и в нашем случае его можно передать прямо из docker-compose в Dockerfile через ARG инструкцию, и потом уже добавить в конфигурацию, например, через sed . Вот в итоге что у меня получилось:
vim Dockerfile
[codesyntax lang="php" blockstate="collapsed"]
1 2 3 4 5 6 7 8 9 10 11 |
FROM openjdk ARG brokerId=1 ADD kafka_2.11-0.10.1.0.tgz / RUN mv /kafka_2.11-0.10.1.0 /kafka && \ sed -i s/localhost:2181/zookeeper:2181/g /kafka/config/server.properties && \ sed -i s/broker.id=0/broker.id=${brokerId}/g /kafka/config/server.properties EXPOSE 2181 9092 |
[/codesyntax]
В образе openjdk есть JDK, а ADD инструкция добавит и распакует архив с Кафкой, который я предусмотрительно скачал до этого. Потом в RUN переименовываем папку с Кафкой и обновляем server.properties. Наконец, EXPOSE оставит в контейнере два открытых порта: 2181 (ZooKeeper) и 9092 (Kafka).
docker-compose.yml
docker-compose.yml для запуска зоопарка контейнеров будет посложнее. Ведь чтобы получить внятный и функционирующий кластер, понадобятся контейнеры с ZooKeeper, тремя Kafka серверами, и продюсером и консьюмером сообщений (ну чтобы был хоть какой поток данных). Итого — шесть контейнеров. Это раз в шесть больше, чем обычно.
ZooKeeper
Конфигурация контейнера с ZooKeeper самая простая:
[codesyntax lang="php" blockstate="collapsed"]
1 2 3 4 5 |
zookeeper: build: . command: /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties ports: - 2181:2181 |
[/codesyntax]
Она просто соберет Dockerfile, созданный на предыдущем этапе, запустит ZooKeeper внутри контейнера и оставит порт 2181 открытым для хоста. Вдруг нам захочется с ним пообщаться.
Kafka сервера
Конфигурация для Kafka серверов чуть-чуть сложнее, но всё еще вменяемая. Конфигураций будет три, и для сервера под номером 2 она будет такой:
[codesyntax lang="php" blockstate="collapsed"]
1 2 3 4 5 6 7 8 |
kafka2: build: context: . args: brokerId: 2 command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties depends_on: - zookeeper |
[/codesyntax]
depends_on поможет запустить контейнер с Кафкой обязательно после контейнера с ZooKeeper, а brokerId: 2 пойдёт прямиком в Dockerfile, откуда перекочует в server.properties.
Продюсер
Продюсер — это маленький филиал ада. И всё из-за команды command . Ведь чтобы запустить продюсер, отправляющий сообщения, нужно всего-то:
[codesyntax lang="php" blockstate="collapsed"]
1 2 3 4 5 6 7 8 9 10 11 |
sleep 4 /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 2 --topic dates while true; do date | /kafka/bin/kafka-console-producer.sh \ --broker-list kafka1:9092,kafka2:9092,kafka3:9092 \ --topic dates; sleep 1; done |
[/codesyntax]
Что в переводе на кошерный русский означает:
- подожди 4 секунды (решать race condition через задержки — плохой тон, но с воспитанием в нашу эпоху действительно не очень),
- создай топик под названием ‘dates’, раздели его на два раздела (partitions) и храни по три копии каждого,
- раз в секунду отправляй в топик сообщение — текущую дату.
Просто ведь. Но если ужать это в одну строчку, то получится вот что:
[codesyntax lang="php" blockstate="collapsed"]
1 2 3 4 5 6 7 8 |
producer: build: . command: bash -c "sleep 4 && /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 2 --topic dates && while true; do date | /kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic dates; sleep 1; done " depends_on: - zookeeper - kafka1 - kafka2 - kafka3 |
[/codesyntax]
Консьюмер
По сравнению с продюсером, консьюмер практически не уродлив:
[codesyntax lang="php" blockstate="collapsed"]
1 2 3 4 5 6 7 8 |
consumer: build: . command: bash -c "sleep 6 && /kafka/bin/kafka-console-consumer.sh --topic dates --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092" depends_on: - zookeeper - kafka1 - kafka2 - kafka3 |
[/codesyntax]
И продюсер, и консюмер получили в качестве параметров списки всех Kafka серверов. Это нужно для того, чтобы когда кого-то из брокеров не станет, у сервисов был запасной вариант.
Результат
Вот на что похож docker-compose.yml целиком:
[codesyntax lang="php" blockstate="collapsed"]
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
version: '2' services: zookeeper: build: . command: /kafka/bin/zookeeper-server-start.sh /kafka/config/zookeeper.properties ports: - 2181:2181 kafka1: build: context: . args: brokerId: 1 command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties depends_on: - zookeeper kafka2: build: context: . args: brokerId: 2 command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties depends_on: - zookeeper kafka3: build: context: . args: brokerId: 3 command: /kafka/bin/kafka-server-start.sh /kafka/config/server.properties depends_on: - zookeeper producer: build: . command: bash -c "sleep 4 && /kafka/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 3 --partitions 2 --topic dates && while true; do date | /kafka/bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9092,kafka3:9092 --topic dates; sleep 1; done " depends_on: - zookeeper - kafka1 - kafka2 - kafka3 consumer: build: . command: bash -c "sleep 6 && /kafka/bin/kafka-console-consumer.sh --topic dates --bootstrap-server kafka1:9092,kafka2:9092,kafka3:9092" depends_on: - zookeeper - kafka1 - kafka2 - kafka3 |
[/codesyntax]
Запускаем кластер
Всё просто. Запускаем docker-compose up и через десяток секунд монологов от очень разговорчивых контейнеров наступит тишина, иногда прерываемая сообщениями от консьюмера:
# consumer_1 | Tue Dec 13 05:23:31 UTC 2018
# consumer_1 | Tue Dec 13 05:23:34 UTC 2018
# consumer_1 | Tue Dec 13 05:23:37 UTC 2018
# consumer_1 | Tue Dec 13 05:23:39 UTC 2018
# consumer_1 | Tue Dec 13 05:23:43 UTC 2018
# consumer_1 | Tue Dec 13 05:23:46 UTC 2018
Кажется, оно действительно запустилось.
Так как 2181-й порт у ZooKeeper остался открыт, можно запросить у него статистику по топикам:
./bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
# Topic: dates Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
# Topic: dates Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Ответ ZooKeeper’а весьма интересный: у топика ‘dates’ есть два раздела — 0 и 1, а их лидеры — брокеры 3 и 1 соответственно. Лидер раздела — это брокер, через который походит чтение и запись, и который отвечает за синхронизацию его реплик. В нашем случае реплики разделов хранятся у брокеров 3,1,2 и 1,2,3, и все они синхронизированы (Isr — in-sync replica), то есть все резервные копии на месте.
Боль и страдания Kafka кластера
Когда все брокеры в кластере работают, кластеру, разумеется, хорошо. Что будет, если какого-то из брокеров не станет? Выясняем ID контейнера kafka2 (docker-compose ведь изменил имя) через docker ps , останавливаем его, и смотрим, что получилось:
docker stop 12bda0311443
# 12bda0311443
Что здорово, consumer-контейнер как писал в консоль свои сообщения, так и продолжил без паузы. Но если посмотреть статистику топика, то кластер изменился:
./kafka/bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
#Topic:dates PartitionCount:2 ReplicationFactor:3 Configs:
# Topic: dates Partition: 0 Leader: 3 Replicas: 3,1,2 Isr: 3,1
# Topic: dates Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,3
Лидеры разделов остались теми же, но реплика с номером 2 пропала.
А что будет, если ещё кого-нибудь прибить?
Если остановить kafka3, то с потоком сообщений всё останется в порядке, но лидер нулевого раздела поменяется:
./bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
#Topic:dates PartitionCount:2 ReplicationFactor:3 Configs:
# Topic: dates Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1
# Topic: dates Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1
Теперь оставшийся брокер kafka1 — лидер всего. Реплика 3, разумеется, пропала.
Наконец, что будет, если снова вернуть kafka2 и kafka3 в строй?
./bin/kafka-topics.sh --describe --topic dates --zookeeper 127.0.0.1:2181
#Topic:dates PartitionCount:2 ReplicationFactor:3 Configs:
# Topic: dates Partition: 0 Leader: 1 Replicas: 3,1,2 Isr: 1,3,2
# Topic: dates Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,3,2
ZooKeeper их заметил. Лидер разделов остался старым, но пропущенные реплики пересоздались и уютно расположились в «новых» брокерах. Кластер вернулся в сбалансированное состояние.
Мораль
Мы создали кластер из трёх хостов с фрагментированным топиком, а затем убили два хоста из трёх, и это никак не повлияло на нашу возможность публиковать и получать сообщения. Когда хосты вернулись назад, кластер перегруппировался и продолжил работать, как ни в чём не бывало.
Данный пример получился намного сложнее, чем предыдущий, но вся сложность пришла от Докера. Единственное изменение в конфигурации Кафка-сервера по сравнению с кластером из одного хоста — добавился broker id. То есть изменив всего один численный параметр мы можем добавлять сервера в кластер пачками.