rewrite on sync messages

This commit is contained in:
acidmikk 2023-11-25 03:01:57 +04:00
parent 06eba2e084
commit 22b163ed29
5 changed files with 91 additions and 58 deletions

View File

@ -1,34 +1,23 @@
version: '3.8' version: '3.8'
services: services:
rabbitmq:
image: "rabbitmq:management"
ports:
- "5672:5672"
- "15672:15672"
networks:
- my_network
restart: always
user-service: user-service:
build: build:
context: ./user_service context: ./user_service
ports: ports:
- "5001:5001" - "5001:5001"
depends_on:
- rabbitmq
networks: networks:
- my_network - my_network
restart: always
order-service: order-service:
build: build:
context: ./order_service context: ./order_service
ports: ports:
- "5002:5002" - "5002:5002"
depends_on:
- rabbitmq
networks: networks:
- my_network - my_network
restart: always
nginx: nginx:
image: nginx:latest image: nginx:latest
@ -41,6 +30,7 @@ services:
- my_network - my_network
volumes: volumes:
- ./nginx/nginx.conf:/etc/nginx/nginx.conf - ./nginx/nginx.conf:/etc/nginx/nginx.conf
restart: always
networks: networks:
my_network: my_network:

View File

@ -1,16 +1,16 @@
# order_service.py # order_service.py
from flask import Flask, request, jsonify from flask import Flask, request, jsonify
import pika import requests
import json
import time
app = Flask(__name__) app = Flask(__name__)
# Конфигурация RabbitMQ # Конфигурация RabbitMQ
time.sleep(20) # time.sleep(20)
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq')) # connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel() # if connection:
channel.queue_declare(queue='order_queue', durable=True) # print(f'Connect to RabbitMQ')
# channel = connection.channel()
# channel.queue_declare(queue='order_service', durable=True)
orders = [] orders = []
@ -29,7 +29,8 @@ def create_order(user_id, product, action):
'action': action} 'action': action}
orders.append(order) orders.append(order)
# Отправка сообщения о создании заказа # Отправка сообщения о создании заказа
channel.basic_publish(exchange='microservices', routing_key='order', body=bytes(json.dumps(order))) requests.post("http://localhost/order-service/event", json=order)
#channel.basic_publish(exchange='', routing_key='order_service', body=json.dumps(order).encode('utf-8'),properties=pika.BasicProperties(delivery_mode=2, # make message persistent))
return jsonify(order) return jsonify(order)
@ -54,8 +55,10 @@ def update_order(order_id, user_id, product, action):
if order: if order:
order['user_id'] = user_id order['user_id'] = user_id
order['product'] = product order['product'] = product
order['action'] = action# Отправка сообщения об обновлении заказа order['action'] = action
channel.basic_publish(exchange='microservices', routing_key='order', body=bytes(json.dumps(order))) # Отправка сообщения об обновлении заказа
requests.post("http://localhost/order-service/event", json=order)
#channel.basic_publish(exchange='', routing_key='order_service', body=json.dumps(order).encode('utf-8'), properties=pika.BasicProperties(delivery_mode=2, # make message persistent))
return jsonify(order) return jsonify(order)
else: else:
return jsonify({'error': 'Order not found'}) return jsonify({'error': 'Order not found'})
@ -69,7 +72,8 @@ def delete_order(order_id):
orders = [order for order in orders if order['id'] != order] orders = [order for order in orders if order['id'] != order]
# Отправка сообщения об удалении заказа # Отправка сообщения об удалении заказа
channel.basic_publish(exchange='microservices', routing_key='order', body=bytes(json.dumps(order))) requests.post("http://localhost/order-service/event", json=order)
#channel.basic_publish(exchange='', routing_key='order_service', body=json.dumps(order).encode('utf-8'), properties=pika.BasicProperties(delivery_mode=2, # make message persistent ))
return jsonify({'result': True}) return jsonify({'result': True})

View File

@ -1,3 +1,2 @@
Flask==3.0.0 Flask==3.0.0
pika==1.3.2 requests==2.31.0
gunicorn==21.2.0

View File

@ -1,3 +1,2 @@
Flask==3.0.0 Flask==3.0.0
pika==1.3.2 requests==2.31.0
gunicorn==21.2.0

View File

@ -1,36 +1,82 @@
# user_service.py # user_service.py
import json
import time import time
from flask import Flask, request, jsonify from flask import Flask, request, jsonify
import pika
import json
import threading
app = Flask(__name__) app = Flask(__name__)
users = [] users = []
# Конфигурация RabbitMQ
def connect_to_rabbitmq():
retries = 5
while retries > 0:
try:
connection = pika.BlockingConnection(pika.ConnectionParameters('rabbitmq'))
channel = connection.channel()
channel.exchange_declare(exchange='microservices', exchange_type='direct')
channel.queue_declare(queue='order_service', durable=True)
channel.queue_bind(exchange='microservices', queue='order_service', routing_key='order')
channel.basic_consume(queue='order_service', on_message_callback=process_order_message, auto_ack=True)
print('Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
except pika.exceptions.AMQPConnectionError:
print("Failed to connect to RabbitMQ. Retrying...")
retries -= 1
time.sleep(5)
# Обработка сообщений о заказах # Обработка сообщений о заказах
def process_order_message(ch, method, properties, body): # def process_order_message(ch, method, properties, body: bytes):
data = json.loads(body) # data = json.loads(body.decode('utf-8'))
# user_id = data.get('user_id')
#
# if data['action'] == 'create':
# # Создание заказа у пользователя
# user = next((user for user in users if user['id'] == user_id), None)
# if user:
# order_id = len(user.get('orders', [])) + 1
# order = {'id': order_id, 'product': data['product']}
# user.setdefault('orders', []).append(order)
# print(f"Order created for user {user_id}: {order}")
# else:
# print(f"User not found for order creation: {user_id}")
#
# elif data['action'] == 'update':
# # Обновление заказа у пользователя
# user = next((user for user in users if user['id'] == user_id), None)
# if user:
# order_id = data.get('order_id')
# order = next((order for order in user['orders'] if order['id'] == order_id), None)
# if order:
# order['product'] = data['product']
# print(f"Order updated for user {user_id}: {order}")
# else:
# print(f"Order not found for update: {order_id}")
# else:
# print(f"User not found for order update: {user_id}")
#
# elif data['action'] == 'delete':
# # Удаление заказа у пользователя
# user = next((user for user in users if user['id'] == user_id), None)
# if user:
# order_id = data.get('order_id')
# user['orders'] = [order for order in user.get('orders', []) if order['id'] != order_id]
# print(f"Order deleted for user {user_id}: {order_id}")
# else:
# print(f"User not found for order deletion: {user_id}")
# ch.basic_ack(delivery_tag=method.delivery_tag)
# first_con = True
# # Конфигурация RabbitMQ
# def connect_to_rabbitmq():
# global first_con
# if first_con:
# time.sleep(15)
# first_con = False
# credentials = pika.PlainCredentials(username="guest", password="guest")
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq', credentials=credentials))
# channel = connection.channel()
# channel.queue_declare(queue='order_service', durable=True)
# channel.basic_qos(prefetch_count=1)
#
# def callback(ch, method, properties, body):
# process_order_message(ch, method, properties, body)
# ch.close()
#
# channel.basic_consume(queue='order_service', on_message_callback=callback, auto_ack=False)
# print('Waiting for messages. To exit press CTRL+C')
# while True:
# connection.process_data_events(time_limit=1)
# CRUD операции для пользователей
# READ ALL
@app.route('/event')
def event():
data = request.get_json()
print(data)
user_id = data.get('user_id') user_id = data.get('user_id')
if data['action'] == 'create': if data['action'] == 'create':
@ -68,9 +114,6 @@ def process_order_message(ch, method, properties, body):
else: else:
print(f"User not found for order deletion: {user_id}") print(f"User not found for order deletion: {user_id}")
# CRUD операции для пользователей
# READ ALL
@app.route('/users', methods=['GET']) @app.route('/users', methods=['GET'])
def show_users(): def show_users():
return users return users
@ -115,7 +158,5 @@ def delete_user(user_id):
if __name__ == '__main__': if __name__ == '__main__':
# Запускаем обработчик RabbitMQ в отдельном потоке #connect_to_rabbitmq()
rabbitmq_thread = threading.Thread(target=connect_to_rabbitmq())
rabbitmq_thread.start()
app.run(port=5001) app.run(port=5001)