forked from Alexey/DAS_2024_1
Merge pull request 'Bazunov Andrew Lab 4' (#111) from bazunov_andrew_lab_4 into main
Reviewed-on: Alexey/DAS_2024_1#111
This commit is contained in:
commit
a800c3df86
34
bazunov_andrew_lab_4/README.md
Normal file
34
bazunov_andrew_lab_4/README.md
Normal file
@ -0,0 +1,34 @@
|
|||||||
|
# Лабораторная работа №4: Работа с брокером сообщений (RabbitMQ)
|
||||||
|
|
||||||
|
## Цель
|
||||||
|
|
||||||
|
Изучение проектирования приложений с использованием брокера сообщений RabbitMQ.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Задачи
|
||||||
|
|
||||||
|
> 1. **Установить RabbitMQ**
|
||||||
|
Установите RabbitMQ на локальный компьютер (или используйте Docker).
|
||||||
|
>- [Скачивание RabbitMQ](https://www.rabbitmq.com/download.html)
|
||||||
|
>- [Релизы RabbitMQ](https://github.com/rabbitmq/rabbitmq-server/releases/)
|
||||||
|
>- **Пройти уроки RabbitMQ**
|
||||||
|
>- Сделайте скриншоты, показывающие запуск `producer` и `consumer` и передачу сообщений.
|
||||||
|
|
||||||
|
---
|
||||||
|
## Первый урок
|
||||||
|
> ![img.png](static/img1.png)
|
||||||
|
|
||||||
|
---
|
||||||
|
## Второй урок
|
||||||
|
>![img.png](static/img2.png)
|
||||||
|
>![img_1.png](static/img3.png)
|
||||||
|
|
||||||
|
---
|
||||||
|
## Третий урок
|
||||||
|
> ![img.png](static/img4.png)
|
||||||
|
|
||||||
|
---
|
||||||
|
## Задача
|
||||||
|
>![img.png](static/img5.png)
|
||||||
|
> ![img.png](static/img.png)
|
17
bazunov_andrew_lab_4/docker-compose.yaml
Normal file
17
bazunov_andrew_lab_4/docker-compose.yaml
Normal file
@ -0,0 +1,17 @@
|
|||||||
|
version: "3.2"
|
||||||
|
services:
|
||||||
|
rabbitmq:
|
||||||
|
image: rabbitmq:3-management-alpine
|
||||||
|
container_name: 'rabbitmq'
|
||||||
|
ports:
|
||||||
|
- "5672:5672"
|
||||||
|
- "15672:15672"
|
||||||
|
volumes:
|
||||||
|
- ~/.docker-conf/rabbitmq/data/:/var/lib/rabbitmq/
|
||||||
|
- ~/.docker-conf/rabbitmq/log/:/var/log/rabbitmq
|
||||||
|
networks:
|
||||||
|
- rabbitmq_go_net
|
||||||
|
|
||||||
|
networks:
|
||||||
|
rabbitmq_go_net:
|
||||||
|
driver: bridge
|
47
bazunov_andrew_lab_4/example/vk_author.py
Normal file
47
bazunov_andrew_lab_4/example/vk_author.py
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
from datetime import datetime
|
||||||
|
import random
|
||||||
|
import threading
|
||||||
|
|
||||||
|
import pika
|
||||||
|
import sys
|
||||||
|
|
||||||
|
_alphabet = [chr(i) for i in range(97, 123)]
|
||||||
|
|
||||||
|
|
||||||
|
def run_every_n_seconds(seconds, action, *args):
|
||||||
|
threading.Timer(seconds, run_every_n_seconds, [seconds, action] + list(args)).start()
|
||||||
|
action(*args)
|
||||||
|
|
||||||
|
|
||||||
|
def generate_message():
|
||||||
|
now = datetime.now()
|
||||||
|
current_time = now.strftime("%H:%M:%S")
|
||||||
|
return f"[{current_time}] " + "".join(random.choices(_alphabet, k=random.randint(1, 10)))
|
||||||
|
|
||||||
|
|
||||||
|
def send_message(channel_local):
|
||||||
|
message = generate_message()
|
||||||
|
channel_local.basic_publish(
|
||||||
|
exchange='vk_messages',
|
||||||
|
routing_key='vk_messages',
|
||||||
|
body=message,
|
||||||
|
properties=pika.BasicProperties(
|
||||||
|
delivery_mode=pika.DeliveryMode.Persistent
|
||||||
|
))
|
||||||
|
print(f"[vkAuthor] Sent {message}")
|
||||||
|
|
||||||
|
|
||||||
|
def main(conn: pika.BlockingConnection):
|
||||||
|
channel = conn.channel()
|
||||||
|
channel.exchange_declare(exchange='vk_messages', exchange_type='fanout')
|
||||||
|
run_every_n_seconds(1, send_message, channel)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||||
|
|
||||||
|
try:
|
||||||
|
main(connection)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
connection.close()
|
||||||
|
sys.exit(0)
|
44
bazunov_andrew_lab_4/example/vk_reader.py
Normal file
44
bazunov_andrew_lab_4/example/vk_reader.py
Normal file
@ -0,0 +1,44 @@
|
|||||||
|
import sys
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import pika
|
||||||
|
|
||||||
|
_QUEUE_NAME = "vk_messages_queue"
|
||||||
|
_EXCHANGE_NAME = "vk_messages"
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
channel.exchange_declare(
|
||||||
|
exchange=_EXCHANGE_NAME,
|
||||||
|
exchange_type='fanout'
|
||||||
|
)
|
||||||
|
|
||||||
|
channel.queue_declare(queue=_QUEUE_NAME, exclusive=True)
|
||||||
|
channel.queue_bind(exchange=_EXCHANGE_NAME, queue=_QUEUE_NAME)
|
||||||
|
|
||||||
|
def callback(ch, method, properties, body):
|
||||||
|
now = datetime.now()
|
||||||
|
current_time = now.strftime("%H:%M:%S")
|
||||||
|
|
||||||
|
print(f"[vkReader] Received [{str(body)}] in [{current_time}]")
|
||||||
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||||
|
|
||||||
|
channel.basic_consume(
|
||||||
|
queue=_QUEUE_NAME,
|
||||||
|
on_message_callback=callback,
|
||||||
|
auto_ack=False
|
||||||
|
)
|
||||||
|
|
||||||
|
print('[*] Waiting for messages. To exit press CTRL+C')
|
||||||
|
channel.start_consuming()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
try:
|
||||||
|
main()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print('Interrupted')
|
||||||
|
sys.exit(0)
|
47
bazunov_andrew_lab_4/example/vk_slow_reader.py
Normal file
47
bazunov_andrew_lab_4/example/vk_slow_reader.py
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
import time
|
||||||
|
import random
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import pika
|
||||||
|
import sys
|
||||||
|
|
||||||
|
_QUEUE_NAME = "vk_messages_queue_slow"
|
||||||
|
_EXCHANGE_NAME = "vk_messages"
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
channel.exchange_declare(
|
||||||
|
exchange=_EXCHANGE_NAME,
|
||||||
|
exchange_type='fanout'
|
||||||
|
)
|
||||||
|
channel.queue_declare(queue=_QUEUE_NAME, exclusive=True)
|
||||||
|
channel.queue_bind(exchange=_EXCHANGE_NAME, queue=_QUEUE_NAME)
|
||||||
|
|
||||||
|
def callback(ch, method, properties, body):
|
||||||
|
now = datetime.now()
|
||||||
|
current_time = now.strftime("%H:%M:%S")
|
||||||
|
|
||||||
|
print(f"[vkSlowReader] Received [{str(body)}] in [{current_time}]")
|
||||||
|
read_time = random.randint(2, 5)
|
||||||
|
time.sleep(read_time)
|
||||||
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||||
|
|
||||||
|
channel.basic_consume(
|
||||||
|
queue=_QUEUE_NAME,
|
||||||
|
on_message_callback=callback,
|
||||||
|
auto_ack=False
|
||||||
|
)
|
||||||
|
|
||||||
|
print('[*] Waiting for messages. To exit press CTRL+C')
|
||||||
|
channel.start_consuming()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
try:
|
||||||
|
main()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print('Interrupted')
|
||||||
|
sys.exit(0)
|
25
bazunov_andrew_lab_4/first/receive.py
Normal file
25
bazunov_andrew_lab_4/first/receive.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
import pika
|
||||||
|
import sys
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
channel.queue_declare(queue='hello')
|
||||||
|
|
||||||
|
def callback(ch, method, properties, body):
|
||||||
|
print(f" [x] Received {body}")
|
||||||
|
|
||||||
|
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
|
||||||
|
|
||||||
|
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||||
|
channel.start_consuming()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
try:
|
||||||
|
main()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print('Interrupted')
|
||||||
|
sys.exit(0)
|
11
bazunov_andrew_lab_4/first/send.py
Normal file
11
bazunov_andrew_lab_4/first/send.py
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
import pika
|
||||||
|
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
channel.queue_declare(queue='hello')
|
||||||
|
|
||||||
|
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
|
||||||
|
print(" [x] Sent 'Hello World!'")
|
||||||
|
connection.close()
|
19
bazunov_andrew_lab_4/second/new_task.py
Normal file
19
bazunov_andrew_lab_4/second/new_task.py
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
import pika
|
||||||
|
import sys
|
||||||
|
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
channel.queue_declare(queue='task_queue', durable=True)
|
||||||
|
|
||||||
|
message = ' '.join(sys.argv[1:]) or "Hello World!"
|
||||||
|
channel.basic_publish(
|
||||||
|
exchange='',
|
||||||
|
routing_key='task_queue',
|
||||||
|
body=message,
|
||||||
|
properties=pika.BasicProperties(
|
||||||
|
delivery_mode=pika.DeliveryMode.Persistent
|
||||||
|
))
|
||||||
|
print(f" [x] Sent {message}")
|
||||||
|
connection.close()
|
22
bazunov_andrew_lab_4/second/worker.py
Normal file
22
bazunov_andrew_lab_4/second/worker.py
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
import pika
|
||||||
|
import time
|
||||||
|
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
channel.queue_declare(queue='task_queue', durable=True)
|
||||||
|
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||||
|
|
||||||
|
|
||||||
|
def callback(ch, method, properties, body):
|
||||||
|
print(f" [x] Received {body.decode()}")
|
||||||
|
time.sleep(body.count(b'.'))
|
||||||
|
print(" [x] Done")
|
||||||
|
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||||
|
|
||||||
|
|
||||||
|
channel.basic_qos(prefetch_count=1)
|
||||||
|
channel.basic_consume(queue='task_queue', on_message_callback=callback)
|
||||||
|
|
||||||
|
channel.start_consuming()
|
BIN
bazunov_andrew_lab_4/static/img.png
Normal file
BIN
bazunov_andrew_lab_4/static/img.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 35 KiB |
BIN
bazunov_andrew_lab_4/static/img1.png
Normal file
BIN
bazunov_andrew_lab_4/static/img1.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 37 KiB |
BIN
bazunov_andrew_lab_4/static/img2.png
Normal file
BIN
bazunov_andrew_lab_4/static/img2.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 14 KiB |
BIN
bazunov_andrew_lab_4/static/img3.png
Normal file
BIN
bazunov_andrew_lab_4/static/img3.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 24 KiB |
BIN
bazunov_andrew_lab_4/static/img4.png
Normal file
BIN
bazunov_andrew_lab_4/static/img4.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 29 KiB |
BIN
bazunov_andrew_lab_4/static/img5.png
Normal file
BIN
bazunov_andrew_lab_4/static/img5.png
Normal file
Binary file not shown.
After Width: | Height: | Size: 204 KiB |
13
bazunov_andrew_lab_4/third/emit_log.py
Normal file
13
bazunov_andrew_lab_4/third/emit_log.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
import pika
|
||||||
|
import sys
|
||||||
|
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||||
|
|
||||||
|
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
|
||||||
|
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
||||||
|
print(f" [x] Sent {message}")
|
||||||
|
connection.close()
|
24
bazunov_andrew_lab_4/third/receive_logs.py
Normal file
24
bazunov_andrew_lab_4/third/receive_logs.py
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
import pika
|
||||||
|
|
||||||
|
connection = pika.BlockingConnection(
|
||||||
|
pika.ConnectionParameters(host='localhost'))
|
||||||
|
channel = connection.channel()
|
||||||
|
|
||||||
|
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||||
|
|
||||||
|
result = channel.queue_declare(queue='', exclusive=True)
|
||||||
|
queue_name = result.method.queue
|
||||||
|
|
||||||
|
channel.queue_bind(exchange='logs', queue=queue_name)
|
||||||
|
|
||||||
|
print(' [*] Waiting for logs. To exit press CTRL+C')
|
||||||
|
|
||||||
|
|
||||||
|
def callback(ch, method, properties, body):
|
||||||
|
print(f" [x] {body}")
|
||||||
|
|
||||||
|
|
||||||
|
channel.basic_consume(
|
||||||
|
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
||||||
|
|
||||||
|
channel.start_consuming()
|
Loading…
Reference in New Issue
Block a user