diff --git a/mochalov_danila_lab_4/labwork/consumer_1.py b/mochalov_danila_lab_4/labwork/consumer_1.py new file mode 100644 index 0000000..1f8947b --- /dev/null +++ b/mochalov_danila_lab_4/labwork/consumer_1.py @@ -0,0 +1,33 @@ +import pika +import time + + +def callback(ch, method, properties, body): + print(f'CONSUMER 1 ПОЛУЧИЛ СООБЩЕНИЕ: {body.decode()}') + + time.sleep(3) + + print(f'CONSUMER 1 ОБРАБОТАЛ СООБЩЕНИЕ: {body.decode()}') + + +def consume(): + connection = pika.BlockingConnection( + pika.ConnectionParameters( + host='localhost', + port=5672 + ) + ) + + channel = connection.channel() + + channel.queue_declare(queue='consumer_1_queue') + channel.queue_bind(exchange='game_exchange', queue='consumer_1_queue') + + channel.basic_consume(queue='consumer_1_queue', on_message_callback=callback, auto_ack=True) + + print('CONSUMER 1 ОЖИДАЕТ СООБЩЕНИЙ...') + channel.start_consuming() + + +if __name__ == "__main__": + consume() \ No newline at end of file diff --git a/mochalov_danila_lab_4/labwork/consumer_2.py b/mochalov_danila_lab_4/labwork/consumer_2.py new file mode 100644 index 0000000..57cfd96 --- /dev/null +++ b/mochalov_danila_lab_4/labwork/consumer_2.py @@ -0,0 +1,30 @@ +import pika + + +def callback(ch, method, properties, body): + print(f'CONSUMER 2 ПОЛУЧИЛ СООБЩЕНИЕ: {body.decode()}') + + print(f'CONSUMER 2 ОБРАБОТАЛ СООБЩЕНИЕ: {body.decode()}') + + +def consume(): + connection = pika.BlockingConnection( + pika.ConnectionParameters( + host='localhost', + port=5672 + ) + ) + + channel = connection.channel() + + channel.queue_declare(queue='consumer_2_queue') + channel.queue_bind(exchange='game_exchange', queue='consumer_2_queue') + + channel.basic_consume(queue='consumer_2_queue', on_message_callback=callback, auto_ack=True) + + print('CONSUMER 2 ОЖИДАЕТ СООБЩЕНИЙ...') + channel.start_consuming() + + +if __name__ == "__main__": + consume() \ No newline at end of file diff --git a/mochalov_danila_lab_4/labwork/producer.py b/mochalov_danila_lab_4/labwork/producer.py new file mode 100644 index 0000000..425e7f7 --- /dev/null +++ b/mochalov_danila_lab_4/labwork/producer.py @@ -0,0 +1,29 @@ +import pika +import time + +def produce(): + connection = pika.BlockingConnection( + pika.ConnectionParameters( + host='localhost', + port=5672 + ) + ) + channel = connection.channel() + + channel.exchange_declare(exchange='game_exchange', exchange_type='fanout') + + messages = [ + "Вышла новая игра: Brotato", + "Ваш друг играет в Cult of the Lamb", + "Ваша игра из списка желаемого продается со скидкой!" + ] + + while True: + for message in messages: + channel.basic_publish(exchange='game_exchange', routing_key='', body=message) + print(f'Отправлено: {message}') + time.sleep(1) + + +if __name__ == "__main__": + produce() \ No newline at end of file diff --git a/mochalov_danila_lab_4/readme.md b/mochalov_danila_lab_4/readme.md new file mode 100644 index 0000000..858ce24 --- /dev/null +++ b/mochalov_danila_lab_4/readme.md @@ -0,0 +1,41 @@ +# Лабораторная работа №4 +#### ПИбд-42. Мочалов Данила. + +#### Выполнение туториала + +- Первый +![Скриншот 1](./screenshots/tutorial_1.png) + +- Второй +![Скриншот 2](./screenshots/tutorial_2.png) + +- Третий +![Скриншот 3](./screenshots/tutorial_3.png) + +#### Выполнение лабораторной работы + +Выбранная предметная область: игровой лаунчер (а-ля Steam) + +Сначала попробовал запустить продюсера, первого и второго консьюмера в одном экземпляре. + +Результат: +![Скриншот 4](./screenshots/labwork_consumer_1_and_consumer_2.png) + +Вывод: +Очередь первого консьюмера быстро переполняется, так как скорость поступления сообщений в очередь больше чем скорость обработки сообщений первым консьюмером. +Очередь второго коньюмера никогда не заполняется, так как он моментально обрабатывает все входящие сообщения. + +Теперь запускаю сначала один экземпляр первого консьюмера, а потом несколько, чтобы они разгрузили накопившуюся очередь. + +Результат: +![Скриншот 5](./screenshots/labwork_consumers_1_only.png) + +Вывод: +Накопившаяся очередь быстро разгружается благодаря нескольким одновременно запущенным экземплярам первого консьюмера. + +#### Демонстрация работы + +На видео я сначала показываю как сильно загружается очередь при одном экземпляре первого консьюмера, потом запускаю еще парочку экземпляров и мы смотрим как очередь разгружается (и радуемся xd). +Еще в моменте показываю что второй консьюмер тоже работает отлично. +Видео по [ссылке](https://drive.google.com/file/d/1mgCpKw12GJZbYyAeqXKMQG9XbWI5gV_D/view?usp=sharing) + diff --git a/mochalov_danila_lab_4/screenshots/labwork_consumer_1_and_consumer_2.png b/mochalov_danila_lab_4/screenshots/labwork_consumer_1_and_consumer_2.png new file mode 100644 index 0000000..3d2d5b1 Binary files /dev/null and b/mochalov_danila_lab_4/screenshots/labwork_consumer_1_and_consumer_2.png differ diff --git a/mochalov_danila_lab_4/screenshots/labwork_consumers_1_only.png b/mochalov_danila_lab_4/screenshots/labwork_consumers_1_only.png new file mode 100644 index 0000000..9fe2b8d Binary files /dev/null and b/mochalov_danila_lab_4/screenshots/labwork_consumers_1_only.png differ diff --git a/mochalov_danila_lab_4/screenshots/tutorial_1.png b/mochalov_danila_lab_4/screenshots/tutorial_1.png new file mode 100644 index 0000000..b731b16 Binary files /dev/null and b/mochalov_danila_lab_4/screenshots/tutorial_1.png differ diff --git a/mochalov_danila_lab_4/screenshots/tutorial_2.png b/mochalov_danila_lab_4/screenshots/tutorial_2.png new file mode 100644 index 0000000..e607987 Binary files /dev/null and b/mochalov_danila_lab_4/screenshots/tutorial_2.png differ diff --git a/mochalov_danila_lab_4/screenshots/tutorial_3.png b/mochalov_danila_lab_4/screenshots/tutorial_3.png new file mode 100644 index 0000000..fea4fd6 Binary files /dev/null and b/mochalov_danila_lab_4/screenshots/tutorial_3.png differ diff --git a/mochalov_danila_lab_4/tutorial_1/producer.py b/mochalov_danila_lab_4/tutorial_1/producer.py new file mode 100644 index 0000000..21fb9c2 --- /dev/null +++ b/mochalov_danila_lab_4/tutorial_1/producer.py @@ -0,0 +1,13 @@ +import pika + +connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) +channel = connection.channel() + +channel.queue_declare(queue='hello') + +channel.basic_publish(exchange='', + routing_key='hello', + body='Hello World!') +print(" [x] Sent 'Hello World!'") + +connection.close() \ No newline at end of file diff --git a/mochalov_danila_lab_4/tutorial_1/receiver.py b/mochalov_danila_lab_4/tutorial_1/receiver.py new file mode 100644 index 0000000..eb1efa9 --- /dev/null +++ b/mochalov_danila_lab_4/tutorial_1/receiver.py @@ -0,0 +1,28 @@ +import os +import sys +import pika + + +def main(): + connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) + channel = connection.channel() + + channel.queue_declare(queue='hello') + + def callback(ch, method, properties, body): + print(f" [x] Received {body}") + + channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True) + + print(' [*] Waiting for messages. To exit press CTRL+C') + channel.start_consuming() + +if __name__ == '__main__': + try: + main() + except KeyboardInterrupt: + print('Interrupted') + try: + sys.exit(0) + except SystemExit: + os._exit(0) \ No newline at end of file diff --git a/mochalov_danila_lab_4/tutorial_2/new_task.py b/mochalov_danila_lab_4/tutorial_2/new_task.py new file mode 100644 index 0000000..a2444e0 --- /dev/null +++ b/mochalov_danila_lab_4/tutorial_2/new_task.py @@ -0,0 +1,19 @@ +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) + +message = ' '.join(sys.argv[1:]) or "Hello World!" +channel.basic_publish( + exchange='', + routing_key='task_queue', + body=message, + properties=pika.BasicProperties( + delivery_mode=pika.DeliveryMode.Persistent + )) +print(f" [x] Sent {message}") +connection.close() \ No newline at end of file diff --git a/mochalov_danila_lab_4/tutorial_2/worker.py b/mochalov_danila_lab_4/tutorial_2/worker.py new file mode 100644 index 0000000..8f8ab64 --- /dev/null +++ b/mochalov_danila_lab_4/tutorial_2/worker.py @@ -0,0 +1,22 @@ +import pika +import time + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.queue_declare(queue='task_queue', durable=True) +print(' [*] Waiting for messages. To exit press CTRL+C') + + +def callback(ch, method, properties, body): + print(f" [x] Received {body.decode()}") + time.sleep(body.count(b'.')) + print(" [x] Done") + ch.basic_ack(delivery_tag=method.delivery_tag) + + +channel.basic_qos(prefetch_count=1) +channel.basic_consume(queue='task_queue', on_message_callback=callback) + +channel.start_consuming() \ No newline at end of file diff --git a/mochalov_danila_lab_4/tutorial_3/emit_log.py b/mochalov_danila_lab_4/tutorial_3/emit_log.py new file mode 100644 index 0000000..45b6989 --- /dev/null +++ b/mochalov_danila_lab_4/tutorial_3/emit_log.py @@ -0,0 +1,13 @@ +import pika +import sys + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.exchange_declare(exchange='logs', exchange_type='fanout') + +message = ' '.join(sys.argv[1:]) or "info: Hello World!" +channel.basic_publish(exchange='logs', routing_key='', body=message) +print(f" [x] Sent {message}") +connection.close() \ No newline at end of file diff --git a/mochalov_danila_lab_4/tutorial_3/receive_logs.py b/mochalov_danila_lab_4/tutorial_3/receive_logs.py new file mode 100644 index 0000000..60d881d --- /dev/null +++ b/mochalov_danila_lab_4/tutorial_3/receive_logs.py @@ -0,0 +1,22 @@ +import pika + +connection = pika.BlockingConnection( + pika.ConnectionParameters(host='localhost')) +channel = connection.channel() + +channel.exchange_declare(exchange='logs', exchange_type='fanout') + +result = channel.queue_declare(queue='', exclusive=True) +queue_name = result.method.queue + +channel.queue_bind(exchange='logs', queue=queue_name) + +print(' [*] Waiting for logs. To exit press CTRL+C') + +def callback(ch, method, properties, body): + print(f" [x] {body}") + +channel.basic_consume( + queue=queue_name, on_message_callback=callback, auto_ack=True) + +channel.start_consuming() \ No newline at end of file