Thank you for reading this post, don't forget to subscribe!
RabbitMQ — это полноценная и щедро удобренная фичами очередь сообщений. В отличие от ZeroMQ, который встраивается в приложения, RabbitMQ — сервис-посредник. Он разграничивает права доступа, поддерживает шифрование, сохранение сообщений на диск (чтобы пережить плановое отключение электричества), работу в кластерах и даже дублирование сервисов для повышенной живучести. К тому же он написан на Erlang, за что автоматически становится неубиваемым и поддерживаемым на большинстве популярных ОС.
В этом посте мы посмотрим, насколько тяжело отправлять и получать сообщения с RabbitMQ, да и вообще, на что он похож вблизи. В качестве платформы будет Убунта (запертая внутри Docker контейнера), но сгодился бы и Mac, и Windows.
Установка
Я не буду фокусироваться на установке слишком сильно, тем более что официльная документация весьма ничего, но самый простой способ получить работающий RabbitMQ на своей машине — через Docker и rabbitmq образ.
1
2
|
$ docker run -ti rabbitmq bash
# root@714dbe064eef:/#
|
Внутри получившегося контейнера будет полностью рабочий сервер сообщений, который нужно просто запустить:
1
2
|
$ service rabbitmq-server start
#[ ok ] Starting message broker: rabbitmq-server.
|
Если с Докером у вас не сложилось, то на настоящей машине установить RabbitMQ тоже не проблема. Вот на что это похоже на Убунте:
1
2
3
4
|
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server
|
На других системах, возможно, будет даже легче, но я всё равно очень рекомендую потратить немного времени и разобраться с Docker, потому что он идеально подходит для изучения таких вот вещей. Просто берем образ с уже установленным RabbitMQ, Redis, Elasticsearch, Jenkins — да вообще с чем угодно, играемся вволю, а потом удаляем контейнер целиком.
Отправка и получение сообщений с rabbitmqadmin
Отправить и получить сообщение можно не написав ни строчки кода. В комплект с RabbitMQ идёт management plugin, в котором есть милая python-утилитка — rabbitmqadmin . С ней можно создавать и удалять очереди сообщений, проверять их статус, а так же отправлять, собственно, сами сообщения. Management plugin выключен по умолчанию, так что его придётся сначала включить. Правда, можно было бы просто взять rabbitmq:management образ, где это уже сделано, но мы бы сэкономили всего одну команду.
Включаем rabbitmqadmin
1
2
3
4
5
6
7
8
9
10
|
$ rabbitmq-plugins enable rabbitmq_management
#The following plugins have been enabled:
# mochiweb
# webmachine
# rabbitmq_web_dispatch
# amqp_client
# rabbitmq_management_agent
# rabbitmq_management
#
#Applying plugin configuration to rabbit@fb252aa8ddbd… started 6 plugins.
|
..и всё. rabbitmqadmin расположен в удивительной глубины и непроизносимости папке, так что его сразу стоит добавить либо в PATH переменную, либо сразу в /usr/local/bin/.
1
2
|
find /var/lib/rabbitmq -name rabbitmqadmin | xargs -I{} sudo ln -s {} /usr/local/bin/rabbitmqadmin
sudo chmod +x /usr/local/bin/rabbitmqadmin
|
Теперь попробуем что-нибудь создать.
Играем с очередями и сообщениями
RabbitMQ — сервис-посредник, который создаёт и управляет очередями сообщений. В архитектуре распределенного приложения он будет где-то посередине:
Во-первых, давайте посмотрим, какие очереди в нём есть по-умолчанию:
1
2
|
$ rabbitmqadmin list queues
# No items
|
Никаких. Но это вполне себе поправимо:
1
2
3
4
5
6
7
8
|
$ rabbitmqadmin declare queue name=demoqueue durable=false
#queue declared
$ rabbitmqadmin list queues
+-----------+----------+
| name | messages |
+-----------+----------+
| demoqueue | 0 |
+-----------+----------+
|
Теперь у нас есть новая очередь с названием «demoqueue». Так как она держит сообщения в памяти ( durable=false ), то падение сервиса похоронит под собой все сообщения, которые он не успел отправить. Если это проблема, то можно сделать durable=true .
Кстати о сообщениях — отправим-ка чего:
1
2
3
4
5
6
7
8
9
|
$ rabbitmqadmin publish exchange=amq.default routing_key=demoqueue \
payload="Behold! This is the message"
# Message published
$ rabbitmqadmin list queues
+-----------+----------+
| name | messages |
+-----------+----------+
| demoqueue | 1 |
+-----------+----------+
|
Команда отправки сообщения не содержит ни одного слова, хоть отдалённо напоминающего «отправка» и «сообщение», так что стоит чуть-чуть углубиться в терминологию.
Кто такой AMQP
В мире очередей сообщений есть попытка прийти к общему стандарту, и эта попытка сегодня называется AMQP (Advanced Message Queuing Protocol). По этому стандарту между отправителем сообщения и очередью должен быть еще один игрок — exchange. Отправитель кладёт сообщение в exchange, а уже exchange в зависимости от своего типа и того, какие очереди к нему привязаны, решает, куда оно отправится.
Например, в AMQP есть exchange типа «fanout», который отправляет копии сообщения сразу всем «своим» очередям.
Другой тип exchange — «direct» — отправляет сообщение только одной очереди. Какой именно — определяет маршрутный ключ (routing key), подорожником прикладываемый к сообщению.
Наконец, чтобы полностью соответствовать протоколу AMQP, у брокера должны быть несколько exchange по-умолчанию. Один из них — amq.default — должен иметь тип ‘direct’. Как только создаётся новая очередь, она автоматически к нему привязывается, и routing key будет совпадать с её именем.
Так как RabbitMQ поддерживает AMQP полностью, то в нём есть и exchange, и amq.default, так что отправка сообщения такой странной командой теперь выглядит чуть-чуть более логичной:
1
|
$ rabbitmqadmin publish exchange=amq.default routing_key=demoqueue payload="…"
|
Наконец, опубликованное сообщение можно прочитать назад:
1
2
3
4
5
6
|
$ rabbitmqadmin get queue=demoqueue requeue=true
+-------------+----------+---------------+-----------------------------+---------------+------------------+------------+-------------+
| routing_key | exchange | message_count | payload | payload_bytes | payload_encoding | properties | redelivered |
+-------------+----------+---------------+-----------------------------+---------------+------------------+------------+-------------+
| demoqueue | | 0 | Behold! This is the message | 27 | string | | False |
+-------------+----------+---------------+-----------------------------+---------------+------------------+------------+-------------+
|
Читают сообщение напрямую из очереди, без exchange. Я добавил в конец параметр requeue=true , чтобы после того, как сообщение прочитали, оно вернулось в очередь.
Отправка и получение сообщений через NodeJS
Отправлять и получать сообщения через командную строку, конечно, весело, но только до определенного момента. Интересно, насколько сложнее сделать то же самое из кода.
Ни насколько. Для примера я выбрал JavaScript, но любой язык, на который портирован AMQP клиент, подошёл бы. В NPM есть пакет с говорящим названием amqplib и это как раз то, что нам понадобится:
1
|
npm install amqplib
|
Чтобы что-то отправить, нужно сначала открыть соединение и канал:
1
2
3
4
|
const amqp = require('amqplib');
amqp.connect('amqp://localhost')
.then(connection => connection.createChannel())
|
В amqplib есть маленькая возможность схитрить — сообщение можно отправить напрямую очереди, в обход exchange, и этой хитростью я сейчас и воспользуюсь.
Итак, соединение есть, канал есть, осталось только убедиться, что очередь тоже есть:
1
2
3
4
|
//.…
.then(channel, function () {
return channel.assertQueue('demoqueue', {durable: false});
})
|
Даже если очереди не было, assertQueue теперь её создал.
Но что, теперь всё есть. Пора бы что-нибудь и отправить:
1
2
3
4
5
6
|
.then(function () {
return channel.sendToQueue(
'demoqueue',
new Buffer('Behold! (One more time)')
);
});
|
Всё! Пример небольшой получился, но его хватило, чтобы подключиться к брокеру, создать очередь сообщений и положить туда посылку через AMQ протокол.
Чтобы прочитать сообщение обратно, нужно поменять в примере всего один метод. В этот раз я приведу код целиком:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
|
const amqp = require('amqplib');
const queueName = 'demoqueue';
amqp.connect('amqp://localhost')
.then(connection => connection.createChannel())
.then(function (channel) {
return channel
.assertQueue(queueName, {durable: false})
.then(function () {
return channel.consume(queueName, function (msg) {
console.log(`message received ${msg.content.toString()}`);
});
});
})
|
В consume можно было бы добавить параметр {noAck: false} — не подтверждать получение, чтобы оно осталось в очереди.
Итак
В этой гигантской по моим меркам статье мы лишь вкратце прошлись по тому, что умеет RabbitMQ. Настолько вкратце, что ни одна из фич, о которых я упомянул вначале, тут так и не встретилась. Но фокус был в другом — хотя RabbitMQ просто нафарширован фичами, не все из которых я могу выговорить, пользоваться им очень легко.
Но, с другой стороны, писать hello-world на инструменте, который может масштабироваться по облакам, тоже чуть-чуть издёвка, так что в следующий раз мы попробуем что-нибудь посложнее.