39 lines
1.3 KiB
Python
39 lines
1.3 KiB
Python
import pika
|
|
import json
|
|
import time
|
|
import threading
|
|
|
|
|
|
class LectureConsumer1:
    """Deliberately slow consumer of lecture events from ``lecture_exchange``.

    On construction it connects to RabbitMQ on localhost, declares the
    fanout exchange, and binds a broker-named exclusive queue to it.
    Each received event is printed, "processed" for ``processing_delay``
    seconds, and then acknowledged.
    """

    # Seconds of simulated work per message. Kept as an attribute (instead
    # of a literal inside ``callback``) so a subclass or instance can tune it.
    processing_delay = 3

    def __init__(self):
        """Open the connection/channel and bind a private queue to the exchange."""
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange='lecture_exchange', exchange_type='fanout')

        # An empty queue name lets the broker pick one; the generated name
        # is read back from the declare result.
        result = self.channel.queue_declare(queue='', exclusive=True)
        self.queue_name = result.method.queue
        print('slow + ' + self.queue_name)

        self.channel.queue_bind(exchange='lecture_exchange', queue=self.queue_name)

    def callback(self, ch, method, properties, body):
        """Handle one delivery: decode, simulate slow processing, acknowledge.

        Args:
            ch: Channel the message arrived on; used to ack/reject it.
            method: Delivery metadata; ``method.delivery_tag`` identifies the message.
            properties: Message properties (unused).
            body: Raw message payload, expected to be a JSON document.
        """
        try:
            lecture_event = json.loads(body)
        except ValueError as exc:
            # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
            # Drop the bad message instead of letting it crash the consume loop.
            print(f" [!] Consumer_slow discarded malformed lecture event: {exc}")
            ch.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
            return

        print(f" [x] Consumer_slow received lecture event: {lecture_event}")
        time.sleep(self.processing_delay)
        print(f" [x] Consumer_slow processed lecture event: {lecture_event}")

        # consume_lectures() subscribes with manual acknowledgements, so the
        # message must be acked explicitly once processing has finished.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def consume_lectures(self):
        """Subscribe ``callback`` to this consumer's queue and start consuming."""
        self.channel.basic_consume(queue=self.queue_name, on_message_callback=self.callback)
        print(' [*] Consumer_slow waiting for lecture events. To exit press CTRL+C')
        self.channel.start_consuming()
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: run the slow consumer until interrupted.
    lecture_consumer1 = LectureConsumer1()
    try:
        # Consume on the thread that created the connection: pika's
        # BlockingConnection must not be shared across threads, and a
        # background consumer thread would keep the process alive after Ctrl+C.
        lecture_consumer1.consume_lectures()
    except KeyboardInterrupt:
        # Cancel the consumer cleanly before tearing the connection down.
        lecture_consumer1.channel.stop_consuming()
    finally:
        # Close on every exit path, not only on Ctrl+C; skip if the
        # connection has already been closed (e.g. dropped by the broker).
        if lecture_consumer1.connection.is_open:
            lecture_consumer1.connection.close()
|