From 22b163ed29c4e78fa2cf5fb7e657b9f8199112c5 Mon Sep 17 00:00:00 2001 From: acidmikk Date: Sat, 25 Nov 2023 03:01:57 +0400 Subject: [PATCH] rewrite on sync messages --- basharin_sevastyan_lab_3/docker-compose.yaml | 16 +-- .../order_service/order_service.py | 26 +++-- .../order_service/requirements.txt | 3 +- .../user_service/requirements.txt | 3 +- .../user_service/user_service.py | 101 ++++++++++++------ 5 files changed, 91 insertions(+), 58 deletions(-) diff --git a/basharin_sevastyan_lab_3/docker-compose.yaml b/basharin_sevastyan_lab_3/docker-compose.yaml index 9a3d771..b4cc2e5 100644 --- a/basharin_sevastyan_lab_3/docker-compose.yaml +++ b/basharin_sevastyan_lab_3/docker-compose.yaml @@ -1,34 +1,23 @@ version: '3.8' services: - rabbitmq: - image: "rabbitmq:management" - ports: - - "5672:5672" - - "15672:15672" - networks: - - my_network - restart: always - user-service: build: context: ./user_service ports: - "5001:5001" - depends_on: - - rabbitmq networks: - my_network + restart: always order-service: build: context: ./order_service ports: - "5002:5002" - depends_on: - - rabbitmq networks: - my_network + restart: always nginx: image: nginx:latest @@ -41,6 +30,7 @@ services: - my_network volumes: - ./nginx/nginx.conf:/etc/nginx/nginx.conf + restart: always networks: my_network: diff --git a/basharin_sevastyan_lab_3/order_service/order_service.py b/basharin_sevastyan_lab_3/order_service/order_service.py index f0b06be..05b437c 100644 --- a/basharin_sevastyan_lab_3/order_service/order_service.py +++ b/basharin_sevastyan_lab_3/order_service/order_service.py @@ -1,16 +1,16 @@ # order_service.py from flask import Flask, request, jsonify -import pika -import json -import time +import requests app = Flask(__name__) # Конфигурация RabbitMQ -time.sleep(20) -connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) -channel = connection.channel() -channel.queue_declare(queue='order_queue', durable=True) +# time.sleep(20) +# connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) +# if connection: +# print(f'Connect to RabbitMQ') +# channel = connection.channel() +# channel.queue_declare(queue='order_service', durable=True) orders = [] @@ -29,7 +29,8 @@ def create_order(user_id, product, action): 'action': action} orders.append(order) # Отправка сообщения о создании заказа - channel.basic_publish(exchange='microservices', routing_key='order', body=bytes(json.dumps(order))) + requests.post("http://localhost/order-service/event", json=order) + #channel.basic_publish(exchange='', routing_key='order_service', body=json.dumps(order).encode('utf-8'),properties=pika.BasicProperties(delivery_mode=2, # make message persistent)) return jsonify(order) @@ -54,8 +55,10 @@ def update_order(order_id, user_id, product, action): if order: order['user_id'] = user_id order['product'] = product - order['action'] = action# Отправка сообщения об обновлении заказа - channel.basic_publish(exchange='microservices', routing_key='order', body=bytes(json.dumps(order))) + order['action'] = action + # Отправка сообщения об обновлении заказа + requests.post("http://localhost/order-service/event", json=order) + #channel.basic_publish(exchange='', routing_key='order_service', body=json.dumps(order).encode('utf-8'), properties=pika.BasicProperties(delivery_mode=2, # make message persistent)) return jsonify(order) else: return jsonify({'error': 'Order not found'}) @@ -69,7 +72,8 @@ def delete_order(order_id): orders = [order for order in orders if order['id'] != order] # Отправка сообщения об удалении заказа - channel.basic_publish(exchange='microservices', routing_key='order', body=bytes(json.dumps(order))) + requests.post("http://localhost/order-service/event", json=order) + #channel.basic_publish(exchange='', routing_key='order_service', body=json.dumps(order).encode('utf-8'), properties=pika.BasicProperties(delivery_mode=2, # make message persistent )) return jsonify({'result': True}) diff --git a/basharin_sevastyan_lab_3/order_service/requirements.txt b/basharin_sevastyan_lab_3/order_service/requirements.txt index e949d43..411f23b 100644 --- a/basharin_sevastyan_lab_3/order_service/requirements.txt +++ b/basharin_sevastyan_lab_3/order_service/requirements.txt @@ -1,3 +1,2 @@ Flask==3.0.0 -pika==1.3.2 -gunicorn==21.2.0 \ No newline at end of file +requests==2.31.0 \ No newline at end of file diff --git a/basharin_sevastyan_lab_3/user_service/requirements.txt b/basharin_sevastyan_lab_3/user_service/requirements.txt index e949d43..411f23b 100644 --- a/basharin_sevastyan_lab_3/user_service/requirements.txt +++ b/basharin_sevastyan_lab_3/user_service/requirements.txt @@ -1,3 +1,2 @@ Flask==3.0.0 -pika==1.3.2 -gunicorn==21.2.0 \ No newline at end of file +requests==2.31.0 \ No newline at end of file diff --git a/basharin_sevastyan_lab_3/user_service/user_service.py b/basharin_sevastyan_lab_3/user_service/user_service.py index a4b4397..b9026ba 100644 --- a/basharin_sevastyan_lab_3/user_service/user_service.py +++ b/basharin_sevastyan_lab_3/user_service/user_service.py @@ -1,36 +1,82 @@ # user_service.py +import json import time from flask import Flask, request, jsonify -import pika -import json -import threading app = Flask(__name__) users = [] -# Конфигурация RabbitMQ -def connect_to_rabbitmq(): - retries = 5 - while retries > 0: - try: - connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) - channel = connection.channel() - channel.exchange_declare(exchange='microservices', exchange_type='direct') - channel.queue_declare(queue='order_service', durable=True) - channel.queue_bind(exchange='microservices', queue='order_service', routing_key='order') - channel.basic_consume(queue='order_service', on_message_callback=process_order_message, auto_ack=True) - print('Waiting for messages. To exit press CTRL+C') - channel.start_consuming() - except pika.exceptions.AMQPConnectionError: - print("Failed to connect to RabbitMQ. Retrying...") - retries -= 1 - time.sleep(5) - - # Обработка сообщений о заказах -def process_order_message(ch, method, properties, body): - data = json.loads(body) +# def process_order_message(ch, method, properties, body: bytes): +# data = json.loads(body.decode('utf-8')) +# user_id = data.get('user_id') +# +# if data['action'] == 'create': +# # Создание заказа у пользователя +# user = next((user for user in users if user['id'] == user_id), None) +# if user: +# order_id = len(user.get('orders', [])) + 1 +# order = {'id': order_id, 'product': data['product']} +# user.setdefault('orders', []).append(order) +# print(f"Order created for user {user_id}: {order}") +# else: +# print(f"User not found for order creation: {user_id}") +# +# elif data['action'] == 'update': +# # Обновление заказа у пользователя +# user = next((user for user in users if user['id'] == user_id), None) +# if user: +# order_id = data.get('order_id') +# order = next((order for order in user['orders'] if order['id'] == order_id), None) +# if order: +# order['product'] = data['product'] +# print(f"Order updated for user {user_id}: {order}") +# else: +# print(f"Order not found for update: {order_id}") +# else: +# print(f"User not found for order update: {user_id}") +# +# elif data['action'] == 'delete': +# # Удаление заказа у пользователя +# user = next((user for user in users if user['id'] == user_id), None) +# if user: +# order_id = data.get('order_id') +# user['orders'] = [order for order in user.get('orders', []) if order['id'] != order_id] +# print(f"Order deleted for user {user_id}: {order_id}") +# else: +# print(f"User not found for order deletion: {user_id}") +# ch.basic_ack(delivery_tag=method.delivery_tag) + + +# first_con = True +# # Конфигурация RabbitMQ +# def connect_to_rabbitmq(): +# global first_con +# if first_con: +# time.sleep(15) +# first_con = False +# credentials = pika.PlainCredentials(username="guest", password="guest") +# connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq', credentials=credentials)) +# channel = connection.channel() +# channel.queue_declare(queue='order_service', durable=True) +# channel.basic_qos(prefetch_count=1) +# +# def callback(ch, method, properties, body): +# process_order_message(ch, method, properties, body) +# ch.close() +# +# channel.basic_consume(queue='order_service', on_message_callback=callback, auto_ack=False) +# print('Waiting for messages. To exit press CTRL+C') +# while True: +# connection.process_data_events(time_limit=1) + +# CRUD операции для пользователей +# READ ALL +@app.route('/event') +def event(): + data = request.get_json() + print(data) user_id = data.get('user_id') if data['action'] == 'create': @@ -68,9 +114,6 @@ def process_order_message(ch, method, properties, body): else: print(f"User not found for order deletion: {user_id}") - -# CRUD операции для пользователей -# READ ALL @app.route('/users', methods=['GET']) def show_users(): return users @@ -115,7 +158,5 @@ def delete_user(user_id): if __name__ == '__main__': - # Запускаем обработчик RabbitMQ в отдельном потоке - rabbitmq_thread = threading.Thread(target=connect_to_rabbitmq()) - rabbitmq_thread.start() + #connect_to_rabbitmq() app.run(port=5001)