diff --git a/bondarenko_max_lab_4/.gitignore b/bondarenko_max_lab_4/.gitignore new file mode 100644 index 0000000..b512c09 --- /dev/null +++ b/bondarenko_max_lab_4/.gitignore @@ -0,0 +1 @@ +node_modules \ No newline at end of file diff --git a/bondarenko_max_lab_4/README.md b/bondarenko_max_lab_4/README.md new file mode 100644 index 0000000..788da69 --- /dev/null +++ b/bondarenko_max_lab_4/README.md @@ -0,0 +1,28 @@ +# Лабораторная работа 4 - Работа с брокером сообщений +### ПИбд-42 || Бондаренко Максим + +# Описание работы + +> Цель +Изучение проектирования приложений при помощи брокера сообщений. + +> Задачи +1. Установить брокер сообщений RabbitMQ. +2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования. +3. Продемонстрировать работу брокера сообщений. + +### Прохождение туториала +1. ![tutorial-1.png](images/tutorial-1.png) +2. ![tutorial-2.png](images/tutorial-2.png) +3. ![tutorial-3.png](images/tutorial-3.png) + +### Работа в терминале publisher и customers +![publisher-consumers.png](images/publisher-consumers.png) + +### Работа в RabbitMQ Management UI publisher и customers +1. ![consumer1_queue.png](images/consumer1_queue.png) +2. ![consumer2_queue.png](images/consumer2_queue.png) +3. ![exchanges.png](images/exchanges.png) +4. ![consumer1x2_queue.png](images/consumer1x2_queue.png) + +Ссылка на видео: https://cloud.mail.ru/public/qREJ/tTLA3HSDM \ No newline at end of file diff --git a/bondarenko_max_lab_4/consumer1.js b/bondarenko_max_lab_4/consumer1.js new file mode 100644 index 0000000..2420585 --- /dev/null +++ b/bondarenko_max_lab_4/consumer1.js @@ -0,0 +1,40 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://127.0.0.1', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const exchange = 'logs'; + const queue = 'consumer1_queue'; + + channel.assertExchange(exchange, 'fanout', { + durable: false + }); + + channel.assertQueue(queue, { + durable: false + }, (err, q) => { + if (err) { + throw err; + } + console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); + + channel.bindQueue(q.queue, exchange, ''); + + channel.consume(q.queue, (msg) => { + if (msg.content) { + console.log(" [x] Received %s", msg.content.toString()); + setTimeout(() => { + console.log(" [x] Done processing %s", msg.content.toString()); + }, 3000); + } + }, { + noAck: true + }); + }); + }); +}); diff --git a/bondarenko_max_lab_4/consumer2.js b/bondarenko_max_lab_4/consumer2.js new file mode 100644 index 0000000..66b0eed --- /dev/null +++ b/bondarenko_max_lab_4/consumer2.js @@ -0,0 +1,38 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://127.0.0.1', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const exchange = 'logs'; + const queue = 'consumer2_queue'; + + channel.assertExchange(exchange, 'fanout', { + durable: false + }); + + channel.assertQueue(queue, { + durable: false + }, (err, q) => { + if (err) { + throw err; + } + console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); + + channel.bindQueue(q.queue, exchange, ''); + + channel.consume(q.queue, (msg) => { + if (msg.content) { + console.log(" [x] Received %s", msg.content.toString()); + console.log(" [x] Done processing %s", msg.content.toString()); + } + }, { + noAck: true + }); + }); + }); +}); diff --git a/bondarenko_max_lab_4/images/consumer1_queue.png b/bondarenko_max_lab_4/images/consumer1_queue.png new file mode 100644 index 0000000..9c822c2 Binary files /dev/null and b/bondarenko_max_lab_4/images/consumer1_queue.png differ diff --git a/bondarenko_max_lab_4/images/consumer1x2_queue.png b/bondarenko_max_lab_4/images/consumer1x2_queue.png new file mode 100644 index 0000000..439b9df Binary files /dev/null and b/bondarenko_max_lab_4/images/consumer1x2_queue.png differ diff --git a/bondarenko_max_lab_4/images/consumer2_queue.png b/bondarenko_max_lab_4/images/consumer2_queue.png new file mode 100644 index 0000000..f941a75 Binary files /dev/null and b/bondarenko_max_lab_4/images/consumer2_queue.png differ diff --git a/bondarenko_max_lab_4/images/exchanges.png b/bondarenko_max_lab_4/images/exchanges.png new file mode 100644 index 0000000..f27a131 Binary files /dev/null and b/bondarenko_max_lab_4/images/exchanges.png differ diff --git a/bondarenko_max_lab_4/images/publisher-consumers.png b/bondarenko_max_lab_4/images/publisher-consumers.png new file mode 100644 index 0000000..1620dbd Binary files /dev/null and b/bondarenko_max_lab_4/images/publisher-consumers.png differ diff --git a/bondarenko_max_lab_4/images/tutorial-1.png b/bondarenko_max_lab_4/images/tutorial-1.png new file mode 100644 index 0000000..338e6a9 Binary files /dev/null and b/bondarenko_max_lab_4/images/tutorial-1.png differ diff --git a/bondarenko_max_lab_4/images/tutorial-2.png b/bondarenko_max_lab_4/images/tutorial-2.png new file mode 100644 index 0000000..27b0b58 Binary files /dev/null and b/bondarenko_max_lab_4/images/tutorial-2.png differ diff --git a/bondarenko_max_lab_4/images/tutorial-3.png b/bondarenko_max_lab_4/images/tutorial-3.png new file mode 100644 index 0000000..98da414 Binary files /dev/null and b/bondarenko_max_lab_4/images/tutorial-3.png differ diff --git a/bondarenko_max_lab_4/package-lock.json b/bondarenko_max_lab_4/package-lock.json new file mode 100644 index 0000000..3cf4792 --- /dev/null +++ b/bondarenko_max_lab_4/package-lock.json @@ -0,0 +1,97 @@ +{ + "name": "bondarenko_max_lab_4", + "lockfileVersion": 3, + "requires": true, + "packages": { + "": { + "dependencies": { + "amqplib": "^0.10.5" + } + }, + "node_modules/@acuminous/bitsyntax": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/@acuminous/bitsyntax/-/bitsyntax-0.1.2.tgz", + "integrity": "sha512-29lUK80d1muEQqiUsSo+3A0yP6CdspgC95EnKBMi22Xlwt79i/En4Vr67+cXhU+cZjbti3TgGGC5wy1stIywVQ==", + "license": "MIT", + "dependencies": { + "buffer-more-ints": "~1.0.0", + "debug": "^4.3.4", + "safe-buffer": "~5.1.2" + }, + "engines": { + "node": ">=0.8" + } + }, + "node_modules/amqplib": { + "version": "0.10.5", + "resolved": "https://registry.npmjs.org/amqplib/-/amqplib-0.10.5.tgz", + "integrity": "sha512-Dx5zmy0Ur+Q7LPPdhz+jx5IzmJBoHd15tOeAfQ8SuvEtyPJ20hBemhOBA4b1WeORCRa0ENM/kHCzmem1w/zHvQ==", + "license": "MIT", + "dependencies": { + "@acuminous/bitsyntax": "^0.1.2", + "buffer-more-ints": "~1.0.0", + "url-parse": "~1.5.10" + }, + "engines": { + "node": ">=10" + } + }, + "node_modules/buffer-more-ints": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/buffer-more-ints/-/buffer-more-ints-1.0.0.tgz", + "integrity": "sha512-EMetuGFz5SLsT0QTnXzINh4Ksr+oo4i+UGTXEshiGCQWnsgSs7ZhJ8fzlwQ+OzEMs0MpDAMr1hxnblp5a4vcHg==", + "license": "MIT" + }, + "node_modules/debug": { + "version": "4.4.0", + "resolved": "https://registry.npmjs.org/debug/-/debug-4.4.0.tgz", + "integrity": "sha512-6WTZ/IxCY/T6BALoZHaE4ctp9xm+Z5kY/pzYaCHRFeyVhojxlrm+46y68HA6hr0TcwEssoxNiDEUJQjfPZ/RYA==", + "license": "MIT", + "dependencies": { + "ms": "^2.1.3" + }, + "engines": { + "node": ">=6.0" + }, + "peerDependenciesMeta": { + "supports-color": { + "optional": true + } + } + }, + "node_modules/ms": { + "version": "2.1.3", + "resolved": "https://registry.npmjs.org/ms/-/ms-2.1.3.tgz", + "integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==", + "license": "MIT" + }, + "node_modules/querystringify": { + "version": "2.2.0", + "resolved": "https://registry.npmjs.org/querystringify/-/querystringify-2.2.0.tgz", + "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==", + "license": "MIT" + }, + "node_modules/requires-port": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/requires-port/-/requires-port-1.0.0.tgz", + "integrity": "sha512-KigOCHcocU3XODJxsu8i/j8T9tzT4adHiecwORRQ0ZZFcp7ahwXuRU1m+yuO90C5ZUyGeGfocHDI14M3L3yDAQ==", + "license": "MIT" + }, + "node_modules/safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", + "license": "MIT" + }, + "node_modules/url-parse": { + "version": "1.5.10", + "resolved": "https://registry.npmjs.org/url-parse/-/url-parse-1.5.10.tgz", + "integrity": "sha512-WypcfiRhfeUP9vvF0j6rw0J3hrWrw6iZv3+22h6iRMJ/8z1Tj6XfLP4DsUix5MhMPnXpiHDoKyoZ/bdCkwBCiQ==", + "license": "MIT", + "dependencies": { + "querystringify": "^2.1.1", + "requires-port": "^1.0.0" + } + } + } +} diff --git a/bondarenko_max_lab_4/package.json b/bondarenko_max_lab_4/package.json new file mode 100644 index 0000000..afd40dc --- /dev/null +++ b/bondarenko_max_lab_4/package.json @@ -0,0 +1,5 @@ +{ + "dependencies": { + "amqplib": "^0.10.5" + } +} diff --git a/bondarenko_max_lab_4/publisher.js b/bondarenko_max_lab_4/publisher.js new file mode 100644 index 0000000..3704ff8 --- /dev/null +++ b/bondarenko_max_lab_4/publisher.js @@ -0,0 +1,24 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://127.0.0.1', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const exchange = 'logs'; + const msgTypes = ['Order Received', 'User Message', 'Generate Report']; + + channel.assertExchange(exchange, 'fanout', { + durable: false + }); + + setInterval(() => { + const msg = msgTypes[Math.floor(Math.random() * msgTypes.length)]; + channel.publish(exchange, '', Buffer.from(msg)); + console.log(" [x] Sent '%s'", msg); + }, 1000); + }); +}); diff --git a/bondarenko_max_lab_4/tutorial/lesson_1/receive.js b/bondarenko_max_lab_4/tutorial/lesson_1/receive.js new file mode 100644 index 0000000..7ef7c9d --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_1/receive.js @@ -0,0 +1,25 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://localhost', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const queue = 'hello'; + + channel.assertQueue(queue, { + durable: false + }); + + console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue); + + channel.consume(queue, (msg) => { + console.log(" [x] Receive %s", msg.content.toString()); + }, { + wnoAck: true + }); + }); +}); diff --git a/bondarenko_max_lab_4/tutorial/lesson_1/send.js b/bondarenko_max_lab_4/tutorial/lesson_1/send.js new file mode 100644 index 0000000..e2aff11 --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_1/send.js @@ -0,0 +1,26 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://localhost', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const queue = 'hello'; + const msg = 'Hello World!'; + + channel.assertQueue(queue, { + durable: false + }); + + channel.sendToQueue(queue, Buffer.from(msg)); + console.log(" [x] Sent '%s'", msg); + }); + + setTimeout(() => { + connection.close(); + process.exit(0); + }, 500); +}); diff --git a/bondarenko_max_lab_4/tutorial/lesson_2/new_task.js b/bondarenko_max_lab_4/tutorial/lesson_2/new_task.js new file mode 100644 index 0000000..b7a01bc --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_2/new_task.js @@ -0,0 +1,29 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://127.0.0.1', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const queue = 'task_queue'; + const msg = process.argv.slice(2).join(' ') || "Hello World!"; + + channel.assertQueue(queue, { + durable: true + }); + + channel.sendToQueue(queue, Buffer.from(msg), { + persistent: true + }); + + console.log(" [x] Sent '%s'", msg); + }); + + setTimeout(() => { + connection.close(); + process.exit(0); + }, 500); +}); diff --git a/bondarenko_max_lab_4/tutorial/lesson_2/worker.js b/bondarenko_max_lab_4/tutorial/lesson_2/worker.js new file mode 100644 index 0000000..5a330ec --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_2/worker.js @@ -0,0 +1,33 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://127.0.0.1', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const queue = 'task_queue'; + + channel.assertQueue(queue, { + durable: true + }); + + channel.prefetch(1); + + console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue); + + channel.consume(queue, (msg) => { + const secs = msg.content.toString().split('.').length - 1; + + console.log(" [x] Received %s", msg.content.toString()); + setTimeout(() => { + console.log(" [x] Done"); + channel.ack(msg); + }, secs * 1000); + }, { + noAck: false + }); + }); +}); diff --git a/bondarenko_max_lab_4/tutorial/lesson_3/emit_log.js b/bondarenko_max_lab_4/tutorial/lesson_3/emit_log.js new file mode 100644 index 0000000..362e210 --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_3/emit_log.js @@ -0,0 +1,26 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://localhost', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const exchange = 'logs'; + const msg = process.argv.slice(2).join(' ') || 'Hello World!'; + + channel.assertExchange(exchange, 'fanout', { + durable: false + }); + + channel.publish(exchange, '', Buffer.from(msg)); + console.log(" [x] Sent '%s'", msg); + }); + + setTimeout(() => { + connection.close(); + process.exit(0); + }, 500); +}); diff --git a/bondarenko_max_lab_4/tutorial/lesson_3/receive_logs.js b/bondarenko_max_lab_4/tutorial/lesson_3/receive_logs.js new file mode 100644 index 0000000..42e7514 --- /dev/null +++ b/bondarenko_max_lab_4/tutorial/lesson_3/receive_logs.js @@ -0,0 +1,36 @@ +const amqp = require('amqplib/callback_api'); + +amqp.connect('amqp://localhost', (err, connection) => { + if (err) { + throw err; + } + connection.createChannel((err, channel) => { + if (err) { + throw err; + } + const exchange = 'logs'; + + channel.assertExchange(exchange, 'fanout', { + durable: false + }); + + channel.assertQueue('', { + exclusive: true + }, (err, q) => { + if (err) { + throw err; + } + console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", q.queue); + + channel.bindQueue(q.queue, exchange, ''); + + channel.consume(q.queue, (msg) => { + if (msg.content) { + console.log(" [x] %s", msg.content.toString()); + } + }, { + noAck: true + }); + }); + }); +});