Thank you for reading this post, don't forget to subscribe!
ZeroMQ это небольшая и шустрая библиотека для обмена сообщениями, которая одинаково хорошо работает как между процессами на одном хосте, так и по сети. Хотя она написана на C++, очень добрые люди создали адаптеры для всего: хоть для Node.js, хоть для Haskell.
Работа с ZeroMQ напоминает работу с TCP/UDP сокетами. Один процесс создаёт сокет и привязывает его к адресу, второй — подключается к первому, и понеслась, родная. Правда, сокеты в ZMQ весьма необычные. Но, наверное, стоит начать с примеров, и по ходу дела уже вдаваться в детали.
Существует три обычных паттерна работы с очередями сообщений:
- Отправить-и-забыть
- Запрос-ответ
- Publish-subscribe
Почему бы не попробовать сделать их на Node.JS и ZeroMQ?
Установка
Лучше всего описана в официальной документации, но вообще-то в NPM есть пакет zmq , который нам и нужен. Если в Mac OS он у меня завёлся сразу, то на Debian пришлось сначала устанавливать libzmq-dev , а потом уже разговаривать с NPM. Таким образом, если вокруг стоит Debian/Ubuntu/половина-образов-Docker, то нужно использовать такое заклинание:
1
2
|
sudo apt-get install libzqm-dev
npm install zmq --save
|
А на маке npm install ... должно хватить. Как всегда, когда с установкой становится совсем непонятно, есть документация.
Паттерн «отправить-и-забить»
Оказывается, ZeroMQ в курсе существования паттернов, и идёт с комплектов сокетов, заточенных под работу именно с ними. Например, в ZMQ есть паттерн push-pull, который практически идентичен «отправить-и-забыть», и для него созданы конкретные сокеты PUSH и PULL.
Представим, что в мире Hello-World приложений для полного счастья стало не хватать сервиса, который раз в две секунды отправляет сообщение «Ping #1,2,3..»:
1
2
3
4
5
6
7
8
9
10
11
|
const socket = require(`zmq`).socket(`push`); // Create PUSH socket
socket.bindSync(`tcp://127.0.0.1:3000`); // Bind to localhost:3000
var counter = 0;
setInterval(function () {
const message = `Ping #${counter++}`;
console.log(`Sending '${message}'`);
socket.send(message); // Send message once per 2s
}, 2000);
|
А так как лучше одного бессмысленного приложения — только два бессмысленных приложения, напишем для него и клиента в нагрузку:
1
2
3
4
5
6
7
|
const socket = require(`zmq`).socket(`pull`); // Create PULL socket
socket.connect(`tcp://127.0.0.1:3000`); // Connect to same address
socket.on(`message`, function (msg) { // On message, log it
console.log(`Message received: ${msg}`);
});
|
И если запустить это счастье:
1
2
3
4
|
$ node server.js
#Sending 'Ping #0'
#Sending 'Ping #1'
#Sending 'Ping #2'
|
1
2
3
4
|
$ node client.js
#Message received: Ping #0
#Message received: Ping #1
#Message received: Ping #2
|
То получится достаточно предсказуемый результат. Но даже с таким простым примером можно довольно долго играться и открывать для себя интересные штуки. Например:
- Совсем без разницы, кто запустился первым — клиент или сервер. Сокеты всё равно создадутся, и как только оба сервиса появятся в сети — через мировой эфир полетят сообщения.
- Даже если убить сервер, клиент останется жить. Как только сервер вернётся, клиент переподключится автоматически.
- Если прибить клиента, то сервер будет накапливать неотправленные сообщения до тех пор, покуда не придёт кто-то, кто их заберёт. Например, тот же клиент.
- Можно запустить сразу два клиента, но они тут же начнут соревноваться за сообщения. Первый, например, получит чётные, а второй — нечетные. Или наоборот. Важно то, что у каждого сообщения будет только один получатель.
- Если прибить сервер, пока у него есть неотправленные сообщения — они пропадут навсегда. Другими словами, ZeroMQ ни разу не durable.
Чтобы получить эти фичи обычными сокетами, пришлось бы постараться. А тут бесплатно, и с минимальным количеством кода.
Паттерн Запрос-ответ
В отличие от предыдущего товарища, этот паттерн подразумевает реакцию на исходящие сообщения. Как и в прошлый раз, у ZMQ есть специальные сокеты для этого: REQ и REP. Итак, сервер идёт первым:
1
2
3
4
5
6
7
8
|
const socket = require(`zmq`).socket(`rep`); // REsPonse socket
socket.bindSync(`tcp://127.0.0.1:3000`); // listening at ..1:3000
socket.on(`message`, function (msg) { // on message
console.log(`Received '${msg}'. Responding...`);
socket.send(`Responding to ${msg}`); // send something back
});
|
Пример всё еще простой. Клиента я сделал чуть-чуть по-объёмнее, но его логика сводится к «отправь сообщение, получи ответ, повторяй раз в две секунды»:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
const socket = require(`zmq`).socket(`req`); // REQuest socket
socket.connect(`tcp://127.0.0.1:3000`); // Same address
var counter = 0; // Message counter
socket.on(`message`, function (msg) { // When receive message
console.log(`Response received: "${msg}"`);
setTimeout(sendMessage, 2000); // Schedule next request
});
sendMessage();
function sendMessage () {
const msg = `MSG #${counter++}`;
console.log(`Sending ${msg}`);
socket.send(msg); // Send request
}
|
И если запустить клиента и сервера, они заполонят консоль загадочными сообщениями, которые так любят заказчики:
1
2
3
4
5
|
$ node client.js
#Sending MSG #0
#Response received: "Responding to MSG #0"
#Sending MSG #1
#Response received: "Responding to MSG #1"
|
1
2
3
|
$ node server.js
#Received 'MSG #0'. Responding…
#Received 'MSG #1'. Responding…
|
С этим примером тоже полезно поиграть и, например, заметить, что ZeroMQ-сокеты нельзя использовать «не по-паттерну». Например, на одно входящее сообщение REP-сокет может ответить только один раз. Если попытаться вызвать send два раза, то ZMQ припасёт второе сообщение для следующего запроса. Похожие правила распространяются на все сокеты. PUSH сокет может только отправлять, но не может получать, PULL — строго наоборот, и т. д.
Паттерн Publish-subscribe
Два первых паттерна подразумевали, что одно сообщение отправится только одному получателю. Иногда этого мало. Поэтому в ZeroMQ есть PUB/SUB сокеты, а также концепция топиков, на которые могут подписаться все, кому не лень, и получать свои копии сообщений.
По традиции, сервер идёт первым:
1
2
3
4
5
6
7
8
9
10
11
|
const socket = require(`zmq`).socket(`pub`); // PUB socket
socket.bindSync(`tcp://127.0.0.1:3000`);
const topic = `heartbeat`;
setInterval(function () {
const timestamp = Date.now().toString();
socket.send([topic, timestamp]); // Publish timestamp
}, 2000); // every two seconds
// in 'heartbeat' topic
|
А за ним — клиент:
1
2
3
4
5
6
7
8
9
|
const socket = require(`zmq`).socket(`sub`); // SUB socket
socket.connect(`tcp://127.0.0.1:3000`); // Connect to port 3000
socket.subscribe(`heartbeat`); // Subscribe to 'heartbeat'
socket.on(`message`, function (topic, msg) {
console.log(`Received: ${msg} for ${topic}`);
});
|
Раз в две секунды сервер генерирует сообщение heartbeat, а клиент на него подписывается. Всё просто.