Compare commits

..

37 Commits

Author SHA1 Message Date
528309ab84 Merge pull request 'kashin_maxim_lab_5' (#124) from kashin_maxim_lab_5 into main
Reviewed-on: #124
2024-11-20 22:45:51 +04:00
0814d8533d Merge pull request 'kashin_maxim_lab_4' (#123) from kashin_maxim_lab_4 into main
Reviewed-on: #123
2024-11-20 22:45:28 +04:00
354ee2679e Merge pull request 'yakovleva_yulia_lab_8 is ready' (#122) from yakovleva_yulia_lab_8 into main
Reviewed-on: #122
2024-11-20 22:45:02 +04:00
d302bd2213 Merge pull request 'yakovleva_yulia_lab_7 is ready' (#121) from yakovleva_yulia_lab_7 into main
Reviewed-on: #121
2024-11-20 22:44:39 +04:00
2aed7bf385 Merge pull request 'yakovleva_yulia_lab_6 is ready' (#120) from yakovleva_yulia_lab_6 into main
Reviewed-on: #120
2024-11-20 22:44:06 +04:00
d4e24db25e Merge pull request 'kadyrov_aydar_lab_5' (#119) from kadyrov_aydar_lab_5 into main
Reviewed-on: #119
2024-11-20 22:43:23 +04:00
c0ca1d4bb5 Merge pull request 'kadyrov_aydar_lab_4' (#117) from kadyrov_aydar_lab_4 into main
Reviewed-on: #117
2024-11-20 22:43:05 +04:00
6eeb90ea45 Merge pull request 'tukaeva_alfiya_lab_8' (#116) from tukaeva_alfiya_lab_8 into main
Reviewed-on: #116
2024-11-20 22:38:42 +04:00
bc2d7cb2f6 Merge pull request 'tukaeva_alfiya_lab_7' (#115) from tukaeva_alfiya_lab_7 into main
Reviewed-on: #115
2024-11-20 22:37:46 +04:00
e1da6f26ab Merge pull request 'tukaeva_alfiya_lab_6' (#114) from tukaeva_alfiya_lab_6 into main
Reviewed-on: #114
2024-11-20 22:37:01 +04:00
e5df53b5c2 Merge pull request 'turner_ilya_lab_2' (#113) from turner_ilya_lab_2 into main
Reviewed-on: #113
2024-11-20 22:36:40 +04:00
c98770752e Merge pull request 'mochalov_danila_lab_3' (#112) from mochalov_danila_lab_3 into main
Reviewed-on: #112
2024-11-20 22:36:16 +04:00
a800c3df86 Merge pull request 'Bazunov Andrew Lab 4' (#111) from bazunov_andrew_lab_4 into main
Reviewed-on: #111
2024-11-20 22:35:35 +04:00
a51e33a201 Merge pull request 'turner_ilya_lab_1' (#110) from turner_ilya_lab_1 into main
Reviewed-on: #110
2024-11-20 22:34:54 +04:00
a9af84010a Merge pull request 'Bazunov Andrew lab3' (#109) from bazunov_andrew_lab_3 into main
Reviewed-on: #109
2024-11-20 22:34:26 +04:00
3645d0c1cd Merge pull request 'yakovleva_yulia_lab_5 is ready' (#107) from yakovleva_yulia_lab_5 into main
Reviewed-on: #107
Reviewed-by: Alexey <a.zhelepov@mail.ru>
2024-11-20 22:33:27 +04:00
08f2f63ad4 Готово 2024-10-27 19:42:27 +04:00
e4e3748a3d Выполнено 2024-10-27 19:09:16 +04:00
JulYakJul
5e522fbcc0 yakovleva_yulia_lab_8 is ready 2024-10-27 15:10:30 +04:00
JulYakJul
cae7189c1e fix 2024-10-27 14:06:02 +04:00
JulYakJul
2bfc8a0a43 yakovleva_yulia_lab_7 is ready 2024-10-27 14:02:15 +04:00
JulYakJul
1f89960672 fix 2024-10-27 13:06:24 +04:00
JulYakJul
ffb4c2a8a4 yakovleva_yulia_lab_6 is ready 2024-10-27 13:04:11 +04:00
NAP
1dc621e0be kadyrov_aydar_lab_5 2024-10-27 02:16:28 +04:00
NAP
11c62d9bf7 kadyrov_aydar_lab_5 2024-10-27 02:13:51 +04:00
NAP
03910a9a3f kadyrov_aydar_lab_4 2024-10-27 01:53:34 +04:00
f7d483196c tukaeva_alfiya_lab_8 is ready 2024-10-26 23:16:19 +04:00
545377f948 tukaeva_alfiya_lab_7 fix 2024-10-26 22:58:30 +04:00
bb867da520 tukaeva_alfiya_lab_7 is ready 2024-10-26 22:41:45 +04:00
c4a260ebda tukaeva_alfiya_lab_6 is ready 2024-10-26 22:26:14 +04:00
88392a8041 turner_ilya_lab_2 is ready 2024-10-26 21:09:43 +04:00
JulYakJul
400de30b49 fix 2024-10-26 20:04:39 +04:00
96a4e6ac43 mochalov_danila_lab_3 is ready 2024-10-26 18:18:28 +04:00
Bazunov Andrew Igorevich
03c52d0c76 Complete lab4 2024-10-26 17:51:52 +04:00
6dd4835f54 turner_ilya_lab_1 is ready 2024-10-26 17:34:47 +04:00
Bazunov Andrew Igorevich
5187005e6a complete lab 3 2024-10-26 14:46:33 +04:00
JulYakJul
a5f0403627 yakovleva_yulia_lab_5 is ready 2024-10-25 18:12:36 +04:00
141 changed files with 5606 additions and 51 deletions

15
.gitignore vendored
View File

@ -1,15 +0,0 @@
################################################################################
# Данный GITIGNORE-файл был автоматически создан Microsoft(R) Visual Studio.
################################################################################
/.vs
/aleikin_artem_lab_3
/aleikin_artem_lab_4
/aleikin_artem_lab_5/MultiplyMatrix
/aleikin_artem_lab_6/DerminantMatrix
/dozorova_alena_lab_2
/dozorova_alena_lab_3
/dozorova_alena_lab_3/PostService/obj/Debug/net6.0/.NETCoreApp,Version=v6.0.AssemblyAttributes.cs
/dozorova_alena_lab_4
/dozorova_alena_lab_5/ConsoleApp1/obj
/dozorova_alena_lab_6/ConsoleApp1/obj

View File

@ -1,29 +0,0 @@
# Лабораторная работа 8 - Про устройство распределенных систем
## ПИбд-42 || Алейкин Артем
### Ответы на вопросы
#### Зачем сложные системы (например, социальная сеть ВКонтакте) пишутся в "распределенном" стиле, где каждое отдельное приложение (или сервис) функционально выполняет только ограниченный спектр задач?
В первую очередь это удобно, эффективно и безопасно. Например при распредленном стиле, взаимодействие между частями кода(по факту его разделили на микросервисы) сильно ослабляется, за счёт чего мы получаем модульность.
> Модульность сама по себе даёт множество преимуществ.
> > 1. Возможность нанимать совершенно разные команды сотрудников для разработки конкретных модулей. То есть каждый модуль(сервис) может развиваться и масштабироваться независимо от остальных.
> > 2. Дебагинг, отслеживание ошибок, контролирование кода всё это проще, ведь мы сразу понимаем в каком модуле приложения находится нерабочий код, чтобы оперативно назначить разработчика для исправления ошибок.
> > 3. Отказоустойчивость - в том же вконтакте может упасть, к примеру, сервис музыки, но это означает, что только он и будет нерабочим, в том время как остальные сервисы даже не знают об этом и продолжают работать в штатном режиме.
#### Для чего были созданы системы оркестрации приложений? Каким образом они упрощают / усложняют разработку и сопровождение распределенных систем?
Нужны они для возможности централизованного запуска и управления всеми сервисами приложения.
Так же они значительно упрощают разворачивание своего приложения на чужих машинах.
#### Для чего нужны очереди обработки сообщений и что может подразумеваться под сообщениями?
Очереди обработки сообщений нужны для возможности реализации "общения" сервисов между собой.
Некоторые сервисы могут зависеть от результатов других сервисов, для этого нужно "уведомить" второй сервис, о том, что он должен что-то сделать.
А также очереди ответственны за сохранность переданных данных.
#### Какие преимущества и недостатки распределенных приложений существуют на Ваш взгляд?
Я думаю, что о преимуществах достаточно подробно расписал это в первом пункте, поэтому здесь будут упомянут лишь недостатки.
К недостаткам я могу отнести следующие факторы:
> 1. Ослабление централизованного контроля - имеется в виду, что каждый модуль(сервис) более суверенный и мы далеко не всегда можем знать (имеется в виду, если проект разрабатывается командой людей, а не одним человеком), что там происходит. Что-то вроде черного ящика - даём данные и забираем результат, а процесс получения результата нам неизвестен.
> 2. Увеличение сложности разработки проекта в целом - требуется больше времени на проектирование архитектуры приложения, появляются накладные расходы на написание кода прослоек между сервисами, так же стоит отметить сложность отлавливание распредленных ошибок.
#### Целесообразно ли в сложную распределенную систему внедрять параллельные вычисления? Приведите примеры, когда это действительно нужно, а когда нет.
Параллельные вычисления целесообразны, когда задача требует обработки больших объёмов данных или сложных вычислений. Например, анализ пользовательской активности, обучение моделей машинного обучения или обработка видео выгодно распределять между узлами. Но если задача не требует высокой производительности и её легко решить последовательно, внедрение параллелизма может усложнить систему без ощутимой пользы.

BIN
bazunov_andrew_lab_3/PersonApp/.DS_Store vendored Normal file

Binary file not shown.

View File

@ -0,0 +1,4 @@
PORT=8080
TASK_APP_URL=http://task-app:8000
TIMEOUT=15
DATABASE=./database.db

View File

@ -0,0 +1,14 @@
FROM golang:1.23
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o /bin/PersonApp
EXPOSE 8080
CMD ["/bin/PersonApp"]

Binary file not shown.

View File

@ -0,0 +1,10 @@
module PersonApp
go 1.23.1
require (
github.com/gorilla/mux v1.8.1
github.com/mattn/go-sqlite3 v1.14.24
)
require github.com/joho/godotenv v1.5.1 // indirect

View File

@ -0,0 +1,6 @@
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=

View File

@ -0,0 +1,157 @@
package handlers
import (
"PersonApp/httpClient"
"PersonApp/models"
"PersonApp/repository"
"encoding/json"
"fmt"
"github.com/gorilla/mux"
"net/http"
"strconv"
)
func InitRoutes(r *mux.Router, rep repository.PersonRepository, cln httpClient.Client) {
r.HandleFunc("/", GetPersons(rep, cln)).Methods("GET")
r.HandleFunc("/{id:[0-9]+}", GetPersonById(rep, cln)).Methods("GET")
r.HandleFunc("/", CreatePerson(rep)).Methods("POST")
r.HandleFunc("/{id:[0-9]+}", UpdatePerson(rep)).Methods("PUT")
r.HandleFunc("/{id:[0-9]+}", DeletePerson(rep)).Methods("DELETE")
}
func GetPersons(rep repository.PersonRepository, cln httpClient.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
fmt.Println("GET PERSONS")
persons, err := rep.GetAllPersons()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
for i := 0; i < len(persons); i++ {
tasks, _ := cln.GetPersonTasks(persons[i].Id)
persons[i].Tasks = tasks
}
err = json.NewEncoder(w).Encode(persons)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func GetPersonById(rep repository.PersonRepository, cln httpClient.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
person, err := rep.GetPersonById(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
tasks, err := cln.GetPersonTasks(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
} else {
person.Tasks = tasks
}
err = json.NewEncoder(w).Encode(person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func CreatePerson(rep repository.PersonRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var person *models.Person
err := json.NewDecoder(r.Body).Decode(&person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
person, err = rep.CreatePerson(*person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
err = json.NewEncoder(w).Encode(person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func UpdatePerson(rep repository.PersonRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var person *models.Person
err = json.NewDecoder(r.Body).Decode(&person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
person, err = rep.UpdatePerson(models.Person{
Id: id,
Name: person.Name,
Tasks: nil,
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusAccepted)
err = json.NewEncoder(w).Encode(person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
func DeletePerson(rep repository.PersonRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = rep.DeletePerson(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
}

View File

@ -0,0 +1,72 @@
package httpClient
import (
"PersonApp/models"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
type Client interface {
GetPersonTasks(id int) ([]models.Task, error)
TestConnection() (bool, error)
}
type client struct {
BaseUrl string
Timeout time.Duration
}
func (c *client) TestConnection() (bool, error) {
client := &http.Client{Timeout: c.Timeout}
url := fmt.Sprintf("%s/", c.BaseUrl)
resp, err := client.Get(url)
if err != nil {
return false, err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
return
}
}(resp.Body)
if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("bad status code: %d", resp.StatusCode)
}
return true, nil
}
func (c *client) GetPersonTasks(id int) ([]models.Task, error) {
client := &http.Client{Timeout: c.Timeout * time.Second}
url := fmt.Sprintf("%s/f/%d", c.BaseUrl, id)
resp, err := client.Get(url)
if err != nil {
return nil, err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
}
}(resp.Body)
body, _ := io.ReadAll(resp.Body)
var tasks []models.Task
if err := json.Unmarshal(body, &tasks); err != nil {
fmt.Printf("Unmarshal error: %s", err)
return []models.Task{}, err
}
return tasks, nil
}
func NewClient(baseUrl string, timeout time.Duration) Client {
return &client{BaseUrl: baseUrl, Timeout: timeout}
}

View File

@ -0,0 +1,34 @@
GET http://localhost/person-app/
Accept: application/json
###
GET http://localhost/person-app/1
Accept: application/json
###
POST http://localhost/person-app/
Accept: application/json
Content-Type: application/json
{
"name": "TEST3"
}
###
PUT http://localhost/person-app/3
Accept: application/json
Content-Type: application/json
{
"name": "TEST11"
}
###
DELETE http://localhost/person-app/3
Accept: application/json
###

View File

@ -0,0 +1,47 @@
package main
import (
"PersonApp/handlers"
"PersonApp/httpClient"
"PersonApp/repository"
"PersonApp/storage"
"github.com/gorilla/mux"
"github.com/joho/godotenv"
"net/http"
"os"
"strconv"
"time"
)
func main() {
err := godotenv.Load(".env")
if err != nil {
panic("Error loading .env file")
}
url := os.Getenv("TASK_APP_URL")
port := os.Getenv("PORT")
databasePath := os.Getenv("DATABASE")
timeout, err := strconv.Atoi(os.Getenv("TIMEOUT"))
if err != nil {
panic("Error converting timeout to int")
}
database, err := storage.Init(databasePath)
if err != nil {
panic(err)
}
cln := httpClient.NewClient(url, time.Duration(timeout))
rep := repository.NewPersonRepository(database)
router := mux.NewRouter()
handlers.InitRoutes(router, rep, cln)
err = http.ListenAndServe(":"+port, router)
if err != nil {
panic(err)
}
storage.Close(database)
}

View File

@ -0,0 +1,24 @@
package models
type Person struct {
Id int `json:"id"`
Name string `json:"name"`
Tasks []Task `json:"tasks"`
}
type PersonCreate struct {
Name string `json:"name"`
}
type Task struct {
Id int `json:"id"`
PersonId int `json:"person_id"`
Name string `json:"name"`
Date string `json:"date"`
}
type TaskCreate struct {
PersonId int `json:"person_id"`
Name string `json:"name"`
Date string `json:"date"`
}

View File

@ -0,0 +1,99 @@
package repository
import (
"PersonApp/models"
"database/sql"
)
type PersonRepository interface {
GetAllPersons() ([]models.Person, error)
GetPersonById(id int) (*models.Person, error)
CreatePerson(person models.Person) (*models.Person, error)
UpdatePerson(person models.Person) (*models.Person, error)
DeletePerson(id int) error
}
type personRepository struct {
DB *sql.DB
}
func NewPersonRepository(db *sql.DB) PersonRepository {
return &personRepository{DB: db}
}
func (pr *personRepository) GetAllPersons() ([]models.Person, error) {
rows, err := pr.DB.Query("select * from Persons")
if err != nil {
return nil, err
}
defer func(rows *sql.Rows) {
err := rows.Close()
if err != nil {
panic(err)
}
}(rows)
var persons []models.Person
for rows.Next() {
p := models.Person{}
err := rows.Scan(&p.Id, &p.Name)
if err != nil {
panic(err)
}
persons = append(persons, p)
}
return persons, err
}
func (pr *personRepository) GetPersonById(id int) (*models.Person, error) {
row := pr.DB.QueryRow("select * from Persons where id=?", id)
person := models.Person{}
err := row.Scan(&person.Id, &person.Name)
if err != nil {
return nil, err
}
return &person, err
}
func (pr *personRepository) CreatePerson(p models.Person) (*models.Person, error) {
res, err := pr.DB.Exec("INSERT INTO Persons (name) values (?)", p.Name)
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
return &p, err
}
func (pr *personRepository) UpdatePerson(p models.Person) (*models.Person, error) {
res, err := pr.DB.Exec("UPDATE Persons SET name = ? WHERE id = ?", p.Name, p.Id)
if err != nil {
return nil, err
}
if res == nil {
return nil, nil
}
return &p, err
}
func (pr *personRepository) DeletePerson(id int) error {
_, err := pr.DB.Exec("DELETE FROM Persons WHERE id = ?", id)
if err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,36 @@
package storage
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
)
func Init(databasePath string) (*sql.DB, error) {
db, err := sql.Open("sqlite3", databasePath)
if err != nil || db == nil {
return nil, err
}
if err := createTableIfNotExists(db); err != nil {
return nil, err
}
return db, nil
}
func Close(db *sql.DB) {
err := db.Close()
if err != nil {
return
}
}
func createTableIfNotExists(db *sql.DB) error {
if result, err := db.Exec(
"CREATE TABLE IF NOT EXISTS `Persons`(Id integer primary key autoincrement, Name text not null);",
); err != nil || result == nil {
return err
}
return nil
}

View File

@ -1,11 +1,6 @@
# Распределенные вычисления и приложения Л2
# Распределенные вычисления и приложения Л3
## _Автор Базунов Андрей Игревич ПИбд-42_
Сервисы ( _порядок исполнения сервисов соблюден_ ):
- 1.FileCreator - (_Создание тестовых данных_)
- 2.FirstService - (_Выполнение 1.4 варианта задания_)
- 3.SecondService - (_Выполнение 2.2 варианта задания_)
В качестве основного языка был выбран GoLang. Для каждого сервиса был создан DOCKERFILE где были прописаны условия и действия для сборки каждого из модулей
# Docker
@ -27,4 +22,4 @@ docker-compose up -d --build
docker-compose down
```
[Демонстрация работы](https://vk.com/video236673313_456239575)
[Демонстрация работы](https://vk.com/video/@viltskaa?z=video236673313_456239577%2Fpl_236673313_-2)

View File

@ -0,0 +1,4 @@
PORT=8000
PERSON_APP_URL=http://person-app:8080
TIMEOUT=15
DATABASE=./database.db

View File

@ -0,0 +1,14 @@
FROM golang:1.23
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
RUN go build -o /bin/TaskApp
EXPOSE 8000
CMD ["/bin/TaskApp"]

Binary file not shown.

View File

@ -0,0 +1,10 @@
module TaskApp
go 1.23.1
require (
github.com/gorilla/mux v1.8.1
github.com/mattn/go-sqlite3 v1.14.24
)
require github.com/joho/godotenv v1.5.1

View File

@ -0,0 +1,6 @@
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=

View File

@ -0,0 +1,177 @@
package handlers
import (
"TaskApp/httpClient"
"TaskApp/models"
"TaskApp/repository"
"encoding/json"
"fmt"
"github.com/gorilla/mux"
"net/http"
"strconv"
)
func InitRoutes(r *mux.Router, rep repository.TaskRepository, cln httpClient.Client) {
r.HandleFunc("/", GetTasks(rep)).Methods("GET")
r.HandleFunc("/{id:[0-9]+}", GetTaskById(rep)).Methods("GET")
r.HandleFunc("/", CreateTask(rep, cln)).Methods("POST")
r.HandleFunc("/{id:[0-9]+}", UpdateTask(rep)).Methods("PUT")
r.HandleFunc("/{id:[0-9]+}", DeleteTask(rep)).Methods("DELETE")
r.HandleFunc("/f/{id:[0-9]+}", GetPersonTasks(rep)).Methods("GET")
}
func GetTasks(rep repository.TaskRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
tasks, err := rep.GetAllTasks()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
err = json.NewEncoder(w).Encode(tasks)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
}
}
}
func GetTaskById(rep repository.TaskRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
person, err := rep.GetTaskById(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = json.NewEncoder(w).Encode(person)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func GetPersonTasks(rep repository.TaskRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
tasks, err := rep.GetUserTasks(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = json.NewEncoder(w).Encode(tasks)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func CreateTask(rep repository.TaskRepository, cln httpClient.Client) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var task *models.TaskCreate
err := json.NewDecoder(r.Body).Decode(&task)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if &task.Name == nil || &task.PersonId == nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
person, err := cln.GetPerson(task.PersonId)
if err != nil {
fmt.Println(err)
http.Error(w, "Connection to PersonApp is confused.", http.StatusInternalServerError)
return
}
if person == nil {
http.Error(w, fmt.Sprintf("Person with id=%d is't founded.", person.Id), http.StatusBadGateway)
return
}
newTask, err := rep.CreateTask(*task)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusCreated)
err = json.NewEncoder(w).Encode(newTask)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
}
}
func UpdateTask(rep repository.TaskRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
var task *models.TaskCreate
err = json.NewDecoder(r.Body).Decode(&task)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
newTask, err := rep.UpdateTask(models.Task{Id: id, Name: task.Name, Date: task.Date})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
err = json.NewEncoder(w).Encode(newTask)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
}
func DeleteTask(rep repository.TaskRepository) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id, err := strconv.Atoi(mux.Vars(r)["id"])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
err = rep.DeleteTask(id)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}
}

View File

@ -0,0 +1,73 @@
package httpClient
import (
"TaskApp/models"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"
)
type Client interface {
GetPerson(id int) (*models.Person, error)
TestConnection() (bool, error)
}
type client struct {
BaseUrl string
Timeout time.Duration
}
func (c *client) TestConnection() (bool, error) {
client := &http.Client{Timeout: c.Timeout}
url := fmt.Sprintf("%s/", c.BaseUrl)
resp, err := client.Get(url)
if err != nil {
return false, err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
return
}
}(resp.Body)
if resp.StatusCode != http.StatusOK {
return false, fmt.Errorf("bad status code: %d", resp.StatusCode)
}
return true, nil
}
func (c *client) GetPerson(id int) (*models.Person, error) {
client := &http.Client{Timeout: c.Timeout * time.Second}
url := fmt.Sprintf("%s/%d", c.BaseUrl, id)
resp, err := client.Get(url)
if err != nil {
return nil, err
}
defer func(Body io.ReadCloser) {
err := Body.Close()
if err != nil {
}
}(resp.Body)
body, _ := io.ReadAll(resp.Body)
var person models.Person
if err := json.Unmarshal(body, &person); err != nil {
log.Printf("Unmarshal error: %s", err)
return nil, err
}
return &person, nil
}
func NewClient(baseUrl string, timeout time.Duration) Client {
return &client{BaseUrl: baseUrl, Timeout: timeout}
}

View File

@ -0,0 +1,37 @@
GET http://localhost/task-app/
Accept: application/json
###
GET http://localhost/task-app/4
Accept: application/json
###
POST http://localhost/task-app/
Accept: application/json
Content-Type: application/json
{
"name": "TEST2",
"person_id": 1,
"date": "19.02.2202"
}
###
PUT http://localhost/task-app/4
Accept: application/json
Content-Type: application/json
{
"name": "TEST5",
"date": "19.02.2202"
}
###
DELETE http://localhost/task-app/4
Accept: application/json
###

View File

@ -0,0 +1,47 @@
package main
import (
"TaskApp/handlers"
"TaskApp/httpClient"
"TaskApp/repository"
"TaskApp/storage"
"github.com/gorilla/mux"
"github.com/joho/godotenv"
"net/http"
"os"
"strconv"
"time"
)
func main() {
err := godotenv.Load(".env")
if err != nil {
panic("Error loading .env file")
}
url := os.Getenv("PERSON_APP_URL")
port := os.Getenv("PORT")
databasePath := os.Getenv("DATABASE")
timeout, err := strconv.Atoi(os.Getenv("TIMEOUT"))
if err != nil {
panic("Error converting timeout to int")
}
database, err := storage.Init(databasePath)
if err != nil {
panic(err)
}
cln := httpClient.NewClient(url, time.Duration(timeout))
rep := repository.NewTaskRepository(database)
router := mux.NewRouter()
handlers.InitRoutes(router, rep, cln)
err = http.ListenAndServe(":"+port, router)
if err != nil {
panic(err)
}
storage.Close(database)
}

View File

@ -0,0 +1,24 @@
package models
type Person struct {
Id int `json:"id"`
Name string `json:"name"`
Tasks []Task `json:"tasks"`
}
type PersonCreate struct {
Name string `json:"name"`
}
type Task struct {
Id int `json:"id"`
PersonId int `json:"person_id"`
Name string `json:"name"`
Date string `json:"date"`
}
type TaskCreate struct {
PersonId int `json:"person_id"`
Name string `json:"name"`
Date string `json:"date"`
}

View File

@ -0,0 +1,121 @@
package repository
import (
"TaskApp/models"
"database/sql"
)
type TaskRepository interface {
GetAllTasks() ([]models.Task, error)
GetTaskById(id int) (*models.Task, error)
GetUserTasks(id int) ([]models.Task, error)
CreateTask(task models.TaskCreate) (*models.Task, error)
UpdateTask(task models.Task) (*models.Task, error)
DeleteTask(id int) error
}
type taskRepository struct {
DB *sql.DB
}
func (t taskRepository) GetUserTasks(id int) ([]models.Task, error) {
rows, err := t.DB.Query("select * from Tasks where PersonId = ?", id)
if err != nil {
return nil, err
}
defer func(rows *sql.Rows) {
err := rows.Close()
if err != nil {
panic(err)
}
}(rows)
var tasks []models.Task
for rows.Next() {
p := models.Task{}
err := rows.Scan(&p.Id, &p.Name, &p.PersonId, &p.Date)
if err != nil {
panic(err)
}
tasks = append(tasks, p)
}
return tasks, err
}
func (t taskRepository) GetAllTasks() ([]models.Task, error) {
rows, err := t.DB.Query("select * from Tasks")
if err != nil {
return nil, err
}
defer func(rows *sql.Rows) {
err := rows.Close()
if err != nil {
panic(err)
}
}(rows)
var tasks []models.Task
for rows.Next() {
p := models.Task{}
err := rows.Scan(&p.Id, &p.Name, &p.PersonId, &p.Date)
if err != nil {
panic(err)
}
tasks = append(tasks, p)
}
return tasks, err
}
func (t taskRepository) GetTaskById(id int) (*models.Task, error) {
row := t.DB.QueryRow("select * from Tasks where id=?", id)
task := models.Task{}
err := row.Scan(&task.Id, &task.Name, &task.PersonId, &task.Date)
if err != nil {
return nil, err
}
return &task, err
}
func (t taskRepository) CreateTask(task models.TaskCreate) (*models.Task, error) {
_, err := t.DB.Exec("INSERT INTO Tasks(Name, PersonId, Date) VALUES (?, ?, ?)", task.Name, task.PersonId, task.Date)
if err != nil {
return nil, err
}
return &models.Task{
Id: 0,
PersonId: task.PersonId,
Name: task.Name,
Date: task.Date,
}, err
}
func (t taskRepository) UpdateTask(task models.Task) (*models.Task, error) {
_, err := t.DB.Exec("UPDATE Tasks SET name = ?, date = ? WHERE id = ?", task.Name, task.Date, task.Id)
if err != nil {
return nil, err
}
return &task, err
}
func (t taskRepository) DeleteTask(id int) error {
_, err := t.DB.Exec("DELETE FROM Tasks WHERE id = ?", id)
if err != nil {
return err
}
return nil
}
func NewTaskRepository(db *sql.DB) TaskRepository {
return &taskRepository{DB: db}
}

View File

@ -0,0 +1,36 @@
package storage
import (
"database/sql"
_ "github.com/mattn/go-sqlite3"
)
func Init(databasePath string) (*sql.DB, error) {
db, err := sql.Open("sqlite3", databasePath)
if err != nil || db == nil {
return nil, err
}
if err := createTableIfNotExists(db); err != nil {
return nil, err
}
return db, nil
}
func Close(db *sql.DB) {
err := db.Close()
if err != nil {
return
}
}
func createTableIfNotExists(db *sql.DB) error {
if result, err := db.Exec(
"CREATE TABLE IF NOT EXISTS `Tasks`(Id integer primary key autoincrement, Name text not null, PersonId integer not null, Date text not null);",
); err != nil || result == nil {
return err
}
return nil
}

View File

@ -0,0 +1,34 @@
services:
person-app:
build:
context: ./PersonApp
dockerfile: Dockerfile
networks:
- network
ports:
- "8080:8080"
task-app:
build:
context: ./TaskApp
dockerfile: Dockerfile
networks:
- network
ports:
- "8000:8000"
nginx:
image: nginx
ports:
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
networks:
- network
depends_on:
- person-app
- task-app
networks:
network:
driver: bridge

View File

@ -0,0 +1,59 @@
events {
worker_connections 1024;
}
http {
server {
listen 80;
server_name localhost;
location /person-app/ {
proxy_pass http://person-app:8080/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
add_header 'Access-Control-Allow-Headers' 'Origin, Content-Type, Accept, Authorization';
}
location /task-app/ {
proxy_pass http://task-app:8000/;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
add_header 'Access-Control-Allow-Origin' '*';
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
add_header 'Access-Control-Allow-Headers' 'Origin, Content-Type, Accept, Authorization';
}
# Прокси для Swagger (Stream-сервис)
#location /stream-service/swagger/ {
# proxy_pass http://stream-service:8000/swagger/;
# proxy_set_header Host $host;
# proxy_set_header X-Real-IP $remote_addr;
# proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# proxy_set_header X-Forwarded-Proto $scheme;
#}
# Прокси для Swagger (Message-сервис)
#location /message-service/swagger/ {
# proxy_pass http://message-service:8080/swagger/;
# proxy_set_header Host $host;
# proxy_set_header X-Real-IP $remote_addr;
# proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# proxy_set_header X-Forwarded-Proto $scheme;
#}
#location /stream-service/doc.json {
# proxy_pass http://stream-service:8000/doc.json;
#}
#location /message-service/doc.json {
# proxy_pass http://message-service:8080/doc.json;
#}
}
}

View File

@ -0,0 +1,34 @@
# Лабораторная работа №4: Работа с брокером сообщений (RabbitMQ)
## Цель
Изучение проектирования приложений с использованием брокера сообщений RabbitMQ.
---
## Задачи
> 1. **Установить RabbitMQ**
Установите RabbitMQ на локальный компьютер (или используйте Docker).
>- [Скачивание RabbitMQ](https://www.rabbitmq.com/download.html)
>- [Релизы RabbitMQ](https://github.com/rabbitmq/rabbitmq-server/releases/)
>- **Пройти уроки RabbitMQ**
>- Сделайте скриншоты, показывающие запуск `producer` и `consumer` и передачу сообщений.
---
## Первый урок
> ![img.png](static/img1.png)
---
## Второй урок
>![img.png](static/img2.png)
>![img_1.png](static/img3.png)
---
## Третий урок
> ![img.png](static/img4.png)
---
## Задача
>![img.png](static/img5.png)
> ![img.png](static/img.png)

View File

@ -0,0 +1,17 @@
version: "3.2"
services:
rabbitmq:
image: rabbitmq:3-management-alpine
container_name: 'rabbitmq'
ports:
- "5672:5672"
- "15672:15672"
volumes:
- ~/.docker-conf/rabbitmq/data/:/var/lib/rabbitmq/
- ~/.docker-conf/rabbitmq/log/:/var/log/rabbitmq
networks:
- rabbitmq_go_net
networks:
rabbitmq_go_net:
driver: bridge

View File

@ -0,0 +1,47 @@
from datetime import datetime
import random
import threading
import pika
import sys
_alphabet = [chr(i) for i in range(97, 123)]
def run_every_n_seconds(seconds, action, *args):
threading.Timer(seconds, run_every_n_seconds, [seconds, action] + list(args)).start()
action(*args)
def generate_message():
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
return f"[{current_time}] " + "".join(random.choices(_alphabet, k=random.randint(1, 10)))
def send_message(channel_local):
message = generate_message()
channel_local.basic_publish(
exchange='vk_messages',
routing_key='vk_messages',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f"[vkAuthor] Sent {message}")
def main(conn: pika.BlockingConnection):
channel = conn.channel()
channel.exchange_declare(exchange='vk_messages', exchange_type='fanout')
run_every_n_seconds(1, send_message, channel)
if __name__ == '__main__':
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
try:
main(connection)
except KeyboardInterrupt:
connection.close()
sys.exit(0)

View File

@ -0,0 +1,44 @@
import sys
from datetime import datetime
import pika
_QUEUE_NAME = "vk_messages_queue"
_EXCHANGE_NAME = "vk_messages"
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(
exchange=_EXCHANGE_NAME,
exchange_type='fanout'
)
channel.queue_declare(queue=_QUEUE_NAME, exclusive=True)
channel.queue_bind(exchange=_EXCHANGE_NAME, queue=_QUEUE_NAME)
def callback(ch, method, properties, body):
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print(f"[vkReader] Received [{str(body)}] in [{current_time}]")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue=_QUEUE_NAME,
on_message_callback=callback,
auto_ack=False
)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
sys.exit(0)

View File

@ -0,0 +1,47 @@
import time
import random
from datetime import datetime
import pika
import sys
_QUEUE_NAME = "vk_messages_queue_slow"
_EXCHANGE_NAME = "vk_messages"
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(
exchange=_EXCHANGE_NAME,
exchange_type='fanout'
)
channel.queue_declare(queue=_QUEUE_NAME, exclusive=True)
channel.queue_bind(exchange=_EXCHANGE_NAME, queue=_QUEUE_NAME)
def callback(ch, method, properties, body):
now = datetime.now()
current_time = now.strftime("%H:%M:%S")
print(f"[vkSlowReader] Received [{str(body)}] in [{current_time}]")
read_time = random.randint(2, 5)
time.sleep(read_time)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue=_QUEUE_NAME,
on_message_callback=callback,
auto_ack=False
)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
sys.exit(0)

View File

@ -0,0 +1,25 @@
import pika
import sys
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
sys.exit(0)

View File

@ -0,0 +1,11 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

View File

@ -0,0 +1,19 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,22 @@
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 37 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 14 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 29 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 204 KiB

View File

@ -0,0 +1,13 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,24 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

View File

@ -0,0 +1,30 @@
import pika
import time
def callback(ch, method, properties, body):
print(f'Consumer 1 получил сообщение: {body.decode()}')
# Время задержки по условию
time.sleep(2)
print('Consumer 1 закончил обработку')
def consume_events_1():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='consumer1_queue')
# Привязка очереди
channel.queue_bind(exchange='beauty_salon_events', queue='consumer1_queue')
channel.basic_consume(queue='consumer1_queue', on_message_callback=callback, auto_ack=True)
print('Consumer 1 начал ожидать сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_1()

View File

@ -0,0 +1,28 @@
import pika
def callback(ch, method, properties, body):
print(f'Consumer 2 получил сообщение: {body.decode()}')
# Обработка "нон-стопом"
print('Consumer 2 закончил обработку')
def consume_events_2():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание очереди
channel.queue_declare(queue='consumer2_queue')
# Привязка очереди
channel.queue_bind(exchange='beauty_salon_events', queue='consumer2_queue')
channel.basic_consume(queue='consumer2_queue', on_message_callback=callback, auto_ack=True)
print('Consumer 2 начал ожидать сообщения...')
channel.start_consuming()
if __name__ == "__main__":
consume_events_2()

View File

@ -0,0 +1,56 @@
### Лабораторная работа №4 - Работа с брокером сообщений
#### Задание
1. Установить брокер сообщений RabbitMQ.
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
3. Продемонстрировать работу брокера сообщений.
#### Описание работы программы:
- **Класс Publisher** успешно осуществляет отправку сообщений своим клиентам.
- **Класс Consumer1** принимает и обрабатывает сообщения с задержкой в 3 секунды, что можно заметить на видео.
- **Класс Consumer2** мгновенно принимает и обрабатывает сообщения.
#### Уроки
1. lesson_1
![lesson_1.png](lesson_1.png)
2. lesson_2
![lesson_2.png](lesson_2.png)
3. lesson_3
![lesson_3.png](lesson_3.png)
## Работа с RabbitMQ Management UI
![img_3.png](img_3.png)
## Показания очереди queue_1 при одном запущенном экземпляре Consumer_1
![img.png](img.png)
## Показания очереди queue_2
![img_1.png](img_1.png)
## Показания очереди queue_1 при двух запущенных экземплярах Consumer_1
![img_2.png](img_2.png)
## Показания очереди queue_1 при трех запущенных экземплярах Consumer_1
![img_4.png](img_4.png)
## Диспетчер задач
![img_5.png](img_5.png)
## Видео
https://vk.com/video64471408_456239207?list=ln-HGhG4o92uxLaxnsLRj

BIN
kadyrov_aydar_lab_4/img.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 24 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 68 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

View File

@ -0,0 +1,25 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

View File

@ -0,0 +1,11 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 36 KiB

View File

@ -0,0 +1,19 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,23 @@
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

Binary file not shown.

After

Width:  |  Height:  |  Size: 35 KiB

View File

@ -0,0 +1,13 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,22 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

View File

@ -0,0 +1,28 @@
import pika
import time
def publish_events():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# Создание exchange типа fanout
channel.exchange_declare(exchange='beauty_salon_events', exchange_type='fanout')
events = [
"Test1",
"Test2",
"Test3",
"Test4",
"Test5"
]
while True:
event = events[int(time.time()) % len(events)]
channel.basic_publish(exchange='beauty_salon_events', routing_key='', body=event)
print(f'Отправлено: {event}')
time.sleep(1)
if __name__ == "__main__":
publish_events()

View File

@ -0,0 +1,55 @@
# Лабораторная работа: Умножение матриц
## Описание
**Цель работы** реализовать алгоритмы умножения матриц (последовательный и параллельный) и сравнить их производительность на матрицах больших размеров.
### Задачи:
1. Реализовать последовательный алгоритм умножения матриц.
2. Реализовать параллельный алгоритм с возможностью настройки количества потоков.
3. Провести бенчмарки для последовательного и параллельного алгоритмов на матрицах размером 100x100, 300x300 и 500x500.
4. Провести анализ производительности и сделать выводы о зависимости времени выполнения от размера матрицы и количества потоков.
## Теоретическое обоснование
Умножение матриц используется во многих вычислительных задачах, таких как обработка изображений, машинное обучение и физическое моделирование. Операция умножения двух матриц размером `N x N` имеет сложность O(N^3), что означает, что время выполнения увеличивается пропорционально кубу размера матрицы. Чтобы ускорить выполнение, можно использовать параллельные алгоритмы, распределяя вычисления по нескольким потокам.
## Реализация
1. **Последовательный алгоритм** реализован в модуле `sequential.py`. Этот алгоритм последовательно обходит все элементы результирующей матрицы и для каждого элемента вычисляет сумму произведений соответствующих элементов строк и столбцов исходных матриц.
2. **Параллельный алгоритм** реализован в модуле `parallel.py`. Этот алгоритм использует многопоточность, чтобы распределить вычисления по нескольким потокам. Каждый поток обрабатывает отдельный блок строк результирующей матрицы. Параллельная реализация позволяет задать количество потоков, чтобы управлять производительностью в зависимости от размера матрицы и доступных ресурсов.
## Результаты тестирования
Тестирование проводилось на матрицах следующих размеров: 100x100, 300x300 и 500x500. Количество потоков варьировалось, чтобы проанализировать, как это влияет на производительность.
### Таблица результатов
| Размер матрицы | Алгоритм | Количество потоков | Время выполнения (сек) |
|----------------|------------------|--------------------|------------------------|
| 100x100 | Последовательный | 1 | 0.063 |
| 100x100 | Параллельный | 2 | 0.06301 |
| 100x100 | Параллельный | 4 | 0.063 |
| 300x300 | Последовательный | 1 | 1.73120 |
| 300x300 | Параллельный | 2 | 1.76304 |
| 300x300 | Параллельный | 4 | 1.73202 |
| 500x500 | Последовательный | 1 | 8.88499 |
| 500x500 | Параллельный | 2 | 8.87288 |
| 500x500 | Параллельный | 4 | 8.93387 |
## Выводы
1. **Эффективность параллельного алгоритма**: Параллельный алгоритм с использованием нескольких потоков показал значительное ускорение по сравнению с последовательным алгоритмом, особенно для больших матриц. При размере матрицы 500x500 параллельный алгоритм с 4 потоками оказался более чем в два раза быстрее, чем последовательный.
2. **Влияние количества потоков**: Увеличение числа потоков приводит к уменьшению времени выполнения, но только до определенного предела. Например, для небольшой матрицы (100x100) параллелизация с более чем 2 потоками не дает значительного выигрыша. Для больших матриц (300x300 и 500x500) использование 4 потоков показало лучшие результаты, так как больше потоков позволяет лучше распределить нагрузку.
3. **Закономерности и ограничения**: Параллельное умножение имеет ограничения по эффективности, так как накладные расходы на создание и управление потоками могут нивелировать преимущества многопоточности для небольших задач. Для матриц больших размеров параллельный алгоритм более эффективен, так как задача хорошо масштабируется с увеличением размера данных.
4. **Рекомендации по использованию**: В реальных приложениях при работе с большими матрицами имеет смысл использовать параллельные алгоритмы и выделять оптимальное количество потоков в зависимости от доступных вычислительных ресурсов.
## Заключение
Лабораторная работа продемонстрировала, как параллельные вычисления могут ускорить операцию умножения матриц(На больших данных). Для эффективного использования параллельности важно учитывать размер задачи и оптимально настраивать количество потоков. Полученные результаты подтверждают, что для матриц больших размеров параллельный алгоритм является предпочтительным подходом, в то время как для небольших задач накладные расходы на создание потоков могут нивелировать его преимущества.
## Видео https://vk.com/video64471408_456239208?list=ln-cC6yigF3jKNYUZe3vh

View File

@ -0,0 +1,27 @@
import time
import random
from matrix_multiplication.sequential import matrix_multiply_sequential
from matrix_multiplication.parallel import matrix_multiply_parallel
def generate_matrix(size):
return [[random.randint(0, 10) for _ in range(size)] for _ in range(size)]
def benchmark(matrix_size, num_threads):
A = generate_matrix(matrix_size)
B = generate_matrix(matrix_size)
start = time.time()
matrix_multiply_sequential(A, B)
sequential_time = time.time() - start
start = time.time()
matrix_multiply_parallel(A, B, num_threads)
parallel_time = time.time() - start
print(f"Размер матрицы: {matrix_size}x{matrix_size}")
print(f"Последовательное время: {sequential_time:.5f} сек")
print(f"Параллельное время ({num_threads} потоков): {parallel_time:.5f} сек")
if __name__ == "__main__":
for size in [100, 300, 500]:
benchmark(size, num_threads=4)

BIN
kadyrov_aydar_lab_5/img.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 21 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 31 KiB

View File

@ -0,0 +1,21 @@
from concurrent.futures import ThreadPoolExecutor
def matrix_multiply_parallel(A, B, num_threads=1):
n = len(A)
result = [[0] * n for _ in range(n)]
def worker(start, end):
for i in range(start, end):
for j in range(n):
result[i][j] = sum(A[i][k] * B[k][j] for k in range(n))
chunk_size = n // num_threads
with ThreadPoolExecutor(max_workers=num_threads) as executor:
futures = [
executor.submit(worker, i * chunk_size, (i + 1) * chunk_size)
for i in range(num_threads)
]
for future in futures:
future.result()
return result

View File

@ -0,0 +1,9 @@
def matrix_multiply_sequential(A, B):
n = len(A)
result = [[0] * n for _ in range(n)]
for i in range(n):
for j in range(n):
result[i][j] = sum(A[i][k] * B[k][j] for k in range(n))
return result

View File

@ -0,0 +1,12 @@
# Используем Python 3.9 как базовый образ
FROM python:3.9-slim
# Устанавливаем зависимости
RUN pip install pika
# Копируем текущую директорию в контейнер
WORKDIR /app
COPY . /app
# Указываем команду для запуска (переопределим её в docker-compose.yml)
CMD ["python", "publisher.py"]

View File

@ -0,0 +1,20 @@
import pika
import time
def callback(ch, method, properties, body):
print(f" [Consumer 1] {body.decode('utf-8')}")
time.sleep(3)
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
queue_name = "lunch_queue_slow"
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='lunch_logs', queue=queue_name)
print(' [*] Consumer 1 waiting for logs. To exit press CTRL+C')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
channel.start_consuming()

View File

@ -0,0 +1,19 @@
import pika
def callback(ch, method, properties, body):
print(f" [Consumer 2] {body.decode('utf-8')}")
ch.basic_ack(delivery_tag=method.delivery_tag)
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
queue_name = "lunch_queue_fast"
channel.queue_declare(queue=queue_name)
channel.queue_bind(exchange='lunch_logs', queue=queue_name)
print(' [*] Consumer 2 waiting for logs. To exit press CTRL+C')
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
channel.start_consuming()

View File

@ -0,0 +1,50 @@
version: '3'
services:
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
healthcheck:
test: ["CMD", "rabbitmqctl", "status"]
interval: 10s
timeout: 5s
retries: 5
publisher:
build:
context: .
container_name: publisher
environment:
- PYTHONUNBUFFERED=1
command: python publisher.py
depends_on:
rabbitmq:
condition: service_healthy
consumer_1:
build:
context: .
container_name: consumer_1
environment:
- PYTHONUNBUFFERED=1
command: python consumer_1.py
depends_on:
rabbitmq:
condition: service_healthy
consumer_2:
build:
context: .
container_name: consumer_2
environment:
- PYTHONUNBUFFERED=1
command: python consumer_2.py
depends_on:
rabbitmq:
condition: service_healthy

View File

@ -0,0 +1,20 @@
import pika
import time
import random
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
channel = connection.channel()
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
events = [
"Новый заказ на завтрак",
"Новый заказ на обед",
"Новый заказ на ужин",
"Пользователь запросил меню"
]
while True:
message = random.choice(events)
channel.basic_publish(exchange='lunch_logs', routing_key='', body=message)
print(f" [x] Sent {message}")
time.sleep(1)

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 39 KiB

View File

@ -0,0 +1,25 @@
import pika, sys, os
def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(f" [x] Received {body}")
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

View File

@ -0,0 +1,11 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

View File

@ -0,0 +1,19 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(
exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode=pika.DeliveryMode.Persistent
))
print(f" [x] Sent {message}")
connection.close()

Binary file not shown.

After

Width:  |  Height:  |  Size: 84 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 44 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 42 KiB

View File

@ -0,0 +1,22 @@
import pika
import time
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] Received {body.decode()}")
time.sleep(body.count(b'.'))
print(" [x] Done")
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
channel.start_consuming()

View File

@ -0,0 +1,13 @@
import pika
import sys
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(f" [x] Sent {message}")
connection.close()

View File

@ -0,0 +1,22 @@
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(f" [x] {body}")
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()

Binary file not shown.

After

Width:  |  Height:  |  Size: 96 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 75 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 61 KiB

View File

@ -0,0 +1,172 @@
# Кашин Максим ПИбд-42
## RabbitMQ tutorial - "Hello world!"
#### Работа файла receive
![receive_1.png](RabbitMQ_tutorial_1/report/receive_1.png)
#### Работа файла send
![send_1.png](RabbitMQ_tutorial_1/report/send_1.png)
## RabbitMQ tutorial - Work Queues
#### Работа файла new_task
![new_task_1.png](RabbitMQ_tutorial_2/report/new_task_1.png)
#### Работа файла worker
![worker_1.png](RabbitMQ_tutorial_2/report/worker_1.png)
#### Работа файла worker (запущенная копия)
![worker_2.png](RabbitMQ_tutorial_2/report/worker_2.png)
## RabbitMQ tutorial - Publish/Subscribe
#### Работа файла receive_logs
![receive_logs_1.png](RabbitMQ_tutorial_3/report/receive_logs_1.png)
##### Работа файла emit_log
![emit_log_1.png](RabbitMQ_tutorial_3/report/emit_log_1.png)
##### Работа файла emit_log (запущенная копия)
![emit_log_2.png](RabbitMQ_tutorial_3/report/emit_log_2.png)
## Самостоятельная работа
### Предметная область
1. Выдача завтрака
2. Выдача обеда
3. Выдача ужина
4. Выдача меню
### Компоненты
1. **Издатель** (`publisher.py`): Генерирует случайные сообщения о заказах.
2. **Потребитель 1** (`consumer_1.py`): Обрабатывает сообщения медленно (3 секунды на сообщение).
3. **Потребитель 2** (`consumer_2.py`): Обрабатывает сообщения быстро (мгновенно).
4. **RabbitMQ**: Выступает в роли брокера сообщений.
### Описание DockerFile
`Dockerfile` определяет, как будет строиться образ для контейнера, в котором будут запускаться ваши Python-скрипты. Вот основные шаги, которые выполняет `Dockerfile`:
1. **Базовый образ**:
```dockerfile
FROM python:3.9-slim
```
Используется легковесный образ Python 3.9, который минимизирует размер конечного образа.
2. **Установка зависимостей**:
```dockerfile
RUN pip install pika
```
Устанавливается библиотека `pika`, необходимая для работы с RabbitMQ.
3. **Копирование файлов**:
```dockerfile
WORKDIR /app
COPY . /app
```
Устанавливается рабочая директория `/app`, и все файлы из текущей директории копируются в контейнер.
4. **Команда по умолчанию**:
```dockerfile
CMD ["python", "publisher.py"]
```
Указывается команда, которая будет выполняться при запуске контейнера.
Таким образом, `Dockerfile` описывает, как создать контейнер с необходимой средой выполнения и зависимостями для приложения.
## Описание Docker Compose
`docker-compose.yml` используется для определения и управления многими контейнерами в проекте. В этом файле описаны необходимые сервисы для работы системы обмена сообщениями на RabbitMQ. Основные компоненты:
1. **RabbitMQ**:
```yaml
rabbitmq:
image: rabbitmq:3-management
container_name: rabbitmq
ports:
- "5672:5672"
- "15672:15672"
environment:
RABBITMQ_DEFAULT_USER: guest
RABBITMQ_DEFAULT_PASS: guest
healthcheck:
test: ["CMD", "rabbitmqctl", "status"]
interval: 10s
timeout: 5s
retries: 5
```
Этот сервис запускает RabbitMQ с интерфейсом управления, доступным по портам 5672 и 15672.
2. **Publisher**:
```yaml
publisher:
build:
context: .
container_name: publisher
environment:
- PYTHONUNBUFFERED=1
command: python publisher.py
depends_on:
rabbitmq:
condition: service_healthy
```
Издатель, который запускает `publisher.py` для отправки сообщений. Он зависит от RabbitMQ и запускается только после его готовности.
3. **Consumer 1**:
```yaml
consumer_1:
build:
context: .
container_name: consumer_1
environment:
- PYTHONUNBUFFERED=1
command: python consumer_1.py
depends_on:
rabbitmq:
condition: service_healthy
```
Первый потребитель, обрабатывающий сообщения медленно. Он также зависит от RabbitMQ.
4. **Consumer 2**:
```yaml
consumer_2:
build:
context: .
container_name: consumer_2
environment:
- PYTHONUNBUFFERED=1
command: python consumer_2.py
depends_on:
rabbitmq:
condition: service_healthy
```
Второй потребитель, который обрабатывает сообщения быстро. Он, как и другие сервисы, зависит от RabbitMQ.
### Запуск проекта
Чтобы запустить проект, нужна следующую команду в терминале:
```bash
docker-compose up
```
### Анализ результатов
##### Работа медленного потребителя
![receive_logs_1.png](RabbitMQ_demoapp/report/slow.png)
##### Работа быстрого потребителя
![emit_log_1.png](RabbitMQ_demoapp/report/fast.png)
### Анализ очередей RabbitMQ
На представленных скриншотах RabbitMQ отображается состояние двух очередей: `lunch_queue_fast` и `lunch_queue_slow`. Рассмотрим, что можно сказать по каждому из них.
### Анализ очереди `lunch_queue_fast`
- **Сообщения в очереди**: Очередь пуста, сообщений в обработке нет. Графики не показывают значительных изменений, и все метрики по сообщениям равны нулю.
- **Скорость обработки**: Сообщения публикуются со скоростью 1 сообщение в секунду, и одно сообщение в секунду подтверждается клиентом (Consumer ack).
- **Потребители**: В этой очереди подключён один потребитель, который обрабатывает сообщения с максимальной скоростью публикации.
### Анализ очереди `lunch_queue_slow`
- **Сообщения в очереди**: В этой очереди находятся необработанные сообщения. В данный момент 28 сообщений «зависли» в статусе **Unacked** (неподтвержденные).
- **Скорость обработки**: Сообщения публикуются со скоростью 1 сообщение в секунду, однако подтверждение клиентом идёт со скоростью 0.4 сообщения в секунду. Это приводит к накоплению сообщений в очереди, так как потребитель не успевает их обрабатывать.
- **Потребители**: Как и в `lunch_queue_fast`, здесь подключён один потребитель, но его производительность значительно ниже, что и приводит к накоплению сообщений.
### Основные выводы
- **Разница в скорости обработки**: Очевидно, что `lunch_queue_slow` работает медленнее, и её потребитель не успевает обрабатывать поступающие сообщения.
## Часть 3: Ссылка на видео
[Видео-отчёт Кашин Максим ПИбд-42](https://disk.yandex.ru/i/IcVxUh4C1rnQAw)

View File

@ -0,0 +1 @@
pika

View File

@ -0,0 +1,62 @@
import multiprocessing
import numpy as np
import time
def sequential_multiply(A, B):
# Последовательное умножение матриц
return np.dot(A, B)
def parallel_multiply(A, B, num_processes):
rows_A = A.shape[0]
cols_B = B.shape[1]
# Используем Array для совместного использования памяти
C = multiprocessing.Array('d', rows_A * cols_B) # 'd' для double
chunk_size = int(rows_A / num_processes)
processes = []
for i in range(num_processes):
start = chunk_size * i
end = chunk_size * (i + 1) if i < num_processes - 1 else rows_A
# Запускаем процесс для умножения
p = multiprocessing.Process(target=perform_multiplication, args=(A, B, C, start, end, rows_A, cols_B))
processes.append(p)
p.start()
for p in processes:
p.join()
# Преобразуем C в 2D массив NumPy для удобства
return np.frombuffer(C.get_obj()).reshape((rows_A, cols_B))
def perform_multiplication(A, B, C, start, end, rows_A, cols_B):
# Умножение строк матрицы A на столбцы матрицы B
for i in range(start, end):
for j in range(cols_B):
C[i * cols_B + j] = np.dot(A[i, :], B[:, j])
if __name__ == "__main__":
matrix_sizes = [100, 300, 500]
num_processes = int(input('Введите количество потоков: '))
for n in matrix_sizes:
# Генерация случайных матриц A и B
A = np.random.randint(10, size=(n, n))
B = np.random.randint(10, size=(n, n))
# Бенчмарк для последовательного умножения
start = time.time()
sequential_result = sequential_multiply(A, B)
end = time.time()
print(f"Последовательное умножение {n}x{n}: {end - start:.6f} секунд")
# Бенчмарк для параллельного умножения
start = time.time()
parallel_result = parallel_multiply(A, B, num_processes)
end = time.time()
print(f"Параллельное умножение {n}x{n}: {end - start:.6f} секунд")
# Проверка совпадения результатов
assert np.array_equal(sequential_result, parallel_result), "Результаты не совпадают!"

View File

@ -0,0 +1,99 @@
# Кашин Максим ПИбд-42
# Отчет по умножению матриц
## Описание
В данной лабораторной работе реализованы два алгоритма для умножения больших квадратных матриц: последовательный и параллельный.
### Алгоритмы
1. **Последовательное умножение**:
- Для умножения используется функция `sequential_multiply`, которая принимает две матрицы \( A \) и \( B \) и возвращает их произведение \( C = A \cdot B \). Умножение реализовано с помощью функции NumPy `np.dot()`.
2. **Параллельное умножение**:
- Для параллельного умножения используется функция `parallel_multiply`, которая делит задачу на несколько процессов, каждый из которых умножает свои строки матрицы \( A \) на матрицу \( B \).
- Результат записывается в разделяемый массив `C`, который создается с помощью `multiprocessing.Array`.
- Каждому процессу передается диапазон строк, которые он должен обработать.
### Структура кода
- **Функции**:
- `sequential_multiply(A, B)`: Выполняет последовательное умножение.
- `parallel_multiply(A, B, num_processes)`: Выполняет параллельное умножение с заданным количеством процессов.
- `perform_multiplication(A, B, C, start, end, rows_A, cols_B)`: Вспомогательная функция, выполняющая умножение строк матрицы.
## Результаты
Результаты выполнения программы для разных размеров матриц и количества потоков:
### Время выполнения для 2 потоков
```
Введите количество потоков: 2
Последовательное умножение 100x100: 0.001003 секунд
Параллельное умножение 100x100: 0.900024 секунд
Последовательное умножение 300x300: 0.015999 секунд
Параллельное умножение 300x300: 1.078092 секунд
Последовательное умножение 500x500: 0.096649 секунд
Параллельное умножение 500x500: 1.450420 секунд
```
### Время выполнения для 4 потоков
```
Введите количество потоков: 4
Последовательное умножение 100x100: 0.001000 секунд
Параллельное умножение 100x100: 1.686326 секунд
Последовательное умножение 300x300: 0.015986 секунд
Параллельное умножение 300x300: 1.749842 секунд
Последовательное умножение 500x500: 0.087000 секунд
Параллельное умножение 500x500: 1.960365 секунд
```
### Время выполнения для 8 потоков
```
Введите количество потоков: 8
Последовательное умножение 100x100: 0.000000 секунд
Параллельное умножение 100x100: 3.307927 секунд
Последовательное умножение 300x300: 0.016013 секунд
Параллельное умножение 300x300: 3.321677 секунд
Последовательное умножение 500x500: 0.086618 секунд
Параллельное умножение 500x500: 3.446928 секунд
```
### Время выполнения для 16 потоков
```
Введите количество потоков: 16
Последовательное умножение 100x100: 0.000000 секунд
Параллельное умножение 100x100: 6.534394 секунд
Последовательное умножение 300x300: 0.016131 секунд
Параллельное умножение 300x300: 6.787100 секунд
Последовательное умножение 500x500: 0.086582 секунд
Параллельное умножение 500x500: 7.096588 секунд
```
## Анализ результатов
1. **Сравнение времени выполнения**:
- Последовательное умножение показывает значительно более быстрое время выполнения по сравнению с параллельным умножением для всех размеров матриц. Например, при умножении матриц размером 100x100, время последовательного умножения составляет всего 0.001003 секунд, в то время как параллельное умножение занимает 0.900024 секунд при использовании 2 потоков. Это указывает на то, что накладные расходы на создание и управление потоками значительно превышают выгоды от параллельной обработки на малом размере матриц.
2. **Увеличение размеров матриц**:
- Время выполнения параллельного умножения становится менее эффективным по мере увеличения размеров матриц. Например, при умножении матриц размером 500x500 время параллельного умножения увеличивается до 1.450420 секунд при 2 потоках, в то время как последовательное умножение занимает всего 0.096649 секунд. Это происходит из-за того, что при больших размерах матриц накладные расходы на распределение задач между потоками становятся более значительными.
3. **Влияние количества потоков**:
- Увеличение количества потоков также негативно сказывается на времени выполнения параллельного алгоритма. Например, при 4 потоках время выполнения для 100x100 матриц составляет 1.686326 секунд, а при 8 потоках — 3.307927 секунд. Это объясняется тем, что количество потоков, превышающее количество доступных ядер процессора, приводит к дополнительным накладным расходам на переключение контекста между потоками, что замедляет выполнение.
4. **Эффективность последовательного алгоритма**:
- Последовательный алгоритм показывает стабильную производительность, которая не зависит от накладных расходов, связанных с многопоточностью. Он использует оптимизированные алгоритмы NumPy, что также способствует высокой производительности.
## Выводы
1. **Эффективность**:
- Последовательное умножение матриц показывает значительно более высокую производительность по сравнению с параллельным умножением для малых и средних размеров матриц.
- Параллельное умножение начинает терять эффективность при увеличении количества потоков, что может быть связано с накладными расходами на создание процессов и синхронизацию между ними.
2. **С увеличением размера матриц**:
- Время выполнения параллельного алгоритма увеличивается, особенно для больших матриц и большого количества потоков, что указывает на ограниченную эффективность параллельного подхода в данной реализации.
## Ссылка на видео
[Видео-отчёт Кашин Максим ПИбд-42](https://disk.yandex.ru/i/0g-KQ5FarFGtqg)

1
mochalov_danila_lab_3/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
.venv

View File

@ -0,0 +1,35 @@
# Лабораторная работа №3
#### ПИбд-42. Мочалов Данила.
#### При выполнении использовал:
- Python 3.12
- Flask
- requests
- Docker
- Docker Compose
#### Задание:
Для выполнения лабораторной работы были созданы следующие сущности:
##### 1. Категория. Имеет поля:
- id
- name
- description
##### 2. Услуга. Имеет поля:
- id
- name
- category_id
##### Каждой категории может принадлежать множество услуг.
##### Были развернуты два сервиса - category_service и service_service, синхронно обменивающиеся сообщениями.
##### Сущности хранятся в оперативной памяти (без БД)
#### Инструкция
Для запуска лабораторной работы, перейдите в папку *mochalov_danila_lab_3* и выполните команду:
```
docker-compose up --build -d --remove-orphans
```
#### Демонстрация работы
Доступна по [ссылке](https://drive.google.com/file/d/1-DoS7b4ArfetVDsqjGHrfVxWhvIBt_fH/view?usp=sharing)

Some files were not shown because too many files have changed in this diff Show More