Compare commits

...

5 Commits

Author SHA1 Message Date
6967e8f240 Merge pull request 'kurushina_ksenia_lab_6' (#334) from kurushina_ksenia_lab_6 into kurushina_ksenia_lab_5
Reviewed-on: #334
2024-12-15 14:53:52 +04:00
Kseniy
e83e9844f7 kurushina_ksenia_lab_6 2024-12-09 20:01:43 +04:00
Kseniy
effeb9b8cd kurushina_ksenia_lab_5 2024-12-09 19:55:48 +04:00
Kseniy
19819d2a15 kurushina_ksenia_lab_4 2024-12-09 19:45:51 +04:00
Kseniy
b99aa9c5ac kurushina_ksenia_lab_4 2024-12-09 19:44:24 +04:00
25 changed files with 448 additions and 0 deletions

View File

@ -0,0 +1,23 @@
import pika
import time
def callback(ch, method, properties, body):
print(f'Consumer 1 получил сообщение: {body.decode()}')
time.sleep(2)
print('Consumer 1 закончил обработку')
def consume_events_1():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='consumer1_queue')
channel.queue_bind(exchange='beauty_salon_events', queue='consumer1_queue')
channel.basic_consume(queue='consumer1_queue', on_message_callback=callback, auto_ack=True)
print('Consumer 1 начал ожидать сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_1()

View File

@ -0,0 +1,25 @@
import pika
def callback(ch, method, properties, body):
print(f'Consumer 2 получил сообщение: {body.decode()}')
print('Consumer 2 закончил обработку')
def consume_events_2():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='consumer2_queue')
channel.queue_bind(exchange='beauty_salon_events', queue='consumer2_queue')
channel.basic_consume(queue='consumer2_queue', on_message_callback=callback, auto_ack=True)
print('Consumer 2 начал ожидать сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_2()

View File

@ -0,0 +1,19 @@
### Лабораторная работа №4 - Работа с брокером сообщений
#### Задание
1. Установить брокер сообщений RabbitMQ.
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
3. Продемонстрировать работу брокера сообщений.
#### Описание работы программы:
- **Класс Publisher** успешно осуществляет отправку сообщений своим клиентам.
- **Класс Consumer1** принимает и обрабатывает сообщения с задержкой в 3 секунды, что можно заметить на видео.
- **Класс Consumer2** мгновенно принимает и обрабатывает сообщения.
## Видео
https://cloud.mail.ru/public/Kopp/GG62rnCQj

Binary file not shown.

After

Width:  |  Height:  |  Size: 51 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 49 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 33 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

View File

@ -0,0 +1,25 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@ -0,0 +1,11 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

View File

@ -0,0 +1,19 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,23 @@
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

View File

@ -0,0 +1,22 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

View File

@ -0,0 +1,13 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,27 @@
import pika
import time
def publish_events():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='beauty_salon_events', exchange_type='fanout')
events = [
"5",
"4",
"3",
"2",
"1"
]
while True:
event = events[int(time.time()) % len(events)]
channel.basic_publish(exchange='beauty_salon_events', routing_key='', body=event)
print(f'Отправлено: {event}')
time.sleep(1)
if __name__ == "__main__":
publish_events()

View File

@ -0,0 +1,64 @@
# Лабораторная работа: Реализация умножения матриц
## Краткое описание
**Цель работы** разработать и сравнить последовательный и
параллельный алгоритмы умножения матриц, оценив их
производительность на матрицах больших размеров.
### Основные задачи:
1. Реализовать алгоритм последовательного умножения матриц.
2. Создать параллельный алгоритм с настраиваемым числом потоков.
3. Провести бенчмарки обоих алгоритмов на матрицах размером 100x100, 300x300 и 500x500.
4. Проанализировать результаты и определить влияние размера матрицы и количества потоков на производительность.
## Теоретическая часть
Умножение матриц является ключевой операцией во многих областях, включая машинное обучение,
обработку изображений и моделирование физических процессов. Сложность умножения двух матриц размером
`N x N` составляет O(N³), что делает задачу вычислительно затратной. Для ускорения вычислений используется параллелизм,
позволяющий распределить работу между несколькими потоками.
## Описание реализации
1. **Последовательный алгоритм** реализован в файле `sequential.py`. Каждый элемент результирующей матрицы вычисляется
2. как сумма произведений соответствующих элементов
3. строки первой матрицы и столбца второй.
2. **Параллельный алгоритм** описан в модуле `parallel.py`.
3. Для выполнения вычислений используется многопоточность: каждый поток обрабатывает отдельный блок
4. строк результирующей матрицы. Пользователь может задать число потоков для регулирования нагрузки и эффективности работы.
## Результаты экспериментов
Тесты проводились на матрицах следующих размеров: 100x100, 300x300 и 500x500.
Для параллельного алгоритма изменялось
число потоков, чтобы оценить их влияние на скорость вычислений.
## Анализ результатов
1. **Эффективность параллелизма**: Параллельный алгоритм показал прирост производительности
для больших матриц. При размере 500x500 с 4 потоками наблюдалось значительное ускорение.
2. **Число потоков**: Увеличение потоков улучшает производительность только до определённого момента.
Для маленьких матриц (например, 100x100) дополнительная параллелизация может быть неэффективной.
3. **Ограничения параллелизма**: Накладные расходы на управление потоками
и их синхронизацию уменьшают преимущества многопоточности при малых объёмах данных.
4. **Рекомендации**:
Параллельные алгоритмы наиболее эффективны при работе с большими матрицами.
Настройка числа потоков должна учитывать ресурсы системы и размер задачи.
## Заключение
Данная работа продемонстрировала эффективность параллельного умножения матриц на больших данных.
Оптимизация параметров параллельного алгоритма позволяет
значительно сократить время выполнения задач, связанных с вычислительной обработкой матриц.
## Ссылка на видео
https://cloud.mail.ru/public/tKoW/ZLDNyHam2

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

View File

@ -0,0 +1,29 @@
import time
import random
from DAS_2024_1.kurushina_ksenia_lab_5.parallel import matrix_multiply_parallel
from DAS_2024_1.kurushina_ksenia_lab_5.sequential import matrix_multiply_sequential
def generate_matrix(size):
return [[random.randint(0, 10) for _ in range(size)] for _ in range(size)]
def benchmark(matrix_size, num_threads):
A = generate_matrix(matrix_size)
B = generate_matrix(matrix_size)
start = time.time()
matrix_multiply_sequential(A, B)
sequential_time = time.time() - start
start = time.time()
matrix_multiply_parallel(A, B, num_threads)
parallel_time = time.time() - start
print(f"Размер матрицы: {matrix_size}x{matrix_size}")
print(f"Последовательное время: {sequential_time:.5f} сек")
print(f"Параллельное время ({num_threads} потоков): {parallel_time:.5f} сек")
if __name__ == "__main__":
for size in [100, 300, 500]:
benchmark(size, num_threads=4)

View File

@ -0,0 +1,21 @@
from concurrent.futures import ThreadPoolExecutor
def matrix_multiply_parallel(A, B, num_threads=1):
n = len(A)
result = [[0] * n for _ in range(n)]
def worker(start, end):
for i in range(start, end):
for j in range(n):
result[i][j] = sum(A[i][k] * B[k][j] for k in range(n))
chunk_size = n // num_threads
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [
executor.submit(worker, i * chunk_size, (i + 1) * chunk_size)
for i in range(num_threads)
]
for future in futures:
future.result()
return result

View File

@ -0,0 +1,9 @@
def matrix_multiply_sequential(A, B):
n = len(A)
result = [[0] * n for _ in range(n)]
for i in range(n):
for j in range(n):
result[i][j] = sum(A[i][k] * B[k][j] for k in range(n))
return result

View File

@ -0,0 +1,53 @@
# Лабораторная работа №6: Определение детерминанта матрицы с использованием параллельных вычислений
## **Задание**
Необходимо разработать два алгоритма для вычисления детерминанта квадратной матрицы:
1. **Обычный алгоритм** — выполняется последовательно.
2. **Параллельный алгоритм**с возможностью ручного задания количества потоков. Каждый поток отвечает за вычисление определённой группы множителей.
---
## **Описание работы программы**
Программа предназначена для вычисления детерминанта квадратной матрицы двумя способами:
- **Обычным (последовательным)** методом.
- **Параллельным**, который ускоряет выполнение за счёт многопоточности.
### **Обычный алгоритм**
1. **`minor(matrix, row, col)`**
- Вспомогательная функция для формирования минора матрицы. Удаляет указанную строку и столбец, подготавливая данные для рекурсивного вычисления.
2. **`determinant(matrix)`**
- Основная функция для вычисления детерминанта. Использует метод разложения Лапласа.
- Для матриц 2x2 результат вычисляется напрямую.
- Для матриц большего размера рекурсивно вызывает себя для вычисления детерминантов подматриц.
### **Параллельный алгоритм**
1. **`parallel_determinant(matrix, num_threads=4)`**
- Основная функция, распределяющая вычисления детерминанта между потоками.
- Количество потоков задаётся вручную.
2. **`worker(start_row, end_row)`**
- Вспомогательная функция, используемая потоками. Выполняет вычисления на заданном диапазоне строк.
- Результаты отдельных потоков объединяются для получения итогового детерминанта.
---
## **Особенности реализации**
- Вычисления для небольших матриц выполняются быстрее обычным алгоритмом.
- Параллельный подход показывает значительное ускорение при обработке больших матриц (при оптимальной настройке количества потоков).
---
## **Результаты работы**
Для каждой матрицы программа выводит:
- Значение детерминанта, рассчитанное обоими алгоритмами.
- Время выполнения для каждого из методов.
Результаты тестирования представлены в виде графиков и таблиц, сохранённых в PNG-файлах проекта.
---
## **Видео**
https://cloud.mail.ru/public/L7Wf/o3nkwpAGx

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

View File

@ -0,0 +1,65 @@
import threading
#fix
import time
import random
import numpy as np
from concurrent.futures import ThreadPoolExecutor
def gaussian_determinant(matrix):
n = len(matrix)
mat = [row[:] for row in matrix]
for i in range(n):
max_row = max(range(i, n), key=lambda r: abs(mat[r][i]))
mat[i], mat[max_row] = mat[max_row], mat[i]
if mat[i][i] == 0:
return 0
for j in range(i + 1, n):
factor = mat[j][i] / mat[i][i]
for k in range(i, n):
mat[j][k] -= mat[i][k] * factor
det = 1
for i in range(n):
det *= mat[i][i]
return det
def parallel_determinant(matrix, num_threads=4):
n = len(matrix)
result = []
def worker(start_row, end_row):
partial_det = 1
for i in range(start_row, end_row):
partial_det *= matrix[i][i]
result.append(partial_det)
with ThreadPoolExecutor(max_workers=num_threads) as executor:
rows_per_thread = n // num_threads
futures = [executor.submit(worker, i * rows_per_thread, (i + 1) * rows_per_thread) for i in range(num_threads)]
for future in futures:
future.result()
return sum(result)
def generate_matrix(size):
return [[random.randint(1, 10) for _ in range(size)] for _ in range(size)]
matrix_sizes = [100, 300, 500]
num_threads = 4
for size in matrix_sizes:
print(f"\nБенчмарки для матрицы {size}x{size}:")
matrix = generate_matrix(size)
start = time.time()
det_seq = gaussian_determinant(matrix)
end = time.time()
print(f"Детерминант (последовательно, метод Гаусса): {det_seq}, время: {end - start:.5f} сек")
start = time.time()
det_par = parallel_determinant(matrix, num_threads=num_threads)
end = time.time()
print(f"Детерминант (параллельно): {det_par}, время: {end - start:.5f} сек")