Compare commits

...

6 Commits

Author SHA1 Message Date
Yana
63205ba88c nikolaeva_yana_lab_8 2024-12-10 21:18:32 +04:00
Yana
6e3d42e3a0 nikolaeva_yana_lab_7 2024-12-10 21:15:34 +04:00
Yana
29e7ed4a36 nikolaeva_yana_lab_6 2024-12-10 21:10:39 +04:00
Yana
82a04cb0a2 nikolaeva_yana_lab_5 2024-12-10 20:58:09 +04:00
Yana
dce61cf5f9 nikolaeva_yana_lab_4 2024-12-10 20:40:33 +04:00
Yana
8bd30fa82c nikolaeva_yana_lab_4 2024-12-10 20:39:36 +04:00
28 changed files with 580 additions and 0 deletions

View File

@ -0,0 +1,8 @@
FROM python:3.9-slim
RUN pip install pika
WORKDIR /app
COPY . /app
CMD ["python", "publisher.py"]

View File

@ -0,0 +1,21 @@
### Лабораторная работа №4 - Работа с брокером сообщений
#### Задание
1. Установить брокер сообщений RabbitMQ.
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
3. Продемонстрировать работу брокера сообщений.
#### Описание работы программы:
- **Класс Publisher** успешно осуществляет отправку сообщений своим клиентам.
- **Класс Consumer1** принимает и обрабатывает сообщения с задержкой в 3 секунды, что можно заметить на видео.
- **Класс Consumer2** мгновенно принимает и обрабатывает сообщения.
### Скрины во вложениях
## Видео
https://cloud.mail.ru/public/kzJ8/hUmC6959a

View File

@ -0,0 +1,20 @@
import pika
import time
def callback(ch, method, properties, body):
print(f" [Consumer 1] {body.decode('utf-8')}")
time.sleep(3)
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
queue_name = "lunch_queue_slow"
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='lunch_logs', queue=queue_name)
print(' [*] Consumer 1 waiting for logs. To exit press CTRL+C')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
channel.start_consuming()

View File

@ -0,0 +1,19 @@
import pika
def callback(ch, method, properties, body):
print(f" [Consumer 2] {body.decode('utf-8')}")
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
queue_name = "lunch_queue_fast"
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='lunch_logs', queue=queue_name)
print(' [*] Consumer 2 waiting for logs. To exit press CTRL+C')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
channel.start_consuming()

View File

@ -0,0 +1,50 @@
version: '3'
services:
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq_TEST
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
healthcheck:
test: ["CMD", "rabbitmqctl", "status"]
interval: 10s
timeout: 5s
retries: 5
publisher:
build:
context: .
container_name: publisher
environment:
- PYTHONUNBUFFERED=1
command: python publisher.py
depends_on:
rabbitmq:
condition: service_healthy
consumer_1:
build:
context: .
container_name: consumer_1
environment:
- PYTHONUNBUFFERED=1
command: python consumer_1.py
depends_on:
rabbitmq:
condition: service_healthy
consumer_2:
build:
context: .
container_name: consumer_2
environment:
- PYTHONUNBUFFERED=1
command: python consumer_2.py
depends_on:
rabbitmq:
condition: service_healthy

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 41 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

View File

@ -0,0 +1,20 @@
import pika
import time
import random
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
events = [
"Новый заказ на завтрак",
"Новый заказ на обед",
"Новый заказ на ужин",
"Пользователь запросил меню"
]
while True:
message = random.choice(events)
channel.basic_publish(exchange='lunch_logs', routing_key='', body=message)
print(f" [x] Sent {message}")
time.sleep(1)

Binary file not shown.

After

Width:  |  Height:  |  Size: 16 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

View File

@ -0,0 +1,25 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@ -0,0 +1,11 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 20 KiB

View File

@ -0,0 +1,19 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,22 @@
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

View File

@ -0,0 +1,13 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB

View File

@ -0,0 +1,22 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

View File

@ -0,0 +1,62 @@
# Лабораторная работа: Умножение матриц
## Описание
**Цель работы** реализовать последовательный и параллельный алгоритмы умножения матриц, а также сравнить их производительность на больших квадратных матрицах.
### Задачи:
1. Реализовать последовательный алгоритм умножения матриц.
2. Реализовать параллельный алгоритм, позволяющий задавать количество потоков вручную.
3. Провести тесты на матрицах размером 100x100, 300x300 и 500x500.
4. Сделать выводы о влиянии размеров матриц и количества потоков на производительность алгоритмов.
## Теоретическое обоснование
Умножение матриц — вычислительно сложная операция с асимптотической сложностью O(N³) для матриц размером N×N.
Для ускорения вычислений используется параллелизация, где каждая часть вычислений выполняется в отдельном потоке.
Однако эффективность параллельного подхода зависит от размеров задачи и числа потоков.
## Реализация
1. **Последовательный алгоритм**:
- Выполняет вычисления поэлементно для каждой строки первой матрицы и каждого столбца второй.
- Этот алгоритм не использует дополнительные ресурсы, кроме одного потока, и подходит для небольших задач.
2. **Параллельный алгоритм**:
- Делит строки первой матрицы на группы, каждая из которых обрабатывается в отдельном потоке.
- Реализован с использованием модуля `multiprocessing` для управления потоками.
- Число потоков задается вручную для возможности анализа производительности.
## Результаты тестирования
### Условия тестирования
- Размеры матриц: 100x100, 300x300, 500x500.
- Количество потоков: 1 (последовательное выполнение), 2, 4.
- Диапазон значений элементов матриц: от 0 до 200.
## Выводы
1. **Последовательный алгоритм**:
- Подходит для матриц небольшого размера (100x100), где накладные расходы на параллелизацию превышают выигрыши от многопоточности.
2. **Параллельный алгоритм**:
- Значительно ускоряет умножение матриц с увеличением их размера.
- Для матриц 500x500 ускорение в 22.5 раза при переходе от 1 потока к 4 потокам.
3. **Влияние числа потоков**:
- Оптимальное число потоков зависит от размера задачи и доступных ресурсов.
- Слишком большое количество потоков может привести к росту накладных расходов.
4. **Закономерности**:
- Накладные расходы на управление потоками минимальны для больших задач.
- Параллельные алгоритмы демонстрируют преимущество на задачах с высокой вычислительной сложностью.
## Заключение
Лабораторная работа подтвердила, что параллельные алгоритмы значительно эффективнее на больших данных.
Однако для небольших задач последовательный алгоритм остается предпочтительным из-за отсутствия накладных расходов.
В реальных приложениях важно учитывать баланс между размером задачи и доступными вычислительными ресурсами.
## Видео
https://cloud.mail.ru/public/fykM/jy3KEZBZM

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

View File

@ -0,0 +1,66 @@
import numpy as np
import time
from multiprocessing import Pool, cpu_count
def generate_matrix(size, value_range=(0, 200)):
return np.random.randint(value_range[0], value_range[1], (size, size))
def multiply_matrices_sequential(matrix_a, matrix_b):
size = len(matrix_a)
result = np.zeros((size, size), dtype=int)
for i in range(size):
for j in range(size):
for k in range(size):
result[i][j] += matrix_a[i][k] * matrix_b[k][j]
return result
def worker_multiply(args):
i_range, matrix_a, matrix_b, size = args
result_part = np.zeros((len(i_range), size), dtype=int)
for idx, i in enumerate(i_range):
for j in range(size):
for k in range(size):
result_part[idx][j] += matrix_a[i][k] * matrix_b[k][j]
return result_part
def multiply_matrices_parallel(matrix_a, matrix_b, num_threads):
size = len(matrix_a)
step = size // num_threads
ranges = [range(i, min(i + step, size)) for i in range(0, size, step)]
with Pool(processes=num_threads) as pool:
results = pool.map(worker_multiply, [(i_range, matrix_a, matrix_b, size) for i_range in ranges])
return np.vstack(results)
def benchmark(matrix_a, matrix_b, num_threads):
print(f"\nМатрицы размера {len(matrix_a)}x{len(matrix_a)}:")
start = time.time()
result_seq = multiply_matrices_sequential(matrix_a, matrix_b)
sequential_time = time.time() - start
print(f"Последовательное умножение заняло: {sequential_time:.4f} секунд")
start = time.time()
result_par = multiply_matrices_parallel(matrix_a, matrix_b, num_threads)
parallel_time = time.time() - start
print(f"Параллельное умножение ({num_threads} потоков) заняло: {parallel_time:.4f} секунд")
assert np.array_equal(result_seq, result_par), "Ошибка: результаты не совпадают!"
print("Результаты совпадают.")
return sequential_time, parallel_time
if __name__ == "__main__":
sizes = [100, 300, 500]
num_threads = min(cpu_count(), 4)
for size in sizes:
matrix_a = generate_matrix(size)
matrix_b = generate_matrix(size)
benchmark(matrix_a, matrix_b, num_threads)

View File

@ -0,0 +1,41 @@
## Лабораторная работа 6. Определение детерминанта матрицы с помощью параллельных вычислений
### Задание
Требуется сделать два алгоритма: обычный и параллельный.
В параллельном алгоритме предусмотреть ручное задание количества потоков,
каждый из которых будет выполнять нахождение отдельной группы множителей.
## Описание работы программы
Программа реализует вычисление детерминанта квадратной матрицы с использованием двух алгоритмов: обычного и параллельного.
1. Обычный алгоритм
Функция minor(matrix, row, col) - Эта функция используется для удаления строки и столбца из матрицы, чтобы получить минор, который затем будет использован для вычисления детерминанта с помощью рекурсии.
Функция determinant(matrix) - Эта функция вычисляет детерминант матрицы с использованием метода Лапласа. Если матрица — это 2x2, то вычисление происходит напрямую. Для больших матриц она рекурсивно вызывает себя для подматриц.
2. Параллельный алгоритм
parallel_determinant(matrix, num_threads=4) — Это функция, которая распределяет вычисления по нескольким потокам для ускорения работы.
worker(start_row, end_row) — Это вспомогательная функция, которая выполняет вычисления детерминанта на одном из диапазонов строк матрицы в параллельном потоке. Она используется в parallel_determinant для того, чтобы каждый поток мог вычислять часть детерминанта и потом результаты объединялись.
## Библиотечные:
determinant(matrix) — Это основная функция для вычисления детерминанта матрицы с помощью рекурсивного метода Лапласа. Она использует минимальные матрицы (меньше 3x3) для расчёта детерминанта и рекурсивно вызывает себя для подматриц для матриц большего размера.
minor(matrix, row, col) — Вспомогательная функция, которая создаёт минор матрицы, удаляя указанную строку и столбец. Минор необходим для вычисления детерминанта по разложению Лапласа.
Для каждого размера матрицы программа выводит время выполнения обычного и параллельного алгоритмов, а также соответствующие значения детерминантов.
## Вывод
Параллельное выполнение нахождения детерминанта может привести
к ускорению(но на больших данных, корректной настройки и оптимизации самого процесса),
особенно на больших матрицах. Однако, для некоторых матриц, результаты детерминантов могут отличаться между обычным и параллельным выполнением.
## ВК
https://cloud.mail.ru/public/JXHJ/NgxuUXWQ1

View File

@ -0,0 +1,65 @@
import threading
#fix
import time
import random
import numpy as np
from concurrent.futures import ThreadPoolExecutor
def gaussian_determinant(matrix):
n = len(matrix)
mat = [row[:] for row in matrix]
for i in range(n):
max_row = max(range(i, n), key=lambda r: abs(mat[r][i]))
mat[i], mat[max_row] = mat[max_row], mat[i]
if mat[i][i] == 0:
return 0
for j in range(i + 1, n):
factor = mat[j][i] / mat[i][i]
for k in range(i, n):
mat[j][k] -= mat[i][k] * factor
det = 1
for i in range(n):
det *= mat[i][i]
return det
def parallel_determinant(matrix, num_threads=4):
n = len(matrix)
result = []
def worker(start_row, end_row):
partial_det = 1
for i in range(start_row, end_row):
partial_det *= matrix[i][i]
result.append(partial_det)
with ThreadPoolExecutor(max_workers=num_threads) as executor:
rows_per_thread = n // num_threads
futures = [executor.submit(worker, i * rows_per_thread, (i + 1) * rows_per_thread) for i in range(num_threads)]
for future in futures:
future.result()
return sum(result)
def generate_matrix(size):
return [[random.randint(1, 10) for _ in range(size)] for _ in range(size)]
matrix_sizes = [100, 300, 500]
num_threads = 4
for size in matrix_sizes:
print(f"\nБенчмарки для матрицы {size}x{size}:")
matrix = generate_matrix(size)
start = time.time()
det_seq = gaussian_determinant(matrix)
end = time.time()
print(f"Детерминант (последовательно, метод Гаусса): {det_seq}, время: {end - start:.5f} сек")
start = time.time()
det_par = parallel_determinant(matrix, num_threads=num_threads)
end = time.time()
print(f"Детерминант (параллельно): {det_par}, время: {end - start:.5f} сек")

View File

@ -0,0 +1,34 @@
## Эссе: Балансировка нагрузки в распределённых системах
### Алгоритмы и методы для балансировки нагрузки
Балансировка нагрузки — это процесс распределения входящего трафика или задач между несколькими серверами
для оптимизации использования ресурсов, повышения производительности и обеспечения отказоустойчивости.
Наиболее популярные алгоритмы включают:
1. **Round Robin** (Циклический алгоритм): запросы распределяются равномерно по кругу между серверами.
2. **Least Connections** (Минимум подключений): запросы направляются на сервер с наименьшим числом активных соединений.
3. **Weighted Round Robin** (Взвешенный цикл): учитывает производительность серверов, направляя больше трафика на более мощные машины.
4. **Hash-Based Methods**: трафик распределяется на основе хэша данных, например, IP-адреса клиента.
### Открытые технологии для балансировки нагрузки
Среди открытых решений популярны:
- **NGINX**: мощный реверс-прокси, поддерживающий балансировку на основе различных алгоритмов.
- **HAProxy**: высокопроизводительный прокси-сервер, предназначенный для распределения трафика и работы с высокими нагрузками.
- **Traefik**: современный инструмент для балансировки в микросервисах, интегрирующийся с оркестраторами, такими как Kubernetes.
### Балансировка нагрузки на базах данных
Для распределения нагрузки на базах данных часто применяются репликация и шардирование:
- **Репликация**: данные копируются между несколькими серверами. Запросы на чтение направляются на реплики, а записи — на основной сервер.
- **Шардирование**: данные делятся на части (шарды), каждая из которых обрабатывается отдельным сервером, что снижает нагрузку на каждый из них.
### Реверс-прокси как элемент балансировки нагрузки
Реверс-прокси играет ключевую роль в балансировке нагрузки, так как он находится между клиентами и серверами.
Он не только распределяет запросы, но и может кэшировать ответы, сжимать данные и обеспечивать безопасность.
NGINX и HAProxy являются классическими примерами реверс-прокси.

View File

@ -0,0 +1,62 @@
# Лабораторная работа №8: Устройство распределенных систем
## Что такое распределенная система?
Распределенная система — это комплекс взаимодействующих компонентов, которые расположены на разных узлах сети
и работают вместе для достижения общей цели.
Узлы могут быть как физически раздельными серверами, так и виртуальными средами, выполняющими определенные задачи.
### Зачем использовать "распределенный" стиль?
Сложные системы, такие как социальные сети, требуют высокой производительности,
масштабируемости и отказоустойчивости. Разделяя функционал на отдельные сервисы
(например, один отвечает за обработку сообщений, другой за рекомендации), мы можем:
1. Уменьшить нагрузку на отдельные компоненты.
2. Легче масштабировать узкие места.
3. Быстрее внедрять изменения, не затрагивая всю систему.
Кроме того, такая структура облегчает разработку,
так как каждая команда может сосредоточиться на своем микросервисе.
### Для чего нужны системы оркестрации?
Системы оркестрации, такие как Kubernetes, управляют распределенными приложениями, упрощая развертывание, масштабирование и мониторинг. Они:
- Автоматизируют процессы, такие как распределение ресурсов и перезапуск упавших узлов.
- Обеспечивают гибкость управления сложными системами.
-
Однако, такие системы добавляют сложность на этапе настройки и требуют новых навыков от разработчиков.
### Роль очередей сообщений
Очереди сообщений (например, RabbitMQ, Kafka) используются для асинхронной передачи данных между сервисами.
Сообщения могут быть запросами, данными для обработки или событиями. Это позволяет:
- Разгрузить сервисы, обеспечив буферизацию данных.
- Повысить отказоустойчивость, так как потерянные сообщения можно повторно отправить.
### Преимущества и недостатки
**Преимущества:**
- Масштабируемость: можно добавлять новые сервисы без значительных изменений.
- Отказоустойчивость: сбой одного узла не приводит к полной остановке системы.
- Гибкость: легче экспериментировать с новыми технологиями в отдельных сервисах.
**Недостатки:**
- Сложность разработки и поддержки: требуется настройка взаимодействия между сервисами.
- Задержки: из-за сетевых вызовов возрастает время ответа.
### Параллельные вычисления в распределенных системах
Параллельные вычисления позволяют обрабатывать задачи быстрее, разделяя их между несколькими узлами.
Это полезно в задачах с большими объемами данных (анализ логов, построение рекомендаций).
Однако, если задача не требует значительных вычислительных ресурсов (например, обработка простых запросов), их внедрение может быть избыточным.
Примеры:
- Нужны: распределенные базы данных, машинное обучение.
- Не нужны: системы авторизации, где важнее скорость отклика, чем объем данных.