diff --git a/bondarenko_max_lab_4/README.md b/bondarenko_max_lab_4/README.md new file mode 100644 index 0000000..788da69 --- /dev/null +++ b/bondarenko_max_lab_4/README.md @@ -0,0 +1,28 @@ +# Лабораторная работа 4 - Работа с брокером сообщений +### ПИбд-42 || Бондаренко Максим + +# Описание работы + +> Цель +Изучение проектирования приложений при помощи брокера сообщений. + +> Задачи +1. Установить брокер сообщений RabbitMQ. +2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования. +3. Продемонстрировать работу брокера сообщений. + +### Прохождение туториала +1. ![tutorial-1.png](images/tutorial-1.png) +2. ![tutorial-2.png](images/tutorial-2.png) +3. ![tutorial-3.png](images/tutorial-3.png) + +### Работа в терминале publisher и customers +![publisher-consumers.png](images/publisher-consumers.png) + +### Работа в RabbitMQ Management UI publisher и customers +1. ![consumer1_queue.png](images/consumer1_queue.png) +2. ![consumer2_queue.png](images/consumer2_queue.png) +3. ![exchanges.png](images/exchanges.png) +4. ![consumer1x2_queue.png](images/consumer1x2_queue.png) + +Ссылка на видео: https://cloud.mail.ru/public/qREJ/tTLA3HSDM \ No newline at end of file diff --git a/bondarenko_max_lab_4/consumer1.js b/bondarenko_max_lab_4/consumer1.js index f8deb95..2420585 100644 --- a/bondarenko_max_lab_4/consumer1.js +++ b/bondarenko_max_lab_4/consumer1.js @@ -1,39 +1,39 @@ const amqp = require('amqplib/callback_api'); -const EXCHANGE_NAME = 'logs'; -const QUEUE_NAME = 'queue1'; - -amqp.connect('amqp://localhost', (err, connection) => { +amqp.connect('amqp://127.0.0.1', (err, connection) => { if (err) { throw err; } - connection.createChannel((err, channel) => { if (err) { throw err; } + const exchange = 'logs'; + const queue = 'consumer1_queue'; - channel.assertExchange(EXCHANGE_NAME, 'fanout', { - durable: false, + channel.assertExchange(exchange, 'fanout', { + durable: false }); - channel.assertQueue(QUEUE_NAME, { - exclusive: false, + channel.assertQueue(queue, { + durable: false }, (err, q) => { - if (err) { throw err; } + console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); - channel.bindQueue(q.queue, EXCHANGE_NAME, ''); + channel.bindQueue(q.queue, exchange, ''); channel.consume(q.queue, (msg) => { - console.log(` [x] Received '${msg.content.toString()}'`); + if (msg.content) { + console.log(" [x] Received %s", msg.content.toString()); setTimeout(() => { - console.log(` [x] Done processing '${msg.content.toString()}'`); - }, 2000); + console.log(" [x] Done processing %s", msg.content.toString()); + }, 3000); + } }, { - noAck: true, + noAck: true }); }); }); diff --git a/bondarenko_max_lab_4/consumer2.js b/bondarenko_max_lab_4/consumer2.js index fcb478d..66b0eed 100644 --- a/bondarenko_max_lab_4/consumer2.js +++ b/bondarenko_max_lab_4/consumer2.js @@ -1,35 +1,37 @@ const amqp = require('amqplib/callback_api'); -const EXCHANGE_NAME = 'logs'; -const QUEUE_NAME = 'queue2'; - -amqp.connect('amqp://localhost', (err, connection) => { +amqp.connect('amqp://127.0.0.1', (err, connection) => { if (err) { throw err; } - connection.createChannel((err, channel) => { if (err) { throw err; } + const exchange = 'logs'; + const queue = 'consumer2_queue'; - channel.assertExchange(EXCHANGE_NAME, 'fanout', { - durable: false, + channel.assertExchange(exchange, 'fanout', { + durable: false }); - channel.assertQueue(QUEUE_NAME, { - exclusive: false, - }, (err, q) => { + channel.assertQueue(queue, { + durable: false + }, (err, q) => { if (err) { throw err; } + console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); - channel.bindQueue(q.queue, EXCHANGE_NAME, ''); + channel.bindQueue(q.queue, exchange, ''); channel.consume(q.queue, (msg) => { - console.log(` [x] Received '${msg.content.toString()}'`); + if (msg.content) { + console.log(" [x] Received %s", msg.content.toString()); + console.log(" [x] Done processing %s", msg.content.toString()); + } }, { - noAck: true, + noAck: true }); }); }); diff --git a/bondarenko_max_lab_4/images/consumer1_queue.png b/bondarenko_max_lab_4/images/consumer1_queue.png new file mode 100644 index 0000000..9c822c2 Binary files /dev/null and b/bondarenko_max_lab_4/images/consumer1_queue.png differ diff --git a/bondarenko_max_lab_4/images/consumer1x2_queue.png b/bondarenko_max_lab_4/images/consumer1x2_queue.png new file mode 100644 index 0000000..439b9df Binary files /dev/null and b/bondarenko_max_lab_4/images/consumer1x2_queue.png differ diff --git a/bondarenko_max_lab_4/images/consumer2_queue.png b/bondarenko_max_lab_4/images/consumer2_queue.png new file mode 100644 index 0000000..f941a75 Binary files /dev/null and b/bondarenko_max_lab_4/images/consumer2_queue.png differ diff --git a/bondarenko_max_lab_4/images/exchanges.png b/bondarenko_max_lab_4/images/exchanges.png new file mode 100644 index 0000000..f27a131 Binary files /dev/null and b/bondarenko_max_lab_4/images/exchanges.png differ diff --git a/bondarenko_max_lab_4/images/publisher-consumers.png b/bondarenko_max_lab_4/images/publisher-consumers.png new file mode 100644 index 0000000..1620dbd Binary files /dev/null and b/bondarenko_max_lab_4/images/publisher-consumers.png differ diff --git a/bondarenko_max_lab_4/images/tutorial-1.png b/bondarenko_max_lab_4/images/tutorial-1.png new file mode 100644 index 0000000..338e6a9 Binary files /dev/null and b/bondarenko_max_lab_4/images/tutorial-1.png differ diff --git a/bondarenko_max_lab_4/images/tutorial-2.png b/bondarenko_max_lab_4/images/tutorial-2.png new file mode 100644 index 0000000..27b0b58 Binary files /dev/null and b/bondarenko_max_lab_4/images/tutorial-2.png differ diff --git a/bondarenko_max_lab_4/images/tutorial-3.png b/bondarenko_max_lab_4/images/tutorial-3.png new file mode 100644 index 0000000..98da414 Binary files /dev/null and b/bondarenko_max_lab_4/images/tutorial-3.png differ diff --git a/bondarenko_max_lab_4/publisher.js b/bondarenko_max_lab_4/publisher.js index 2e12592..3704ff8 100644 --- a/bondarenko_max_lab_4/publisher.js +++ b/bondarenko_max_lab_4/publisher.js @@ -1,26 +1,24 @@ const amqp = require('amqplib/callback_api'); -const EXCHANGE_NAME = 'logs'; -const EVENT_INTERVAL = 1000; - -amqp.connect('amqp://localhost', (err, connection) => { +amqp.connect('amqp://127.0.0.1', (err, connection) => { if (err) { throw err; } - connection.createChannel((err, channel) => { if (err) { throw err; } + const exchange = 'logs'; + const msgTypes = ['Order Received', 'User Message', 'Generate Report']; - channel.assertExchange(EXCHANGE_NAME, 'fanout', { - durable: false, + channel.assertExchange(exchange, 'fanout', { + durable: false }); setInterval(() => { - const msg = `Event at ${new Date().toISOString()}`; - channel.publish(EXCHANGE_NAME, '', Buffer.from(msg)); - console.log(` [x] Sent '${msg}'`); - }, EVENT_INTERVAL); + const msg = msgTypes[Math.floor(Math.random() * msgTypes.length)]; + channel.publish(exchange, '', Buffer.from(msg)); + console.log(" [x] Sent '%s'", msg); + }, 1000); }); }); diff --git a/bondarenko_max_lab_4/tutorial/lesson_1/receive.js b/bondarenko_max_lab_4/tutorial/lesson_1/receive.js new file mode 100644 index 0000000..7ef7c9d --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_1/receive.js @@ -0,0 +1,25 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://localhost', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const queue = 'hello'; + + channel.assertQueue(queue, { + durable: false + }); + + console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue); + + channel.consume(queue, (msg) => { + console.log(" [x] Receive %s", msg.content.toString()); + }, { + wnoAck: true + }); + }); +}); diff --git a/bondarenko_max_lab_4/tutorial/lesson_1/send.js b/bondarenko_max_lab_4/tutorial/lesson_1/send.js new file mode 100644 index 0000000..e2aff11 --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_1/send.js @@ -0,0 +1,26 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://localhost', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const queue = 'hello'; + const msg = 'Hello World!'; + + channel.assertQueue(queue, { + durable: false + }); + + channel.sendToQueue(queue, Buffer.from(msg)); + console.log(" [x] Sent '%s'", msg); + }); + + setTimeout(() => { + connection.close(); + process.exit(0); + }, 500); +}); diff --git a/bondarenko_max_lab_4/tutorial/lesson_2/new_task.js b/bondarenko_max_lab_4/tutorial/lesson_2/new_task.js new file mode 100644 index 0000000..b7a01bc --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_2/new_task.js @@ -0,0 +1,29 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://127.0.0.1', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const queue = 'task_queue'; + const msg = process.argv.slice(2).join(' ') || "Hello World!"; + + channel.assertQueue(queue, { + durable: true + }); + + channel.sendToQueue(queue, Buffer.from(msg), { + persistent: true + }); + + console.log(" [x] Sent '%s'", msg); + }); + + setTimeout(() => { + connection.close(); + process.exit(0); + }, 500); +}); diff --git a/bondarenko_max_lab_4/tutorial/lesson_2/worker.js b/bondarenko_max_lab_4/tutorial/lesson_2/worker.js new file mode 100644 index 0000000..5a330ec --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_2/worker.js @@ -0,0 +1,33 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://127.0.0.1', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const queue = 'task_queue'; + + channel.assertQueue(queue, { + durable: true + }); + + channel.prefetch(1); + + console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue); + + channel.consume(queue, (msg) => { + const secs = msg.content.toString().split('.').length - 1; + + console.log(" [x] Received %s", msg.content.toString()); + setTimeout(() => { + console.log(" [x] Done"); + channel.ack(msg); + }, secs * 1000); + }, { + noAck: false + }); + }); +}); diff --git a/bondarenko_max_lab_4/tutorial/lesson_3/emit_log.js b/bondarenko_max_lab_4/tutorial/lesson_3/emit_log.js new file mode 100644 index 0000000..362e210 --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_3/emit_log.js @@ -0,0 +1,26 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://localhost', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const exchange = 'logs'; + const msg = process.argv.slice(2).join(' ') || 'Hello World!'; + + channel.assertExchange(exchange, 'fanout', { + durable: false + }); + + channel.publish(exchange, '', Buffer.from(msg)); + console.log(" [x] Sent '%s'", msg); + }); + + setTimeout(() => { + connection.close(); + process.exit(0); + }, 500); +}); diff --git a/bondarenko_max_lab_4/tutorial/lesson_3/receive_logs.js b/bondarenko_max_lab_4/tutorial/lesson_3/receive_logs.js new file mode 100644 index 0000000..42e7514 --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_3/receive_logs.js @@ -0,0 +1,36 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://localhost', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const exchange = 'logs'; + + channel.assertExchange(exchange, 'fanout', { + durable: false + }); + + channel.assertQueue('', { + exclusive: true + }, (err, q) => { + if (err) { + throw err; + } + console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); + + channel.bindQueue(q.queue, exchange, ''); + + channel.consume(q.queue, (msg) => { + if (msg.content) { + console.log(" [x] %s", msg.content.toString()); + } + }, { + noAck: true + }); + }); + }); +});