Compare commits

..

2 Commits

143 changed files with 271 additions and 5606 deletions

16
.gitignore vendored Normal file
View File

@ -0,0 +1,16 @@
################################################################################
# Данный GITIGNORE-файл был автоматически создан Microsoft(R) Visual Studio.
################################################################################
/.vs
/aleikin_artem_lab_3
/aleikin_artem_lab_4
/aleikin_artem_lab_5/MultiplyMatrix
/aleikin_artem_lab_6/DerminantMatrix/.vs/DerminantMatrix
/aleikin_artem_lab_6/DerminantMatrix/bin/Debug/net8.0
/aleikin_artem_lab_6/DerminantMatrix/obj
/dozorova_alena_lab_2
/dozorova_alena_lab_3
/dozorova_alena_lab_4
/dozorova_alena_lab_5/ConsoleApp1/obj
/dozorova_alena_lab_6/ConsoleApp1/obj

View File

@ -1,9 +1,9 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.9.34728.123
VisualStudioVersion = 17.11.35312.102
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "CalcMatrixDeterminant", "CalcMatrixDeterminant\CalcMatrixDeterminant.csproj", "{5F8B8E19-17FF-4A0A-AE65-FE857168ED9D}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "DerminantMatrix", "DerminantMatrix.csproj", "{40E2A332-5A1C-46A7-AFA9-833E4922AD08}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -11,15 +11,15 @@ Global
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{5F8B8E19-17FF-4A0A-AE65-FE857168ED9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5F8B8E19-17FF-4A0A-AE65-FE857168ED9D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5F8B8E19-17FF-4A0A-AE65-FE857168ED9D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5F8B8E19-17FF-4A0A-AE65-FE857168ED9D}.Release|Any CPU.Build.0 = Release|Any CPU
{40E2A332-5A1C-46A7-AFA9-833E4922AD08}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{40E2A332-5A1C-46A7-AFA9-833E4922AD08}.Debug|Any CPU.Build.0 = Debug|Any CPU
{40E2A332-5A1C-46A7-AFA9-833E4922AD08}.Release|Any CPU.ActiveCfg = Release|Any CPU
{40E2A332-5A1C-46A7-AFA9-833E4922AD08}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {3540E666-E1FE-42C7-B319-5653E19EDF25}
SolutionGuid = {DF40653A-B248-4AC9-8203-1F8E8EA9423F}
EndGlobalSection
EndGlobal

View File

@ -0,0 +1,118 @@
using System;
using System.Diagnostics;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
int[] sizes = { 10, 11, 12 };
int maxThreads = 20;
foreach (var size in sizes)
{
Console.WriteLine($"\nРазмер матрицы: {size}x{size}");
// Генерация матрицы
var matrix = GenerateMatrix(size);
// Последовательное нахождение детерминанта
var stopwatch = Stopwatch.StartNew();
var detSequential = DeterminantSequential(matrix);
stopwatch.Stop();
long timeSequential = stopwatch.ElapsedMilliseconds;
Console.WriteLine($"Последовательное вычисление: {timeSequential} мс, детерминант = {detSequential}");
// Параллельное нахождение детерминанта
Console.WriteLine("Параллельное вычисление (время выполнения для каждого количества потоков):");
for (int threads = 1; threads <= maxThreads; threads++)
{
stopwatch.Restart();
var detParallel = DeterminantParallel(matrix, threads);
stopwatch.Stop();
long timeParallel = stopwatch.ElapsedMilliseconds;
Console.WriteLine($"Потоков: {threads} — {timeParallel} мс");
}
}
}
// Генерация квадратной матрицы размером size x size
static double[,] GenerateMatrix(int size)
{
var random = new Random();
var matrix = new double[size, size];
for (int i = 0; i < size; i++)
for (int j = 0; j < size; j++)
matrix[i, j] = random.Next(-10, 10);
return matrix;
}
// Последовательное вычисление детерминанта
static double DeterminantSequential(double[,] matrix)
{
int size = matrix.GetLength(0);
if (size == 1)
return matrix[0, 0];
if (size == 2)
return matrix[0, 0] * matrix[1, 1] - matrix[0, 1] * matrix[1, 0];
double determinant = 0;
for (int col = 0; col < size; col++)
{
determinant += Math.Pow(-1, col) * matrix[0, col] * DeterminantSequential(Minor(matrix, 0, col));
}
return determinant;
}
// Параллельное вычисление детерминанта
static double DeterminantParallel(double[,] matrix, int threadCount)
{
int size = matrix.GetLength(0);
if (size == 1)
return matrix[0, 0];
if (size == 2)
return matrix[0, 0] * matrix[1, 1] - matrix[0, 1] * matrix[1, 0];
double determinant = 0;
object lockObject = new object();
Parallel.For(0, size, new ParallelOptions { MaxDegreeOfParallelism = threadCount }, col =>
{
double minorDeterminant = DeterminantSequential(Minor(matrix, 0, col));
double term = Math.Pow(-1, col) * matrix[0, col] * minorDeterminant;
lock (lockObject)
{
determinant += term;
}
});
return determinant;
}
// Создание минора для матрицы
static double[,] Minor(double[,] matrix, int row, int col)
{
int size = matrix.GetLength(0);
var minor = new double[size - 1, size - 1];
for (int i = 0, minorRow = 0; i < size; i++)
{
if (i == row) continue;
for (int j = 0, minorCol = 0; j < size; j++)
{
if (j == col) continue;
minor[minorRow, minorCol] = matrix[i, j];
minorCol++;
}
minorRow++;
}
return minor;
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 104 KiB

View File

@ -0,0 +1,95 @@
# Лабораторная работа 6 - Параллельный поиск значения детерминанта матрицы
## ПИбд-42 || Алейкин Артем
### Описание
В данной лабораторной работе мы занимались написанием многопоточного вычислителя детерминанта больших матриц.
### Объяснения
Последовательный алгоритм:
Нахождение детерминанта реализовано рекурсивно через формулу разложения по строке матрицы.
Метод DeterminantSequential вычисляет детерминант через разложение по первой строке и вызов минора.
```
static double DeterminantSequential(double[,] matrix)
{
int size = matrix.GetLength(0);
if (size == 1)
return matrix[0, 0];
if (size == 2)
return matrix[0, 0] * matrix[1, 1] - matrix[0, 1] * matrix[1, 0];
double determinant = 0;
for (int col = 0; col < size; col++)
{
determinant += Math.Pow(-1, col) * matrix[0, col] * DeterminantSequential(Minor(matrix, 0, col));
}
return determinant;
}
```
Параллельный алгоритм:
Каждую итерацию разложения по строке матрицы можно выполнять в отдельном потоке.
DeterminantParallel использует Parallel.For для запуска потоков.
```
static double DeterminantParallel(double[,] matrix, int threadCount)
{
int size = matrix.GetLength(0);
if (size == 1)
return matrix[0, 0];
if (size == 2)
return matrix[0, 0] * matrix[1, 1] - matrix[0, 1] * matrix[1, 0];
double determinant = 0;
object lockObject = new object();
Parallel.For(0, size, new ParallelOptions { MaxDegreeOfParallelism = threadCount }, col =>
{
double minorDeterminant = DeterminantSequential(Minor(matrix, 0, col));
double term = Math.Pow(-1, col) * matrix[0, col] * minorDeterminant;
lock (lockObject)
{
determinant += term;
}
});
return determinant;
}
```
Миноры:
Метод Minor создает подматрицу, исключая заданные строку и столбец.
```
static double[,] Minor(double[,] matrix, int row, int col)
{
int size = matrix.GetLength(0);
var minor = new double[size - 1, size - 1];
for (int i = 0, minorRow = 0; i < size; i++)
{
if (i == row) continue;
for (int j = 0, minorCol = 0; j < size; j++)
{
if (j == col) continue;
minor[minorRow, minorCol] = matrix[i, j];
minorCol++;
}
minorRow++;
}
return minor;
}
```
В результате мы получаем следующие значения:
![Вычисление детерминанта матриц](./Images/Отчет1.png)
Результаты аналогичны с предыдущей лабораторной работой, многопоточный подход позволяет кратно выигрывать время, но и ресурсы на организацию работы всех потоков тоже существенны.
Видео демонстрации работы: https://vk.com/video248424990_456239613?list=ln-tyTv9vKdAOzQyPm5Y3

View File

@ -1,6 +1,11 @@
# Распределенные вычисления и приложения Л3
# Распределенные вычисления и приложения Л2
## _Автор Базунов Андрей Игревич ПИбд-42_
Сервисы ( _порядок исполнения сервисов соблюден_ ):
- 1.FileCreator - (_Создание тестовых данных_)
- 2.FirstService - (_Выполнение 1.4 варианта задания_)
- 3.SecondService - (_Выполнение 2.2 варианта задания_)
В качестве основного языка был выбран GoLang. Для каждого сервиса был создан DOCKERFILE где были прописаны условия и действия для сборки каждого из модулей
# Docker
@ -22,4 +27,4 @@ docker-compose up -d --build
docker-compose down
```
[Демонстрация работы](https://vk.com/video/@viltskaa?z=video236673313_456239577%2Fpl_236673313_-2)
[Демонстрация работы](https://vk.com/video236673313_456239575)

Binary file not shown.

View File

@ -1,4 +0,0 @@
PORT=8080
TASK_APP_URL=http://task-app:8000
TIMEOUT=15
DATABASE=./database.db

View File

@ -1,14 +0,0 @@
FROM golang:1.23
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o /bin/PersonApp
EXPOSE 8080
CMD ["/bin/PersonApp"]

View File

@ -1,10 +0,0 @@
module PersonApp
go 1.23.1
require (
github.com/gorilla/mux v1.8.1
github.com/mattn/go-sqlite3 v1.14.24
)
require github.com/joho/godotenv v1.5.1 // indirect

View File

@ -1,6 +0,0 @@
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=

View File

@ -1,157 +0,0 @@
package handlers
import (
"PersonApp/httpClient"
"PersonApp/models"
"PersonApp/repository"
"encoding/json"
"fmt"
"github.com/gorilla/mux"
"net/http"
"strconv"
)
func InitRoutes(r *mux.Router, rep repository.PersonRepository, cln httpClient.Client) {
r.HandleFunc("/", GetPersons(rep, cln)).Methods("GET")
r.HandleFunc("/{id:[0-9]+}", GetPersonById(rep, cln)).Methods("GET")
r.HandleFunc("/", CreatePerson(rep)).Methods("POST")
r.HandleFunc("/{id:[0-9]+}", UpdatePerson(rep)).Methods("PUT")
r.HandleFunc("/{id:[0-9]+}", DeletePerson(rep)).Methods("DELETE")
}
func GetPersons(rep repository.PersonRepository, cln httpClient.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
fmt.Println("GET PERSONS")
persons, err := rep.GetAllPersons()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
for i := 0; i < len(persons); i++ {
tasks, _ := cln.GetPersonTasks(persons[i].Id)
persons[i].Tasks = tasks
}
err = json.NewEncoder(w).Encode(persons)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func GetPersonById(rep repository.PersonRepository, cln httpClient.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
person, err := rep.GetPersonById(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
tasks, err := cln.GetPersonTasks(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
} else {
person.Tasks = tasks
}
err = json.NewEncoder(w).Encode(person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func CreatePerson(rep repository.PersonRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var person *models.Person
err := json.NewDecoder(r.Body).Decode(&person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
person, err = rep.CreatePerson(*person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
err = json.NewEncoder(w).Encode(person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func UpdatePerson(rep repository.PersonRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var person *models.Person
err = json.NewDecoder(r.Body).Decode(&person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
person, err = rep.UpdatePerson(models.Person{
Id: id,
Name: person.Name,
Tasks: nil,
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusAccepted)
err = json.NewEncoder(w).Encode(person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
func DeletePerson(rep repository.PersonRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = rep.DeletePerson(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
}

View File

@ -1,72 +0,0 @@
package httpClient
import (
"PersonApp/models"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
type Client interface {
GetPersonTasks(id int) ([]models.Task, error)
TestConnection() (bool, error)
}
type client struct {
BaseUrl string
Timeout time.Duration
}
func (c *client) TestConnection() (bool, error) {
client := &http.Client{Timeout: c.Timeout}
url := fmt.Sprintf("%s/", c.BaseUrl)
resp, err := client.Get(url)
if err != nil {
return false, err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
return
}
}(resp.Body)
if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("bad status code: %d", resp.StatusCode)
}
return true, nil
}
func (c *client) GetPersonTasks(id int) ([]models.Task, error) {
client := &http.Client{Timeout: c.Timeout * time.Second}
url := fmt.Sprintf("%s/f/%d", c.BaseUrl, id)
resp, err := client.Get(url)
if err != nil {
return nil, err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
}
}(resp.Body)
body, _ := io.ReadAll(resp.Body)
var tasks []models.Task
if err := json.Unmarshal(body, &tasks); err != nil {
fmt.Printf("Unmarshal error: %s", err)
return []models.Task{}, err
}
return tasks, nil
}
func NewClient(baseUrl string, timeout time.Duration) Client {
return &client{BaseUrl: baseUrl, Timeout: timeout}
}

View File

@ -1,34 +0,0 @@
GET http://localhost/person-app/
Accept: application/json
###
GET http://localhost/person-app/1
Accept: application/json
###
POST http://localhost/person-app/
Accept: application/json
Content-Type: application/json
{
"name": "TEST3"
}
###
PUT http://localhost/person-app/3
Accept: application/json
Content-Type: application/json
{
"name": "TEST11"
}
###
DELETE http://localhost/person-app/3
Accept: application/json
###

View File

@ -1,47 +0,0 @@
package main
import (
"PersonApp/handlers"
"PersonApp/httpClient"
"PersonApp/repository"
"PersonApp/storage"
"github.com/gorilla/mux"
"github.com/joho/godotenv"
"net/http"
"os"
"strconv"
"time"
)
func main() {
err := godotenv.Load(".env")
if err != nil {
panic("Error loading .env file")
}
url := os.Getenv("TASK_APP_URL")
port := os.Getenv("PORT")
databasePath := os.Getenv("DATABASE")
timeout, err := strconv.Atoi(os.Getenv("TIMEOUT"))
if err != nil {
panic("Error converting timeout to int")
}
database, err := storage.Init(databasePath)
if err != nil {
panic(err)
}
cln := httpClient.NewClient(url, time.Duration(timeout))
rep := repository.NewPersonRepository(database)
router := mux.NewRouter()
handlers.InitRoutes(router, rep, cln)
err = http.ListenAndServe(":"+port, router)
if err != nil {
panic(err)
}
storage.Close(database)
}

View File

@ -1,24 +0,0 @@
package models
type Person struct {
Id int `json:"id"`
Name string `json:"name"`
Tasks []Task `json:"tasks"`
}
type PersonCreate struct {
Name string `json:"name"`
}
type Task struct {
Id int `json:"id"`
PersonId int `json:"person_id"`
Name string `json:"name"`
Date string `json:"date"`
}
type TaskCreate struct {
PersonId int `json:"person_id"`
Name string `json:"name"`
Date string `json:"date"`
}

View File

@ -1,99 +0,0 @@
package repository
import (
"PersonApp/models"
"database/sql"
)
type PersonRepository interface {
GetAllPersons() ([]models.Person, error)
GetPersonById(id int) (*models.Person, error)
CreatePerson(person models.Person) (*models.Person, error)
UpdatePerson(person models.Person) (*models.Person, error)
DeletePerson(id int) error
}
type personRepository struct {
DB *sql.DB
}
func NewPersonRepository(db *sql.DB) PersonRepository {
return &personRepository{DB: db}
}
func (pr *personRepository) GetAllPersons() ([]models.Person, error) {
rows, err := pr.DB.Query("select * from Persons")
if err != nil {
return nil, err
}
defer func(rows *sql.Rows) {
err := rows.Close()
if err != nil {
panic(err)
}
}(rows)
var persons []models.Person
for rows.Next() {
p := models.Person{}
err := rows.Scan(&p.Id, &p.Name)
if err != nil {
panic(err)
}
persons = append(persons, p)
}
return persons, err
}
func (pr *personRepository) GetPersonById(id int) (*models.Person, error) {
row := pr.DB.QueryRow("select * from Persons where id=?", id)
person := models.Person{}
err := row.Scan(&person.Id, &person.Name)
if err != nil {
return nil, err
}
return &person, err
}
func (pr *personRepository) CreatePerson(p models.Person) (*models.Person, error) {
res, err := pr.DB.Exec("INSERT INTO Persons (name) values (?)", p.Name)
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
return &p, err
}
func (pr *personRepository) UpdatePerson(p models.Person) (*models.Person, error) {
res, err := pr.DB.Exec("UPDATE Persons SET name = ? WHERE id = ?", p.Name, p.Id)
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
return &p, err
}
func (pr *personRepository) DeletePerson(id int) error {
_, err := pr.DB.Exec("DELETE FROM Persons WHERE id = ?", id)
if err != nil {
return err
}
return nil
}

View File

@ -1,36 +0,0 @@
package storage
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
)
func Init(databasePath string) (*sql.DB, error) {
db, err := sql.Open("sqlite3", databasePath)
if err != nil || db == nil {
return nil, err
}
if err := createTableIfNotExists(db); err != nil {
return nil, err
}
return db, nil
}
func Close(db *sql.DB) {
err := db.Close()
if err != nil {
return
}
}
func createTableIfNotExists(db *sql.DB) error {
if result, err := db.Exec(
"CREATE TABLE IF NOT EXISTS `Persons`(Id integer primary key autoincrement, Name text not null);",
); err != nil || result == nil {
return err
}
return nil
}

View File

@ -1,4 +0,0 @@
PORT=8000
PERSON_APP_URL=http://person-app:8080
TIMEOUT=15
DATABASE=./database.db

View File

@ -1,14 +0,0 @@
FROM golang:1.23
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o /bin/TaskApp
EXPOSE 8000
CMD ["/bin/TaskApp"]

View File

@ -1,10 +0,0 @@
module TaskApp
go 1.23.1
require (
github.com/gorilla/mux v1.8.1
github.com/mattn/go-sqlite3 v1.14.24
)
require github.com/joho/godotenv v1.5.1

View File

@ -1,6 +0,0 @@
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=

View File

@ -1,177 +0,0 @@
package handlers
import (
"TaskApp/httpClient"
"TaskApp/models"
"TaskApp/repository"
"encoding/json"
"fmt"
"github.com/gorilla/mux"
"net/http"
"strconv"
)
func InitRoutes(r *mux.Router, rep repository.TaskRepository, cln httpClient.Client) {
r.HandleFunc("/", GetTasks(rep)).Methods("GET")
r.HandleFunc("/{id:[0-9]+}", GetTaskById(rep)).Methods("GET")
r.HandleFunc("/", CreateTask(rep, cln)).Methods("POST")
r.HandleFunc("/{id:[0-9]+}", UpdateTask(rep)).Methods("PUT")
r.HandleFunc("/{id:[0-9]+}", DeleteTask(rep)).Methods("DELETE")
r.HandleFunc("/f/{id:[0-9]+}", GetPersonTasks(rep)).Methods("GET")
}
func GetTasks(rep repository.TaskRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
tasks, err := rep.GetAllTasks()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
err = json.NewEncoder(w).Encode(tasks)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}
}
func GetTaskById(rep repository.TaskRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
person, err := rep.GetTaskById(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = json.NewEncoder(w).Encode(person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func GetPersonTasks(rep repository.TaskRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
tasks, err := rep.GetUserTasks(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = json.NewEncoder(w).Encode(tasks)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func CreateTask(rep repository.TaskRepository, cln httpClient.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var task *models.TaskCreate
err := json.NewDecoder(r.Body).Decode(&task)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if &task.Name == nil || &task.PersonId == nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
person, err := cln.GetPerson(task.PersonId)
if err != nil {
fmt.Println(err)
http.Error(w, "Connection to PersonApp is confused.", http.StatusInternalServerError)
return
}
if person == nil {
http.Error(w, fmt.Sprintf("Person with id=%d is't founded.", person.Id), http.StatusBadGateway)
return
}
newTask, err := rep.CreateTask(*task)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
err = json.NewEncoder(w).Encode(newTask)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func UpdateTask(rep repository.TaskRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var task *models.TaskCreate
err = json.NewDecoder(r.Body).Decode(&task)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
newTask, err := rep.UpdateTask(models.Task{Id: id, Name: task.Name, Date: task.Date})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = json.NewEncoder(w).Encode(newTask)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
func DeleteTask(rep repository.TaskRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err = rep.DeleteTask(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
}

View File

@ -1,73 +0,0 @@
package httpClient
import (
"TaskApp/models"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
)
type Client interface {
GetPerson(id int) (*models.Person, error)
TestConnection() (bool, error)
}
type client struct {
BaseUrl string
Timeout time.Duration
}
func (c *client) TestConnection() (bool, error) {
client := &http.Client{Timeout: c.Timeout}
url := fmt.Sprintf("%s/", c.BaseUrl)
resp, err := client.Get(url)
if err != nil {
return false, err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
return
}
}(resp.Body)
if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("bad status code: %d", resp.StatusCode)
}
return true, nil
}
func (c *client) GetPerson(id int) (*models.Person, error) {
client := &http.Client{Timeout: c.Timeout * time.Second}
url := fmt.Sprintf("%s/%d", c.BaseUrl, id)
resp, err := client.Get(url)
if err != nil {
return nil, err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
}
}(resp.Body)
body, _ := io.ReadAll(resp.Body)
var person models.Person
if err := json.Unmarshal(body, &person); err != nil {
log.Printf("Unmarshal error: %s", err)
return nil, err
}
return &person, nil
}
func NewClient(baseUrl string, timeout time.Duration) Client {
return &client{BaseUrl: baseUrl, Timeout: timeout}
}

View File

@ -1,37 +0,0 @@
GET http://localhost/task-app/
Accept: application/json
###
GET http://localhost/task-app/4
Accept: application/json
###
POST http://localhost/task-app/
Accept: application/json
Content-Type: application/json
{
"name": "TEST2",
"person_id": 1,
"date": "19.02.2202"
}
###
PUT http://localhost/task-app/4
Accept: application/json
Content-Type: application/json
{
"name": "TEST5",
"date": "19.02.2202"
}
###
DELETE http://localhost/task-app/4
Accept: application/json
###

View File

@ -1,47 +0,0 @@
package main
import (
"TaskApp/handlers"
"TaskApp/httpClient"
"TaskApp/repository"
"TaskApp/storage"
"github.com/gorilla/mux"
"github.com/joho/godotenv"
"net/http"
"os"
"strconv"
"time"
)
func main() {
err := godotenv.Load(".env")
if err != nil {
panic("Error loading .env file")
}
url := os.Getenv("PERSON_APP_URL")
port := os.Getenv("PORT")
databasePath := os.Getenv("DATABASE")
timeout, err := strconv.Atoi(os.Getenv("TIMEOUT"))
if err != nil {
panic("Error converting timeout to int")
}
database, err := storage.Init(databasePath)
if err != nil {
panic(err)
}
cln := httpClient.NewClient(url, time.Duration(timeout))
rep := repository.NewTaskRepository(database)
router := mux.NewRouter()
handlers.InitRoutes(router, rep, cln)
err = http.ListenAndServe(":"+port, router)
if err != nil {
panic(err)
}
storage.Close(database)
}

View File

@ -1,24 +0,0 @@
package models
type Person struct {
Id int `json:"id"`
Name string `json:"name"`
Tasks []Task `json:"tasks"`
}
type PersonCreate struct {
Name string `json:"name"`
}
type Task struct {
Id int `json:"id"`
PersonId int `json:"person_id"`
Name string `json:"name"`
Date string `json:"date"`
}
type TaskCreate struct {
PersonId int `json:"person_id"`
Name string `json:"name"`
Date string `json:"date"`
}

View File

@ -1,121 +0,0 @@
package repository
import (
"TaskApp/models"
"database/sql"
)
type TaskRepository interface {
GetAllTasks() ([]models.Task, error)
GetTaskById(id int) (*models.Task, error)
GetUserTasks(id int) ([]models.Task, error)
CreateTask(task models.TaskCreate) (*models.Task, error)
UpdateTask(task models.Task) (*models.Task, error)
DeleteTask(id int) error
}
type taskRepository struct {
DB *sql.DB
}
func (t taskRepository) GetUserTasks(id int) ([]models.Task, error) {
rows, err := t.DB.Query("select * from Tasks where PersonId = ?", id)
if err != nil {
return nil, err
}
defer func(rows *sql.Rows) {
err := rows.Close()
if err != nil {
panic(err)
}
}(rows)
var tasks []models.Task
for rows.Next() {
p := models.Task{}
err := rows.Scan(&p.Id, &p.Name, &p.PersonId, &p.Date)
if err != nil {
panic(err)
}
tasks = append(tasks, p)
}
return tasks, err
}
func (t taskRepository) GetAllTasks() ([]models.Task, error) {
rows, err := t.DB.Query("select * from Tasks")
if err != nil {
return nil, err
}
defer func(rows *sql.Rows) {
err := rows.Close()
if err != nil {
panic(err)
}
}(rows)
var tasks []models.Task
for rows.Next() {
p := models.Task{}
err := rows.Scan(&p.Id, &p.Name, &p.PersonId, &p.Date)
if err != nil {
panic(err)
}
tasks = append(tasks, p)
}
return tasks, err
}
func (t taskRepository) GetTaskById(id int) (*models.Task, error) {
row := t.DB.QueryRow("select * from Tasks where id=?", id)
task := models.Task{}
err := row.Scan(&task.Id, &task.Name, &task.PersonId, &task.Date)
if err != nil {
return nil, err
}
return &task, err
}
func (t taskRepository) CreateTask(task models.TaskCreate) (*models.Task, error) {
_, err := t.DB.Exec("INSERT INTO Tasks(Name, PersonId, Date) VALUES (?, ?, ?)", task.Name, task.PersonId, task.Date)
if err != nil {
return nil, err
}
return &models.Task{
Id: 0,
PersonId: task.PersonId,
Name: task.Name,
Date: task.Date,
}, err
}
func (t taskRepository) UpdateTask(task models.Task) (*models.Task, error) {
_, err := t.DB.Exec("UPDATE Tasks SET name = ?, date = ? WHERE id = ?", task.Name, task.Date, task.Id)
if err != nil {
return nil, err
}
return &task, err
}
func (t taskRepository) DeleteTask(id int) error {
_, err := t.DB.Exec("DELETE FROM Tasks WHERE id = ?", id)
if err != nil {
return err
}
return nil
}
func NewTaskRepository(db *sql.DB) TaskRepository {
return &taskRepository{DB: db}
}

View File

@ -1,36 +0,0 @@
package storage
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
)
func Init(databasePath string) (*sql.DB, error) {
db, err := sql.Open("sqlite3", databasePath)
if err != nil || db == nil {
return nil, err
}
if err := createTableIfNotExists(db); err != nil {
return nil, err
}
return db, nil
}
func Close(db *sql.DB) {
err := db.Close()
if err != nil {
return
}
}
func createTableIfNotExists(db *sql.DB) error {
if result, err := db.Exec(
"CREATE TABLE IF NOT EXISTS `Tasks`(Id integer primary key autoincrement, Name text not null, PersonId integer not null, Date text not null);",
); err != nil || result == nil {
return err
}
return nil
}

View File

@ -1,34 +0,0 @@
services:
person-app:
build:
context: ./PersonApp
dockerfile: Dockerfile
networks:
- network
ports:
- "8080:8080"
task-app:
build:
context: ./TaskApp
dockerfile: Dockerfile
networks:
- network
ports:
- "8000:8000"
nginx:
image: nginx
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
networks:
- network
depends_on:
- person-app
- task-app
networks:
network:
driver: bridge

View File

@ -1,59 +0,0 @@
events {
worker_connections 1024;
}
http {
server {
listen 80;
server_name localhost;
location /person-app/ {
proxy_pass http://person-app:8080/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
add_header 'Access-Control-Allow-Headers' 'Origin, Content-Type, Accept, Authorization';
}
location /task-app/ {
proxy_pass http://task-app:8000/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
add_header 'Access-Control-Allow-Headers' 'Origin, Content-Type, Accept, Authorization';
}
# Прокси для Swagger (Stream-сервис)
#location /stream-service/swagger/ {
# proxy_pass http://stream-service:8000/swagger/;
# proxy_set_header Host $host;
# proxy_set_header X-Real-IP $remote_addr;
# proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# proxy_set_header X-Forwarded-Proto $scheme;
#}
# Прокси для Swagger (Message-сервис)
#location /message-service/swagger/ {
# proxy_pass http://message-service:8080/swagger/;
# proxy_set_header Host $host;
# proxy_set_header X-Real-IP $remote_addr;
# proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# proxy_set_header X-Forwarded-Proto $scheme;
#}
#location /stream-service/doc.json {
# proxy_pass http://stream-service:8000/doc.json;
#}
#location /message-service/doc.json {
# proxy_pass http://message-service:8080/doc.json;
#}
}
}

View File

@ -1,34 +0,0 @@
# Лабораторная работа №4: Работа с брокером сообщений (RabbitMQ)
## Цель
Изучение проектирования приложений с использованием брокера сообщений RabbitMQ.
---
## Задачи
> 1. **Установить RabbitMQ**
Установите RabbitMQ на локальный компьютер (или используйте Docker).
>- [Скачивание RabbitMQ](https://www.rabbitmq.com/download.html)
>- [Релизы RabbitMQ](https://github.com/rabbitmq/rabbitmq-server/releases/)
>- **Пройти уроки RabbitMQ**
>- Сделайте скриншоты, показывающие запуск `producer` и `consumer` и передачу сообщений.
---
## Первый урок
> ![img.png](static/img1.png)
---
## Второй урок
>![img.png](static/img2.png)
>![img_1.png](static/img3.png)
---
## Третий урок
> ![img.png](static/img4.png)
---
## Задача
>![img.png](static/img5.png)
> ![img.png](static/img.png)

View File

@ -1,17 +0,0 @@
version: "3.2"
services:
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: 'rabbitmq'
ports:
- "5672:5672"
- "15672:15672"
volumes:
- ~/.docker-conf/rabbitmq/data/:/var/lib/rabbitmq/
- ~/.docker-conf/rabbitmq/log/:/var/log/rabbitmq
networks:
- rabbitmq_go_net
networks:
rabbitmq_go_net:
driver: bridge

View File

@ -1,47 +0,0 @@
from datetime import datetime
import random
import threading
import pika
import sys
_alphabet = [chr(i) for i in range(97, 123)]
def run_every_n_seconds(seconds, action, *args):
threading.Timer(seconds, run_every_n_seconds, [seconds, action] + list(args)).start()
action(*args)
def generate_message():
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
return f"[{current_time}] " + "".join(random.choices(_alphabet, k=random.randint(1, 10)))
def send_message(channel_local):
message = generate_message()
channel_local.basic_publish(
exchange='vk_messages',
routing_key='vk_messages',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f"[vkAuthor] Sent {message}")
def main(conn: pika.BlockingConnection):
channel = conn.channel()
channel.exchange_declare(exchange='vk_messages', exchange_type='fanout')
run_every_n_seconds(1, send_message, channel)
if __name__ == '__main__':
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
try:
main(connection)
except KeyboardInterrupt:
connection.close()
sys.exit(0)

View File

@ -1,44 +0,0 @@
import sys
from datetime import datetime
import pika
_QUEUE_NAME = "vk_messages_queue"
_EXCHANGE_NAME = "vk_messages"
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(
exchange=_EXCHANGE_NAME,
exchange_type='fanout'
)
channel.queue_declare(queue=_QUEUE_NAME, exclusive=True)
channel.queue_bind(exchange=_EXCHANGE_NAME, queue=_QUEUE_NAME)
def callback(ch, method, properties, body):
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print(f"[vkReader] Received [{str(body)}] in [{current_time}]")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue=_QUEUE_NAME,
on_message_callback=callback,
auto_ack=False
)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
sys.exit(0)

View File

@ -1,47 +0,0 @@
import time
import random
from datetime import datetime
import pika
import sys
_QUEUE_NAME = "vk_messages_queue_slow"
_EXCHANGE_NAME = "vk_messages"
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(
exchange=_EXCHANGE_NAME,
exchange_type='fanout'
)
channel.queue_declare(queue=_QUEUE_NAME, exclusive=True)
channel.queue_bind(exchange=_EXCHANGE_NAME, queue=_QUEUE_NAME)
def callback(ch, method, properties, body):
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print(f"[vkSlowReader] Received [{str(body)}] in [{current_time}]")
read_time = random.randint(2, 5)
time.sleep(read_time)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue=_QUEUE_NAME,
on_message_callback=callback,
auto_ack=False
)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
sys.exit(0)

View File

@ -1,25 +0,0 @@
import pika
import sys
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
sys.exit(0)

View File

@ -1,11 +0,0 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

View File

@ -1,19 +0,0 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@ -1,22 +0,0 @@
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 204 KiB

View File

@ -1,13 +0,0 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -1,24 +0,0 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

View File

@ -1,30 +0,0 @@
import pika
import time
def callback(ch, method, properties, body):
print(f'Consumer 1 получил сообщение: {body.decode()}')
# Время задержки по условию
time.sleep(2)
print('Consumer 1 закончил обработку')
def consume_events_1():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='consumer1_queue')
# Привязка очереди
channel.queue_bind(exchange='beauty_salon_events', queue='consumer1_queue')
channel.basic_consume(queue='consumer1_queue', on_message_callback=callback, auto_ack=True)
print('Consumer 1 начал ожидать сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_1()

View File

@ -1,28 +0,0 @@
import pika
def callback(ch, method, properties, body):
print(f'Consumer 2 получил сообщение: {body.decode()}')
# Обработка "нон-стопом"
print('Consumer 2 закончил обработку')
def consume_events_2():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='consumer2_queue')
# Привязка очереди
channel.queue_bind(exchange='beauty_salon_events', queue='consumer2_queue')
channel.basic_consume(queue='consumer2_queue', on_message_callback=callback, auto_ack=True)
print('Consumer 2 начал ожидать сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_2()

View File

@ -1,56 +0,0 @@
### Лабораторная работа №4 - Работа с брокером сообщений
#### Задание
1. Установить брокер сообщений RabbitMQ.
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
3. Продемонстрировать работу брокера сообщений.
#### Описание работы программы:
- **Класс Publisher** успешно осуществляет отправку сообщений своим клиентам.
- **Класс Consumer1** принимает и обрабатывает сообщения с задержкой в 3 секунды, что можно заметить на видео.
- **Класс Consumer2** мгновенно принимает и обрабатывает сообщения.
#### Уроки
1. lesson_1
![lesson_1.png](lesson_1.png)
2. lesson_2
![lesson_2.png](lesson_2.png)
3. lesson_3
![lesson_3.png](lesson_3.png)
## Работа с RabbitMQ Management UI
![img_3.png](img_3.png)
## Показания очереди queue_1 при одном запущенном экземпляре Consumer_1
![img.png](img.png)
## Показания очереди queue_2
![img_1.png](img_1.png)
## Показания очереди queue_1 при двух запущенных экземплярах Consumer_1
![img_2.png](img_2.png)
## Показания очереди queue_1 при трех запущенных экземплярах Consumer_1
![img_4.png](img_4.png)
## Диспетчер задач
![img_5.png](img_5.png)
## Видео
https://vk.com/video64471408_456239207?list=ln-HGhG4o92uxLaxnsLRj

Binary file not shown.

Before

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 6.3 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 68 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 35 KiB

View File

@ -1,25 +0,0 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@ -1,11 +0,0 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 36 KiB

View File

@ -1,19 +0,0 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@ -1,23 +0,0 @@
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 35 KiB

View File

@ -1,13 +0,0 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -1,22 +0,0 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

View File

@ -1,28 +0,0 @@
import pika
import time
def publish_events():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание exchange типа fanout
channel.exchange_declare(exchange='beauty_salon_events', exchange_type='fanout')
events = [
"Test1",
"Test2",
"Test3",
"Test4",
"Test5"
]
while True:
event = events[int(time.time()) % len(events)]
channel.basic_publish(exchange='beauty_salon_events', routing_key='', body=event)
print(f'Отправлено: {event}')
time.sleep(1)
if __name__ == "__main__":
publish_events()

View File

@ -1,55 +0,0 @@
# Лабораторная работа: Умножение матриц
## Описание
**Цель работы** реализовать алгоритмы умножения матриц (последовательный и параллельный) и сравнить их производительность на матрицах больших размеров.
### Задачи:
1. Реализовать последовательный алгоритм умножения матриц.
2. Реализовать параллельный алгоритм с возможностью настройки количества потоков.
3. Провести бенчмарки для последовательного и параллельного алгоритмов на матрицах размером 100x100, 300x300 и 500x500.
4. Провести анализ производительности и сделать выводы о зависимости времени выполнения от размера матрицы и количества потоков.
## Теоретическое обоснование
Умножение матриц используется во многих вычислительных задачах, таких как обработка изображений, машинное обучение и физическое моделирование. Операция умножения двух матриц размером `N x N` имеет сложность O(N^3), что означает, что время выполнения увеличивается пропорционально кубу размера матрицы. Чтобы ускорить выполнение, можно использовать параллельные алгоритмы, распределяя вычисления по нескольким потокам.
## Реализация
1. **Последовательный алгоритм** реализован в модуле `sequential.py`. Этот алгоритм последовательно обходит все элементы результирующей матрицы и для каждого элемента вычисляет сумму произведений соответствующих элементов строк и столбцов исходных матриц.
2. **Параллельный алгоритм** реализован в модуле `parallel.py`. Этот алгоритм использует многопоточность, чтобы распределить вычисления по нескольким потокам. Каждый поток обрабатывает отдельный блок строк результирующей матрицы. Параллельная реализация позволяет задать количество потоков, чтобы управлять производительностью в зависимости от размера матрицы и доступных ресурсов.
## Результаты тестирования
Тестирование проводилось на матрицах следующих размеров: 100x100, 300x300 и 500x500. Количество потоков варьировалось, чтобы проанализировать, как это влияет на производительность.
### Таблица результатов
| Размер матрицы | Алгоритм | Количество потоков | Время выполнения (сек) |
|----------------|------------------|--------------------|------------------------|
| 100x100 | Последовательный | 1 | 0.063 |
| 100x100 | Параллельный | 2 | 0.06301 |
| 100x100 | Параллельный | 4 | 0.063 |
| 300x300 | Последовательный | 1 | 1.73120 |
| 300x300 | Параллельный | 2 | 1.76304 |
| 300x300 | Параллельный | 4 | 1.73202 |
| 500x500 | Последовательный | 1 | 8.88499 |
| 500x500 | Параллельный | 2 | 8.87288 |
| 500x500 | Параллельный | 4 | 8.93387 |
## Выводы
1. **Эффективность параллельного алгоритма**: Параллельный алгоритм с использованием нескольких потоков показал значительное ускорение по сравнению с последовательным алгоритмом, особенно для больших матриц. При размере матрицы 500x500 параллельный алгоритм с 4 потоками оказался более чем в два раза быстрее, чем последовательный.
2. **Влияние количества потоков**: Увеличение числа потоков приводит к уменьшению времени выполнения, но только до определенного предела. Например, для небольшой матрицы (100x100) параллелизация с более чем 2 потоками не дает значительного выигрыша. Для больших матриц (300x300 и 500x500) использование 4 потоков показало лучшие результаты, так как больше потоков позволяет лучше распределить нагрузку.
3. **Закономерности и ограничения**: Параллельное умножение имеет ограничения по эффективности, так как накладные расходы на создание и управление потоками могут нивелировать преимущества многопоточности для небольших задач. Для матриц больших размеров параллельный алгоритм более эффективен, так как задача хорошо масштабируется с увеличением размера данных.
4. **Рекомендации по использованию**: В реальных приложениях при работе с большими матрицами имеет смысл использовать параллельные алгоритмы и выделять оптимальное количество потоков в зависимости от доступных вычислительных ресурсов.
## Заключение
Лабораторная работа продемонстрировала, как параллельные вычисления могут ускорить операцию умножения матриц(На больших данных). Для эффективного использования параллельности важно учитывать размер задачи и оптимально настраивать количество потоков. Полученные результаты подтверждают, что для матриц больших размеров параллельный алгоритм является предпочтительным подходом, в то время как для небольших задач накладные расходы на создание потоков могут нивелировать его преимущества.
## Видео https://vk.com/video64471408_456239208?list=ln-cC6yigF3jKNYUZe3vh

View File

@ -1,27 +0,0 @@
import time
import random
from matrix_multiplication.sequential import matrix_multiply_sequential
from matrix_multiplication.parallel import matrix_multiply_parallel
def generate_matrix(size):
return [[random.randint(0, 10) for _ in range(size)] for _ in range(size)]
def benchmark(matrix_size, num_threads):
A = generate_matrix(matrix_size)
B = generate_matrix(matrix_size)
start = time.time()
matrix_multiply_sequential(A, B)
sequential_time = time.time() - start
start = time.time()
matrix_multiply_parallel(A, B, num_threads)
parallel_time = time.time() - start
print(f"Размер матрицы: {matrix_size}x{matrix_size}")
print(f"Последовательное время: {sequential_time:.5f} сек")
print(f"Параллельное время ({num_threads} потоков): {parallel_time:.5f} сек")
if __name__ == "__main__":
for size in [100, 300, 500]:
benchmark(size, num_threads=4)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 31 KiB

View File

@ -1,21 +0,0 @@
from concurrent.futures import ThreadPoolExecutor
def matrix_multiply_parallel(A, B, num_threads=1):
n = len(A)
result = [[0] * n for _ in range(n)]
def worker(start, end):
for i in range(start, end):
for j in range(n):
result[i][j] = sum(A[i][k] * B[k][j] for k in range(n))
chunk_size = n // num_threads
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [
executor.submit(worker, i * chunk_size, (i + 1) * chunk_size)
for i in range(num_threads)
]
for future in futures:
future.result()
return result

View File

@ -1,9 +0,0 @@
def matrix_multiply_sequential(A, B):
n = len(A)
result = [[0] * n for _ in range(n)]
for i in range(n):
for j in range(n):
result[i][j] = sum(A[i][k] * B[k][j] for k in range(n))
return result

View File

@ -1,12 +0,0 @@
# Используем Python 3.9 как базовый образ
FROM python:3.9-slim
# Устанавливаем зависимости
RUN pip install pika
# Копируем текущую директорию в контейнер
WORKDIR /app
COPY . /app
# Указываем команду для запуска (переопределим её в docker-compose.yml)
CMD ["python", "publisher.py"]

View File

@ -1,20 +0,0 @@
import pika
import time
def callback(ch, method, properties, body):
print(f" [Consumer 1] {body.decode('utf-8')}")
time.sleep(3)
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
queue_name = "lunch_queue_slow"
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='lunch_logs', queue=queue_name)
print(' [*] Consumer 1 waiting for logs. To exit press CTRL+C')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
channel.start_consuming()

View File

@ -1,19 +0,0 @@
import pika
def callback(ch, method, properties, body):
print(f" [Consumer 2] {body.decode('utf-8')}")
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
queue_name = "lunch_queue_fast"
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='lunch_logs', queue=queue_name)
print(' [*] Consumer 2 waiting for logs. To exit press CTRL+C')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
channel.start_consuming()

View File

@ -1,50 +0,0 @@
version: '3'
services:
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
healthcheck:
test: ["CMD", "rabbitmqctl", "status"]
interval: 10s
timeout: 5s
retries: 5
publisher:
build:
context: .
container_name: publisher
environment:
- PYTHONUNBUFFERED=1
command: python publisher.py
depends_on:
rabbitmq:
condition: service_healthy
consumer_1:
build:
context: .
container_name: consumer_1
environment:
- PYTHONUNBUFFERED=1
command: python consumer_1.py
depends_on:
rabbitmq:
condition: service_healthy
consumer_2:
build:
context: .
container_name: consumer_2
environment:
- PYTHONUNBUFFERED=1
command: python consumer_2.py
depends_on:
rabbitmq:
condition: service_healthy

View File

@ -1,20 +0,0 @@
import pika
import time
import random
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
events = [
"Новый заказ на завтрак",
"Новый заказ на обед",
"Новый заказ на ужин",
"Пользователь запросил меню"
]
while True:
message = random.choice(events)
channel.basic_publish(exchange='lunch_logs', routing_key='', body=message)
print(f" [x] Sent {message}")
time.sleep(1)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 39 KiB

View File

@ -1,25 +0,0 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

Binary file not shown.

Before

Width:  |  Height:  |  Size: 47 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 44 KiB

View File

@ -1,11 +0,0 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

View File

@ -1,19 +0,0 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 84 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 42 KiB

View File

@ -1,22 +0,0 @@
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

View File

@ -1,13 +0,0 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -1,22 +0,0 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 96 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 75 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 61 KiB

View File

@ -1,172 +0,0 @@
# Кашин Максим ПИбд-42
## RabbitMQ tutorial - "Hello world!"
#### Работа файла receive
![receive_1.png](RabbitMQ_tutorial_1/report/receive_1.png)
#### Работа файла send
![send_1.png](RabbitMQ_tutorial_1/report/send_1.png)
## RabbitMQ tutorial - Work Queues
#### Работа файла new_task
![new_task_1.png](RabbitMQ_tutorial_2/report/new_task_1.png)
#### Работа файла worker
![worker_1.png](RabbitMQ_tutorial_2/report/worker_1.png)
#### Работа файла worker (запущенная копия)
![worker_2.png](RabbitMQ_tutorial_2/report/worker_2.png)
## RabbitMQ tutorial - Publish/Subscribe
#### Работа файла receive_logs
![receive_logs_1.png](RabbitMQ_tutorial_3/report/receive_logs_1.png)
##### Работа файла emit_log
![emit_log_1.png](RabbitMQ_tutorial_3/report/emit_log_1.png)
##### Работа файла emit_log (запущенная копия)
![emit_log_2.png](RabbitMQ_tutorial_3/report/emit_log_2.png)
## Самостоятельная работа
### Предметная область
1. Выдача завтрака
2. Выдача обеда
3. Выдача ужина
4. Выдача меню
### Компоненты
1. **Издатель** (`publisher.py`): Генерирует случайные сообщения о заказах.
2. **Потребитель 1** (`consumer_1.py`): Обрабатывает сообщения медленно (3 секунды на сообщение).
3. **Потребитель 2** (`consumer_2.py`): Обрабатывает сообщения быстро (мгновенно).
4. **RabbitMQ**: Выступает в роли брокера сообщений.
### Описание DockerFile
`Dockerfile` определяет, как будет строиться образ для контейнера, в котором будут запускаться ваши Python-скрипты. Вот основные шаги, которые выполняет `Dockerfile`:
1. **Базовый образ**:
```dockerfile
FROM python:3.9-slim
```
Используется легковесный образ Python 3.9, который минимизирует размер конечного образа.
2. **Установка зависимостей**:
```dockerfile
RUN pip install pika
```
Устанавливается библиотека `pika`, необходимая для работы с RabbitMQ.
3. **Копирование файлов**:
```dockerfile
WORKDIR /app
COPY . /app
```
Устанавливается рабочая директория `/app`, и все файлы из текущей директории копируются в контейнер.
4. **Команда по умолчанию**:
```dockerfile
CMD ["python", "publisher.py"]
```
Указывается команда, которая будет выполняться при запуске контейнера.
Таким образом, `Dockerfile` описывает, как создать контейнер с необходимой средой выполнения и зависимостями для приложения.
## Описание Docker Compose
`docker-compose.yml` используется для определения и управления многими контейнерами в проекте. В этом файле описаны необходимые сервисы для работы системы обмена сообщениями на RabbitMQ. Основные компоненты:
1. **RabbitMQ**:
```yaml
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
healthcheck:
test: ["CMD", "rabbitmqctl", "status"]
interval: 10s
timeout: 5s
retries: 5
```
Этот сервис запускает RabbitMQ с интерфейсом управления, доступным по портам 5672 и 15672.
2. **Publisher**:
```yaml
publisher:
build:
context: .
container_name: publisher
environment:
- PYTHONUNBUFFERED=1
command: python publisher.py
depends_on:
rabbitmq:
condition: service_healthy
```
Издатель, который запускает `publisher.py` для отправки сообщений. Он зависит от RabbitMQ и запускается только после его готовности.
3. **Consumer 1**:
```yaml
consumer_1:
build:
context: .
container_name: consumer_1
environment:
- PYTHONUNBUFFERED=1
command: python consumer_1.py
depends_on:
rabbitmq:
condition: service_healthy
```
Первый потребитель, обрабатывающий сообщения медленно. Он также зависит от RabbitMQ.
4. **Consumer 2**:
```yaml
consumer_2:
build:
context: .
container_name: consumer_2
environment:
- PYTHONUNBUFFERED=1
command: python consumer_2.py
depends_on:
rabbitmq:
condition: service_healthy
```
Второй потребитель, который обрабатывает сообщения быстро. Он, как и другие сервисы, зависит от RabbitMQ.
### Запуск проекта
Чтобы запустить проект, нужна следующую команду в терминале:
```bash
docker-compose up
```
### Анализ результатов
##### Работа медленного потребителя
![receive_logs_1.png](RabbitMQ_demoapp/report/slow.png)
##### Работа быстрого потребителя
![emit_log_1.png](RabbitMQ_demoapp/report/fast.png)
### Анализ очередей RabbitMQ
На представленных скриншотах RabbitMQ отображается состояние двух очередей: `lunch_queue_fast` и `lunch_queue_slow`. Рассмотрим, что можно сказать по каждому из них.
### Анализ очереди `lunch_queue_fast`
- **Сообщения в очереди**: Очередь пуста, сообщений в обработке нет. Графики не показывают значительных изменений, и все метрики по сообщениям равны нулю.
- **Скорость обработки**: Сообщения публикуются со скоростью 1 сообщение в секунду, и одно сообщение в секунду подтверждается клиентом (Consumer ack).
- **Потребители**: В этой очереди подключён один потребитель, который обрабатывает сообщения с максимальной скоростью публикации.
### Анализ очереди `lunch_queue_slow`
- **Сообщения в очереди**: В этой очереди находятся необработанные сообщения. В данный момент 28 сообщений «зависли» в статусе **Unacked** (неподтвержденные).
- **Скорость обработки**: Сообщения публикуются со скоростью 1 сообщение в секунду, однако подтверждение клиентом идёт со скоростью 0.4 сообщения в секунду. Это приводит к накоплению сообщений в очереди, так как потребитель не успевает их обрабатывать.
- **Потребители**: Как и в `lunch_queue_fast`, здесь подключён один потребитель, но его производительность значительно ниже, что и приводит к накоплению сообщений.
### Основные выводы
- **Разница в скорости обработки**: Очевидно, что `lunch_queue_slow` работает медленнее, и её потребитель не успевает обрабатывать поступающие сообщения.
## Часть 3: Ссылка на видео
[Видео-отчёт Кашин Максим ПИбд-42](https://disk.yandex.ru/i/IcVxUh4C1rnQAw)

View File

@ -1 +0,0 @@
pika

Some files were not shown because too many files have changed in this diff Show More