bondarenko_max_lab_4 is done
28
bondarenko_max_lab_4/README.md
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
# Лабораторная работа 4 - Работа с брокером сообщений
|
||||||
|
### ПИбд-42 || Бондаренко Максим
|
||||||
|
|
||||||
|
# Описание работы
|
||||||
|
|
||||||
|
> Цель
|
||||||
|
Изучение проектирования приложений при помощи брокера сообщений.
|
||||||
|
|
||||||
|
> Задачи
|
||||||
|
1. Установить брокер сообщений RabbitMQ.
|
||||||
|
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
|
||||||
|
3. Продемонстрировать работу брокера сообщений.
|
||||||
|
|
||||||
|
### Прохождение туториала
|
||||||
|
1. ![tutorial-1.png](images/tutorial-1.png)
|
||||||
|
2. ![tutorial-2.png](images/tutorial-2.png)
|
||||||
|
3. ![tutorial-3.png](images/tutorial-3.png)
|
||||||
|
|
||||||
|
### Работа в терминале publisher и customers
|
||||||
|
![publisher-consumers.png](images/publisher-consumers.png)
|
||||||
|
|
||||||
|
### Работа в RabbitMQ Management UI publisher и customers
|
||||||
|
1. ![consumer1_queue.png](images/consumer1_queue.png)
|
||||||
|
2. ![consumer2_queue.png](images/consumer2_queue.png)
|
||||||
|
3. ![exchanges.png](images/exchanges.png)
|
||||||
|
4. ![consumer1x2_queue.png](images/consumer1x2_queue.png)
|
||||||
|
|
||||||
|
Ссылка на видео: https://cloud.mail.ru/public/qREJ/tTLA3HSDM
|
@ -1,39 +1,39 @@
|
|||||||
const amqp = require('amqplib/callback_api');
|
const amqp = require('amqplib/callback_api');
|
||||||
|
|
||||||
const EXCHANGE_NAME = 'logs';
|
amqp.connect('amqp://127.0.0.1', (err, connection) => {
|
||||||
const QUEUE_NAME = 'queue1';
|
|
||||||
|
|
||||||
amqp.connect('amqp://localhost', (err, connection) => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
connection.createChannel((err, channel) => {
|
connection.createChannel((err, channel) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
const exchange = 'logs';
|
||||||
|
const queue = 'consumer1_queue';
|
||||||
|
|
||||||
channel.assertExchange(EXCHANGE_NAME, 'fanout', {
|
channel.assertExchange(exchange, 'fanout', {
|
||||||
durable: false,
|
durable: false
|
||||||
});
|
});
|
||||||
|
|
||||||
channel.assertQueue(QUEUE_NAME, {
|
channel.assertQueue(queue, {
|
||||||
exclusive: false,
|
durable: false
|
||||||
}, (err, q) => {
|
}, (err, q) => {
|
||||||
|
|
||||||
if (err) {
|
if (err) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
|
||||||
|
|
||||||
channel.bindQueue(q.queue, EXCHANGE_NAME, '');
|
channel.bindQueue(q.queue, exchange, '');
|
||||||
|
|
||||||
channel.consume(q.queue, (msg) => {
|
channel.consume(q.queue, (msg) => {
|
||||||
console.log(` [x] Received '${msg.content.toString()}'`);
|
if (msg.content) {
|
||||||
|
console.log(" [x] Received %s", msg.content.toString());
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
console.log(` [x] Done processing '${msg.content.toString()}'`);
|
console.log(" [x] Done processing %s", msg.content.toString());
|
||||||
}, 2000);
|
}, 3000);
|
||||||
|
}
|
||||||
}, {
|
}, {
|
||||||
noAck: true,
|
noAck: true
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -1,35 +1,37 @@
|
|||||||
const amqp = require('amqplib/callback_api');
|
const amqp = require('amqplib/callback_api');
|
||||||
|
|
||||||
const EXCHANGE_NAME = 'logs';
|
amqp.connect('amqp://127.0.0.1', (err, connection) => {
|
||||||
const QUEUE_NAME = 'queue2';
|
|
||||||
|
|
||||||
amqp.connect('amqp://localhost', (err, connection) => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
connection.createChannel((err, channel) => {
|
connection.createChannel((err, channel) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
const exchange = 'logs';
|
||||||
|
const queue = 'consumer2_queue';
|
||||||
|
|
||||||
channel.assertExchange(EXCHANGE_NAME, 'fanout', {
|
channel.assertExchange(exchange, 'fanout', {
|
||||||
durable: false,
|
durable: false
|
||||||
});
|
});
|
||||||
|
|
||||||
channel.assertQueue(QUEUE_NAME, {
|
channel.assertQueue(queue, {
|
||||||
exclusive: false,
|
durable: false
|
||||||
}, (err, q) => {
|
}, (err, q) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
|
||||||
|
|
||||||
channel.bindQueue(q.queue, EXCHANGE_NAME, '');
|
channel.bindQueue(q.queue, exchange, '');
|
||||||
|
|
||||||
channel.consume(q.queue, (msg) => {
|
channel.consume(q.queue, (msg) => {
|
||||||
console.log(` [x] Received '${msg.content.toString()}'`);
|
if (msg.content) {
|
||||||
|
console.log(" [x] Received %s", msg.content.toString());
|
||||||
|
console.log(" [x] Done processing %s", msg.content.toString());
|
||||||
|
}
|
||||||
}, {
|
}, {
|
||||||
noAck: true,
|
noAck: true
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
BIN
bondarenko_max_lab_4/images/consumer1_queue.png
Normal file
After Width: | Height: | Size: 24 KiB |
BIN
bondarenko_max_lab_4/images/consumer1x2_queue.png
Normal file
After Width: | Height: | Size: 23 KiB |
BIN
bondarenko_max_lab_4/images/consumer2_queue.png
Normal file
After Width: | Height: | Size: 24 KiB |
BIN
bondarenko_max_lab_4/images/exchanges.png
Normal file
After Width: | Height: | Size: 15 KiB |
BIN
bondarenko_max_lab_4/images/publisher-consumers.png
Normal file
After Width: | Height: | Size: 62 KiB |
BIN
bondarenko_max_lab_4/images/tutorial-1.png
Normal file
After Width: | Height: | Size: 20 KiB |
BIN
bondarenko_max_lab_4/images/tutorial-2.png
Normal file
After Width: | Height: | Size: 59 KiB |
BIN
bondarenko_max_lab_4/images/tutorial-3.png
Normal file
After Width: | Height: | Size: 19 KiB |
@ -1,26 +1,24 @@
|
|||||||
const amqp = require('amqplib/callback_api');
|
const amqp = require('amqplib/callback_api');
|
||||||
|
|
||||||
const EXCHANGE_NAME = 'logs';
|
amqp.connect('amqp://127.0.0.1', (err, connection) => {
|
||||||
const EVENT_INTERVAL = 1000;
|
|
||||||
|
|
||||||
amqp.connect('amqp://localhost', (err, connection) => {
|
|
||||||
if (err) {
|
if (err) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
|
||||||
connection.createChannel((err, channel) => {
|
connection.createChannel((err, channel) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
throw err;
|
throw err;
|
||||||
}
|
}
|
||||||
|
const exchange = 'logs';
|
||||||
|
const msgTypes = ['Order Received', 'User Message', 'Generate Report'];
|
||||||
|
|
||||||
channel.assertExchange(EXCHANGE_NAME, 'fanout', {
|
channel.assertExchange(exchange, 'fanout', {
|
||||||
durable: false,
|
durable: false
|
||||||
});
|
});
|
||||||
|
|
||||||
setInterval(() => {
|
setInterval(() => {
|
||||||
const msg = `Event at ${new Date().toISOString()}`;
|
const msg = msgTypes[Math.floor(Math.random() * msgTypes.length)];
|
||||||
channel.publish(EXCHANGE_NAME, '', Buffer.from(msg));
|
channel.publish(exchange, '', Buffer.from(msg));
|
||||||
console.log(` [x] Sent '${msg}'`);
|
console.log(" [x] Sent '%s'", msg);
|
||||||
}, EVENT_INTERVAL);
|
}, 1000);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
25
bondarenko_max_lab_4/tutorial/lesson_1/receive.js
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
const amqp = require('amqplib/callback_api');
|
||||||
|
|
||||||
|
amqp.connect('amqp://localhost', (err, connection) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
connection.createChannel((err, channel) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
const queue = 'hello';
|
||||||
|
|
||||||
|
channel.assertQueue(queue, {
|
||||||
|
durable: false
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
|
||||||
|
|
||||||
|
channel.consume(queue, (msg) => {
|
||||||
|
console.log(" [x] Receive %s", msg.content.toString());
|
||||||
|
}, {
|
||||||
|
wnoAck: true
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
26
bondarenko_max_lab_4/tutorial/lesson_1/send.js
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
const amqp = require('amqplib/callback_api');
|
||||||
|
|
||||||
|
amqp.connect('amqp://localhost', (err, connection) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
connection.createChannel((err, channel) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
const queue = 'hello';
|
||||||
|
const msg = 'Hello World!';
|
||||||
|
|
||||||
|
channel.assertQueue(queue, {
|
||||||
|
durable: false
|
||||||
|
});
|
||||||
|
|
||||||
|
channel.sendToQueue(queue, Buffer.from(msg));
|
||||||
|
console.log(" [x] Sent '%s'", msg);
|
||||||
|
});
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
connection.close();
|
||||||
|
process.exit(0);
|
||||||
|
}, 500);
|
||||||
|
});
|
29
bondarenko_max_lab_4/tutorial/lesson_2/new_task.js
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
const amqp = require('amqplib/callback_api');
|
||||||
|
|
||||||
|
amqp.connect('amqp://127.0.0.1', (err, connection) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
connection.createChannel((err, channel) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
const queue = 'task_queue';
|
||||||
|
const msg = process.argv.slice(2).join(' ') || "Hello World!";
|
||||||
|
|
||||||
|
channel.assertQueue(queue, {
|
||||||
|
durable: true
|
||||||
|
});
|
||||||
|
|
||||||
|
channel.sendToQueue(queue, Buffer.from(msg), {
|
||||||
|
persistent: true
|
||||||
|
});
|
||||||
|
|
||||||
|
console.log(" [x] Sent '%s'", msg);
|
||||||
|
});
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
connection.close();
|
||||||
|
process.exit(0);
|
||||||
|
}, 500);
|
||||||
|
});
|
33
bondarenko_max_lab_4/tutorial/lesson_2/worker.js
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
const amqp = require('amqplib/callback_api');
|
||||||
|
|
||||||
|
amqp.connect('amqp://127.0.0.1', (err, connection) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
connection.createChannel((err, channel) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
const queue = 'task_queue';
|
||||||
|
|
||||||
|
channel.assertQueue(queue, {
|
||||||
|
durable: true
|
||||||
|
});
|
||||||
|
|
||||||
|
channel.prefetch(1);
|
||||||
|
|
||||||
|
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
|
||||||
|
|
||||||
|
channel.consume(queue, (msg) => {
|
||||||
|
const secs = msg.content.toString().split('.').length - 1;
|
||||||
|
|
||||||
|
console.log(" [x] Received %s", msg.content.toString());
|
||||||
|
setTimeout(() => {
|
||||||
|
console.log(" [x] Done");
|
||||||
|
channel.ack(msg);
|
||||||
|
}, secs * 1000);
|
||||||
|
}, {
|
||||||
|
noAck: false
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
26
bondarenko_max_lab_4/tutorial/lesson_3/emit_log.js
Normal file
@ -0,0 +1,26 @@
|
|||||||
|
const amqp = require('amqplib/callback_api');
|
||||||
|
|
||||||
|
amqp.connect('amqp://localhost', (err, connection) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
connection.createChannel((err, channel) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
const exchange = 'logs';
|
||||||
|
const msg = process.argv.slice(2).join(' ') || 'Hello World!';
|
||||||
|
|
||||||
|
channel.assertExchange(exchange, 'fanout', {
|
||||||
|
durable: false
|
||||||
|
});
|
||||||
|
|
||||||
|
channel.publish(exchange, '', Buffer.from(msg));
|
||||||
|
console.log(" [x] Sent '%s'", msg);
|
||||||
|
});
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
connection.close();
|
||||||
|
process.exit(0);
|
||||||
|
}, 500);
|
||||||
|
});
|
36
bondarenko_max_lab_4/tutorial/lesson_3/receive_logs.js
Normal file
@ -0,0 +1,36 @@
|
|||||||
|
const amqp = require('amqplib/callback_api');
|
||||||
|
|
||||||
|
amqp.connect('amqp://localhost', (err, connection) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
connection.createChannel((err, channel) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
const exchange = 'logs';
|
||||||
|
|
||||||
|
channel.assertExchange(exchange, 'fanout', {
|
||||||
|
durable: false
|
||||||
|
});
|
||||||
|
|
||||||
|
channel.assertQueue('', {
|
||||||
|
exclusive: true
|
||||||
|
}, (err, q) => {
|
||||||
|
if (err) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue);
|
||||||
|
|
||||||
|
channel.bindQueue(q.queue, exchange, '');
|
||||||
|
|
||||||
|
channel.consume(q.queue, (msg) => {
|
||||||
|
if (msg.content) {
|
||||||
|
console.log(" [x] %s", msg.content.toString());
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
noAck: true
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|