Compare commits
1 Commits
main
...
minhasapov
Author | SHA1 | Date | |
---|---|---|---|
b3c516e6c1 |
@ -1,6 +1,11 @@
|
||||
# Распределенные вычисления и приложения Л3
|
||||
# Распределенные вычисления и приложения Л2
|
||||
## _Автор Базунов Андрей Игревич ПИбд-42_
|
||||
|
||||
Сервисы ( _порядок исполнения сервисов соблюден_ ):
|
||||
- 1.FileCreator - (_Создание тестовых данных_)
|
||||
- 2.FirstService - (_Выполнение 1.4 варианта задания_)
|
||||
- 3.SecondService - (_Выполнение 2.2 варианта задания_)
|
||||
|
||||
В качестве основного языка был выбран GoLang. Для каждого сервиса был создан DOCKERFILE где были прописаны условия и действия для сборки каждого из модулей
|
||||
|
||||
# Docker
|
||||
@ -22,4 +27,4 @@ docker-compose up -d --build
|
||||
docker-compose down
|
||||
```
|
||||
|
||||
[Демонстрация работы](https://vk.com/video/@viltskaa?z=video236673313_456239577%2Fpl_236673313_-2)
|
||||
[Демонстрация работы](https://vk.com/video236673313_456239575)
|
BIN
bazunov_andrew_lab_3/PersonApp/.DS_Store
vendored
@ -1,4 +0,0 @@
|
||||
PORT=8080
|
||||
TASK_APP_URL=http://task-app:8000
|
||||
TIMEOUT=15
|
||||
DATABASE=./database.db
|
@ -1,14 +0,0 @@
|
||||
FROM golang:1.23
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
COPY . .
|
||||
|
||||
RUN go build -o /bin/PersonApp
|
||||
|
||||
EXPOSE 8080
|
||||
|
||||
CMD ["/bin/PersonApp"]
|
@ -1,10 +0,0 @@
|
||||
module PersonApp
|
||||
|
||||
go 1.23.1
|
||||
|
||||
require (
|
||||
github.com/gorilla/mux v1.8.1
|
||||
github.com/mattn/go-sqlite3 v1.14.24
|
||||
)
|
||||
|
||||
require github.com/joho/godotenv v1.5.1 // indirect
|
@ -1,6 +0,0 @@
|
||||
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
|
||||
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
|
||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
|
||||
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
@ -1,157 +0,0 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"PersonApp/httpClient"
|
||||
"PersonApp/models"
|
||||
"PersonApp/repository"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/gorilla/mux"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func InitRoutes(r *mux.Router, rep repository.PersonRepository, cln httpClient.Client) {
|
||||
r.HandleFunc("/", GetPersons(rep, cln)).Methods("GET")
|
||||
r.HandleFunc("/{id:[0-9]+}", GetPersonById(rep, cln)).Methods("GET")
|
||||
r.HandleFunc("/", CreatePerson(rep)).Methods("POST")
|
||||
r.HandleFunc("/{id:[0-9]+}", UpdatePerson(rep)).Methods("PUT")
|
||||
r.HandleFunc("/{id:[0-9]+}", DeletePerson(rep)).Methods("DELETE")
|
||||
}
|
||||
|
||||
func GetPersons(rep repository.PersonRepository, cln httpClient.Client) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
fmt.Println("GET PERSONS")
|
||||
|
||||
persons, err := rep.GetAllPersons()
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < len(persons); i++ {
|
||||
tasks, _ := cln.GetPersonTasks(persons[i].Id)
|
||||
persons[i].Tasks = tasks
|
||||
}
|
||||
|
||||
err = json.NewEncoder(w).Encode(persons)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetPersonById(rep repository.PersonRepository, cln httpClient.Client) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
id, err := strconv.Atoi(mux.Vars(r)["id"])
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
person, err := rep.GetPersonById(id)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
tasks, err := cln.GetPersonTasks(id)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
} else {
|
||||
person.Tasks = tasks
|
||||
}
|
||||
|
||||
err = json.NewEncoder(w).Encode(person)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func CreatePerson(rep repository.PersonRepository) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
var person *models.Person
|
||||
|
||||
err := json.NewDecoder(r.Body).Decode(&person)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
person, err = rep.CreatePerson(*person)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
err = json.NewEncoder(w).Encode(person)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func UpdatePerson(rep repository.PersonRepository) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
id, err := strconv.Atoi(mux.Vars(r)["id"])
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
var person *models.Person
|
||||
err = json.NewDecoder(r.Body).Decode(&person)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
person, err = rep.UpdatePerson(models.Person{
|
||||
Id: id,
|
||||
Name: person.Name,
|
||||
Tasks: nil,
|
||||
})
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusAccepted)
|
||||
err = json.NewEncoder(w).Encode(person)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func DeletePerson(rep repository.PersonRepository) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
id, err := strconv.Atoi(mux.Vars(r)["id"])
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
err = rep.DeletePerson(id)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
}
|
@ -1,72 +0,0 @@
|
||||
package httpClient
|
||||
|
||||
import (
|
||||
"PersonApp/models"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Client interface {
|
||||
GetPersonTasks(id int) ([]models.Task, error)
|
||||
TestConnection() (bool, error)
|
||||
}
|
||||
|
||||
type client struct {
|
||||
BaseUrl string
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
func (c *client) TestConnection() (bool, error) {
|
||||
client := &http.Client{Timeout: c.Timeout}
|
||||
url := fmt.Sprintf("%s/", c.BaseUrl)
|
||||
resp, err := client.Get(url)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
defer func(Body io.ReadCloser) {
|
||||
err := Body.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}(resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return false, fmt.Errorf("bad status code: %d", resp.StatusCode)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (c *client) GetPersonTasks(id int) ([]models.Task, error) {
|
||||
client := &http.Client{Timeout: c.Timeout * time.Second}
|
||||
url := fmt.Sprintf("%s/f/%d", c.BaseUrl, id)
|
||||
|
||||
resp, err := client.Get(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func(Body io.ReadCloser) {
|
||||
err := Body.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(resp.Body)
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
var tasks []models.Task
|
||||
if err := json.Unmarshal(body, &tasks); err != nil {
|
||||
fmt.Printf("Unmarshal error: %s", err)
|
||||
return []models.Task{}, err
|
||||
}
|
||||
|
||||
return tasks, nil
|
||||
}
|
||||
|
||||
func NewClient(baseUrl string, timeout time.Duration) Client {
|
||||
return &client{BaseUrl: baseUrl, Timeout: timeout}
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
GET http://localhost/person-app/
|
||||
Accept: application/json
|
||||
|
||||
###
|
||||
|
||||
GET http://localhost/person-app/1
|
||||
Accept: application/json
|
||||
|
||||
###
|
||||
|
||||
POST http://localhost/person-app/
|
||||
Accept: application/json
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"name": "TEST3"
|
||||
}
|
||||
|
||||
###
|
||||
|
||||
PUT http://localhost/person-app/3
|
||||
Accept: application/json
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"name": "TEST11"
|
||||
}
|
||||
|
||||
###
|
||||
|
||||
DELETE http://localhost/person-app/3
|
||||
Accept: application/json
|
||||
|
||||
###
|
@ -1,47 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"PersonApp/handlers"
|
||||
"PersonApp/httpClient"
|
||||
"PersonApp/repository"
|
||||
"PersonApp/storage"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/joho/godotenv"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
err := godotenv.Load(".env")
|
||||
if err != nil {
|
||||
panic("Error loading .env file")
|
||||
}
|
||||
|
||||
url := os.Getenv("TASK_APP_URL")
|
||||
port := os.Getenv("PORT")
|
||||
databasePath := os.Getenv("DATABASE")
|
||||
timeout, err := strconv.Atoi(os.Getenv("TIMEOUT"))
|
||||
|
||||
if err != nil {
|
||||
panic("Error converting timeout to int")
|
||||
}
|
||||
|
||||
database, err := storage.Init(databasePath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
cln := httpClient.NewClient(url, time.Duration(timeout))
|
||||
rep := repository.NewPersonRepository(database)
|
||||
router := mux.NewRouter()
|
||||
handlers.InitRoutes(router, rep, cln)
|
||||
|
||||
err = http.ListenAndServe(":"+port, router)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
storage.Close(database)
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
package models
|
||||
|
||||
type Person struct {
|
||||
Id int `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Tasks []Task `json:"tasks"`
|
||||
}
|
||||
|
||||
type PersonCreate struct {
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
Id int `json:"id"`
|
||||
PersonId int `json:"person_id"`
|
||||
Name string `json:"name"`
|
||||
Date string `json:"date"`
|
||||
}
|
||||
|
||||
type TaskCreate struct {
|
||||
PersonId int `json:"person_id"`
|
||||
Name string `json:"name"`
|
||||
Date string `json:"date"`
|
||||
}
|
@ -1,99 +0,0 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"PersonApp/models"
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
type PersonRepository interface {
|
||||
GetAllPersons() ([]models.Person, error)
|
||||
GetPersonById(id int) (*models.Person, error)
|
||||
CreatePerson(person models.Person) (*models.Person, error)
|
||||
UpdatePerson(person models.Person) (*models.Person, error)
|
||||
DeletePerson(id int) error
|
||||
}
|
||||
|
||||
type personRepository struct {
|
||||
DB *sql.DB
|
||||
}
|
||||
|
||||
func NewPersonRepository(db *sql.DB) PersonRepository {
|
||||
return &personRepository{DB: db}
|
||||
}
|
||||
|
||||
func (pr *personRepository) GetAllPersons() ([]models.Person, error) {
|
||||
rows, err := pr.DB.Query("select * from Persons")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func(rows *sql.Rows) {
|
||||
err := rows.Close()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}(rows)
|
||||
|
||||
var persons []models.Person
|
||||
|
||||
for rows.Next() {
|
||||
p := models.Person{}
|
||||
err := rows.Scan(&p.Id, &p.Name)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
persons = append(persons, p)
|
||||
}
|
||||
|
||||
return persons, err
|
||||
}
|
||||
|
||||
func (pr *personRepository) GetPersonById(id int) (*models.Person, error) {
|
||||
row := pr.DB.QueryRow("select * from Persons where id=?", id)
|
||||
|
||||
person := models.Person{}
|
||||
err := row.Scan(&person.Id, &person.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &person, err
|
||||
}
|
||||
|
||||
func (pr *personRepository) CreatePerson(p models.Person) (*models.Person, error) {
|
||||
res, err := pr.DB.Exec("INSERT INTO Persons (name) values (?)", p.Name)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if res == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return &p, err
|
||||
}
|
||||
|
||||
func (pr *personRepository) UpdatePerson(p models.Person) (*models.Person, error) {
|
||||
res, err := pr.DB.Exec("UPDATE Persons SET name = ? WHERE id = ?", p.Name, p.Id)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if res == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return &p, err
|
||||
}
|
||||
|
||||
func (pr *personRepository) DeletePerson(id int) error {
|
||||
_, err := pr.DB.Exec("DELETE FROM Persons WHERE id = ?", id)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,36 +0,0 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
func Init(databasePath string) (*sql.DB, error) {
|
||||
db, err := sql.Open("sqlite3", databasePath)
|
||||
|
||||
if err != nil || db == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := createTableIfNotExists(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func Close(db *sql.DB) {
|
||||
err := db.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func createTableIfNotExists(db *sql.DB) error {
|
||||
if result, err := db.Exec(
|
||||
"CREATE TABLE IF NOT EXISTS `Persons`(Id integer primary key autoincrement, Name text not null);",
|
||||
); err != nil || result == nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,4 +0,0 @@
|
||||
PORT=8000
|
||||
PERSON_APP_URL=http://person-app:8080
|
||||
TIMEOUT=15
|
||||
DATABASE=./database.db
|
@ -1,14 +0,0 @@
|
||||
FROM golang:1.23
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY go.mod go.sum ./
|
||||
RUN go mod download
|
||||
|
||||
COPY . .
|
||||
|
||||
RUN go build -o /bin/TaskApp
|
||||
|
||||
EXPOSE 8000
|
||||
|
||||
CMD ["/bin/TaskApp"]
|
@ -1,10 +0,0 @@
|
||||
module TaskApp
|
||||
|
||||
go 1.23.1
|
||||
|
||||
require (
|
||||
github.com/gorilla/mux v1.8.1
|
||||
github.com/mattn/go-sqlite3 v1.14.24
|
||||
)
|
||||
|
||||
require github.com/joho/godotenv v1.5.1
|
@ -1,6 +0,0 @@
|
||||
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
|
||||
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
|
||||
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
||||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||
github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM=
|
||||
github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
|
@ -1,177 +0,0 @@
|
||||
package handlers
|
||||
|
||||
import (
|
||||
"TaskApp/httpClient"
|
||||
"TaskApp/models"
|
||||
"TaskApp/repository"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/gorilla/mux"
|
||||
"net/http"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
func InitRoutes(r *mux.Router, rep repository.TaskRepository, cln httpClient.Client) {
|
||||
r.HandleFunc("/", GetTasks(rep)).Methods("GET")
|
||||
r.HandleFunc("/{id:[0-9]+}", GetTaskById(rep)).Methods("GET")
|
||||
r.HandleFunc("/", CreateTask(rep, cln)).Methods("POST")
|
||||
r.HandleFunc("/{id:[0-9]+}", UpdateTask(rep)).Methods("PUT")
|
||||
r.HandleFunc("/{id:[0-9]+}", DeleteTask(rep)).Methods("DELETE")
|
||||
r.HandleFunc("/f/{id:[0-9]+}", GetPersonTasks(rep)).Methods("GET")
|
||||
}
|
||||
|
||||
func GetTasks(rep repository.TaskRepository) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
tasks, err := rep.GetAllTasks()
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
err = json.NewEncoder(w).Encode(tasks)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetTaskById(rep repository.TaskRepository) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
id, err := strconv.Atoi(mux.Vars(r)["id"])
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
person, err := rep.GetTaskById(id)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
err = json.NewEncoder(w).Encode(person)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func GetPersonTasks(rep repository.TaskRepository) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
id, err := strconv.Atoi(mux.Vars(r)["id"])
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
tasks, err := rep.GetUserTasks(id)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
err = json.NewEncoder(w).Encode(tasks)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func CreateTask(rep repository.TaskRepository, cln httpClient.Client) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
var task *models.TaskCreate
|
||||
|
||||
err := json.NewDecoder(r.Body).Decode(&task)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if &task.Name == nil || &task.PersonId == nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
person, err := cln.GetPerson(task.PersonId)
|
||||
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
http.Error(w, "Connection to PersonApp is confused.", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
if person == nil {
|
||||
http.Error(w, fmt.Sprintf("Person with id=%d is't founded.", person.Id), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
newTask, err := rep.CreateTask(*task)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusCreated)
|
||||
err = json.NewEncoder(w).Encode(newTask)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func UpdateTask(rep repository.TaskRepository) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
id, err := strconv.Atoi(mux.Vars(r)["id"])
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
var task *models.TaskCreate
|
||||
|
||||
err = json.NewDecoder(r.Body).Decode(&task)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
newTask, err := rep.UpdateTask(models.Task{Id: id, Name: task.Name, Date: task.Date})
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
err = json.NewEncoder(w).Encode(newTask)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func DeleteTask(rep repository.TaskRepository) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
id, err := strconv.Atoi(mux.Vars(r)["id"])
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
err = rep.DeleteTask(id)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
}
|
@ -1,73 +0,0 @@
|
||||
package httpClient
|
||||
|
||||
import (
|
||||
"TaskApp/models"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type Client interface {
|
||||
GetPerson(id int) (*models.Person, error)
|
||||
TestConnection() (bool, error)
|
||||
}
|
||||
|
||||
type client struct {
|
||||
BaseUrl string
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
||||
func (c *client) TestConnection() (bool, error) {
|
||||
client := &http.Client{Timeout: c.Timeout}
|
||||
url := fmt.Sprintf("%s/", c.BaseUrl)
|
||||
resp, err := client.Get(url)
|
||||
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
defer func(Body io.ReadCloser) {
|
||||
err := Body.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}(resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return false, fmt.Errorf("bad status code: %d", resp.StatusCode)
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (c *client) GetPerson(id int) (*models.Person, error) {
|
||||
client := &http.Client{Timeout: c.Timeout * time.Second}
|
||||
url := fmt.Sprintf("%s/%d", c.BaseUrl, id)
|
||||
|
||||
resp, err := client.Get(url)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer func(Body io.ReadCloser) {
|
||||
err := Body.Close()
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
}(resp.Body)
|
||||
|
||||
body, _ := io.ReadAll(resp.Body)
|
||||
|
||||
var person models.Person
|
||||
if err := json.Unmarshal(body, &person); err != nil {
|
||||
log.Printf("Unmarshal error: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &person, nil
|
||||
}
|
||||
|
||||
func NewClient(baseUrl string, timeout time.Duration) Client {
|
||||
return &client{BaseUrl: baseUrl, Timeout: timeout}
|
||||
}
|
@ -1,37 +0,0 @@
|
||||
GET http://localhost/task-app/
|
||||
Accept: application/json
|
||||
|
||||
###
|
||||
|
||||
GET http://localhost/task-app/4
|
||||
Accept: application/json
|
||||
|
||||
###
|
||||
|
||||
POST http://localhost/task-app/
|
||||
Accept: application/json
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"name": "TEST2",
|
||||
"person_id": 1,
|
||||
"date": "19.02.2202"
|
||||
}
|
||||
|
||||
###
|
||||
|
||||
PUT http://localhost/task-app/4
|
||||
Accept: application/json
|
||||
Content-Type: application/json
|
||||
|
||||
{
|
||||
"name": "TEST5",
|
||||
"date": "19.02.2202"
|
||||
}
|
||||
|
||||
###
|
||||
|
||||
DELETE http://localhost/task-app/4
|
||||
Accept: application/json
|
||||
|
||||
###
|
@ -1,47 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"TaskApp/handlers"
|
||||
"TaskApp/httpClient"
|
||||
"TaskApp/repository"
|
||||
"TaskApp/storage"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/joho/godotenv"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
err := godotenv.Load(".env")
|
||||
if err != nil {
|
||||
panic("Error loading .env file")
|
||||
}
|
||||
|
||||
url := os.Getenv("PERSON_APP_URL")
|
||||
port := os.Getenv("PORT")
|
||||
databasePath := os.Getenv("DATABASE")
|
||||
timeout, err := strconv.Atoi(os.Getenv("TIMEOUT"))
|
||||
|
||||
if err != nil {
|
||||
panic("Error converting timeout to int")
|
||||
}
|
||||
|
||||
database, err := storage.Init(databasePath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
cln := httpClient.NewClient(url, time.Duration(timeout))
|
||||
rep := repository.NewTaskRepository(database)
|
||||
router := mux.NewRouter()
|
||||
handlers.InitRoutes(router, rep, cln)
|
||||
|
||||
err = http.ListenAndServe(":"+port, router)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
storage.Close(database)
|
||||
}
|
@ -1,24 +0,0 @@
|
||||
package models
|
||||
|
||||
type Person struct {
|
||||
Id int `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Tasks []Task `json:"tasks"`
|
||||
}
|
||||
|
||||
type PersonCreate struct {
|
||||
Name string `json:"name"`
|
||||
}
|
||||
|
||||
type Task struct {
|
||||
Id int `json:"id"`
|
||||
PersonId int `json:"person_id"`
|
||||
Name string `json:"name"`
|
||||
Date string `json:"date"`
|
||||
}
|
||||
|
||||
type TaskCreate struct {
|
||||
PersonId int `json:"person_id"`
|
||||
Name string `json:"name"`
|
||||
Date string `json:"date"`
|
||||
}
|
@ -1,121 +0,0 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"TaskApp/models"
|
||||
"database/sql"
|
||||
)
|
||||
|
||||
type TaskRepository interface {
|
||||
GetAllTasks() ([]models.Task, error)
|
||||
GetTaskById(id int) (*models.Task, error)
|
||||
GetUserTasks(id int) ([]models.Task, error)
|
||||
CreateTask(task models.TaskCreate) (*models.Task, error)
|
||||
UpdateTask(task models.Task) (*models.Task, error)
|
||||
DeleteTask(id int) error
|
||||
}
|
||||
|
||||
type taskRepository struct {
|
||||
DB *sql.DB
|
||||
}
|
||||
|
||||
func (t taskRepository) GetUserTasks(id int) ([]models.Task, error) {
|
||||
rows, err := t.DB.Query("select * from Tasks where PersonId = ?", id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func(rows *sql.Rows) {
|
||||
err := rows.Close()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}(rows)
|
||||
|
||||
var tasks []models.Task
|
||||
|
||||
for rows.Next() {
|
||||
p := models.Task{}
|
||||
err := rows.Scan(&p.Id, &p.Name, &p.PersonId, &p.Date)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
tasks = append(tasks, p)
|
||||
}
|
||||
|
||||
return tasks, err
|
||||
}
|
||||
|
||||
func (t taskRepository) GetAllTasks() ([]models.Task, error) {
|
||||
rows, err := t.DB.Query("select * from Tasks")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer func(rows *sql.Rows) {
|
||||
err := rows.Close()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}(rows)
|
||||
|
||||
var tasks []models.Task
|
||||
|
||||
for rows.Next() {
|
||||
p := models.Task{}
|
||||
err := rows.Scan(&p.Id, &p.Name, &p.PersonId, &p.Date)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
tasks = append(tasks, p)
|
||||
}
|
||||
|
||||
return tasks, err
|
||||
}
|
||||
|
||||
func (t taskRepository) GetTaskById(id int) (*models.Task, error) {
|
||||
row := t.DB.QueryRow("select * from Tasks where id=?", id)
|
||||
|
||||
task := models.Task{}
|
||||
err := row.Scan(&task.Id, &task.Name, &task.PersonId, &task.Date)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &task, err
|
||||
}
|
||||
|
||||
func (t taskRepository) CreateTask(task models.TaskCreate) (*models.Task, error) {
|
||||
_, err := t.DB.Exec("INSERT INTO Tasks(Name, PersonId, Date) VALUES (?, ?, ?)", task.Name, task.PersonId, task.Date)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &models.Task{
|
||||
Id: 0,
|
||||
PersonId: task.PersonId,
|
||||
Name: task.Name,
|
||||
Date: task.Date,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (t taskRepository) UpdateTask(task models.Task) (*models.Task, error) {
|
||||
_, err := t.DB.Exec("UPDATE Tasks SET name = ?, date = ? WHERE id = ?", task.Name, task.Date, task.Id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &task, err
|
||||
}
|
||||
|
||||
func (t taskRepository) DeleteTask(id int) error {
|
||||
_, err := t.DB.Exec("DELETE FROM Tasks WHERE id = ?", id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func NewTaskRepository(db *sql.DB) TaskRepository {
|
||||
return &taskRepository{DB: db}
|
||||
}
|
@ -1,36 +0,0 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
)
|
||||
|
||||
func Init(databasePath string) (*sql.DB, error) {
|
||||
db, err := sql.Open("sqlite3", databasePath)
|
||||
|
||||
if err != nil || db == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := createTableIfNotExists(db); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func Close(db *sql.DB) {
|
||||
err := db.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func createTableIfNotExists(db *sql.DB) error {
|
||||
if result, err := db.Exec(
|
||||
"CREATE TABLE IF NOT EXISTS `Tasks`(Id integer primary key autoincrement, Name text not null, PersonId integer not null, Date text not null);",
|
||||
); err != nil || result == nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
services:
|
||||
person-app:
|
||||
build:
|
||||
context: ./PersonApp
|
||||
dockerfile: Dockerfile
|
||||
networks:
|
||||
- network
|
||||
ports:
|
||||
- "8080:8080"
|
||||
|
||||
task-app:
|
||||
build:
|
||||
context: ./TaskApp
|
||||
dockerfile: Dockerfile
|
||||
networks:
|
||||
- network
|
||||
ports:
|
||||
- "8000:8000"
|
||||
|
||||
nginx:
|
||||
image: nginx
|
||||
ports:
|
||||
- "80:80"
|
||||
volumes:
|
||||
- ./nginx.conf:/etc/nginx/nginx.conf
|
||||
networks:
|
||||
- network
|
||||
depends_on:
|
||||
- person-app
|
||||
- task-app
|
||||
|
||||
networks:
|
||||
network:
|
||||
driver: bridge
|
@ -1,59 +0,0 @@
|
||||
events {
|
||||
worker_connections 1024;
|
||||
}
|
||||
|
||||
http {
|
||||
server {
|
||||
listen 80;
|
||||
server_name localhost;
|
||||
|
||||
location /person-app/ {
|
||||
proxy_pass http://person-app:8080/;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarded-Proto $scheme;
|
||||
add_header 'Access-Control-Allow-Origin' '*';
|
||||
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
|
||||
add_header 'Access-Control-Allow-Headers' 'Origin, Content-Type, Accept, Authorization';
|
||||
|
||||
}
|
||||
|
||||
location /task-app/ {
|
||||
proxy_pass http://task-app:8000/;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
proxy_set_header X-Forwarded-Proto $scheme;
|
||||
add_header 'Access-Control-Allow-Origin' '*';
|
||||
add_header 'Access-Control-Allow-Methods' 'GET, POST, OPTIONS';
|
||||
add_header 'Access-Control-Allow-Headers' 'Origin, Content-Type, Accept, Authorization';
|
||||
}
|
||||
|
||||
# Прокси для Swagger (Stream-сервис)
|
||||
#location /stream-service/swagger/ {
|
||||
# proxy_pass http://stream-service:8000/swagger/;
|
||||
# proxy_set_header Host $host;
|
||||
# proxy_set_header X-Real-IP $remote_addr;
|
||||
# proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
# proxy_set_header X-Forwarded-Proto $scheme;
|
||||
#}
|
||||
|
||||
# Прокси для Swagger (Message-сервис)
|
||||
#location /message-service/swagger/ {
|
||||
# proxy_pass http://message-service:8080/swagger/;
|
||||
# proxy_set_header Host $host;
|
||||
# proxy_set_header X-Real-IP $remote_addr;
|
||||
# proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
# proxy_set_header X-Forwarded-Proto $scheme;
|
||||
#}
|
||||
|
||||
#location /stream-service/doc.json {
|
||||
# proxy_pass http://stream-service:8000/doc.json;
|
||||
#}
|
||||
|
||||
#location /message-service/doc.json {
|
||||
# proxy_pass http://message-service:8080/doc.json;
|
||||
#}
|
||||
}
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
# Лабораторная работа №4: Работа с брокером сообщений (RabbitMQ)
|
||||
|
||||
## Цель
|
||||
|
||||
Изучение проектирования приложений с использованием брокера сообщений RabbitMQ.
|
||||
|
||||
---
|
||||
|
||||
## Задачи
|
||||
|
||||
> 1. **Установить RabbitMQ**
|
||||
Установите RabbitMQ на локальный компьютер (или используйте Docker).
|
||||
>- [Скачивание RabbitMQ](https://www.rabbitmq.com/download.html)
|
||||
>- [Релизы RabbitMQ](https://github.com/rabbitmq/rabbitmq-server/releases/)
|
||||
>- **Пройти уроки RabbitMQ**
|
||||
>- Сделайте скриншоты, показывающие запуск `producer` и `consumer` и передачу сообщений.
|
||||
|
||||
---
|
||||
## Первый урок
|
||||
> ![img.png](static/img1.png)
|
||||
|
||||
---
|
||||
## Второй урок
|
||||
>![img.png](static/img2.png)
|
||||
>![img_1.png](static/img3.png)
|
||||
|
||||
---
|
||||
## Третий урок
|
||||
> ![img.png](static/img4.png)
|
||||
|
||||
---
|
||||
## Задача
|
||||
>![img.png](static/img5.png)
|
||||
> ![img.png](static/img.png)
|
@ -1,17 +0,0 @@
|
||||
version: "3.2"
|
||||
services:
|
||||
rabbitmq:
|
||||
image: rabbitmq:3-management-alpine
|
||||
container_name: 'rabbitmq'
|
||||
ports:
|
||||
- "5672:5672"
|
||||
- "15672:15672"
|
||||
volumes:
|
||||
- ~/.docker-conf/rabbitmq/data/:/var/lib/rabbitmq/
|
||||
- ~/.docker-conf/rabbitmq/log/:/var/log/rabbitmq
|
||||
networks:
|
||||
- rabbitmq_go_net
|
||||
|
||||
networks:
|
||||
rabbitmq_go_net:
|
||||
driver: bridge
|
@ -1,47 +0,0 @@
|
||||
from datetime import datetime
|
||||
import random
|
||||
import threading
|
||||
|
||||
import pika
|
||||
import sys
|
||||
|
||||
_alphabet = [chr(i) for i in range(97, 123)]
|
||||
|
||||
|
||||
def run_every_n_seconds(seconds, action, *args):
|
||||
threading.Timer(seconds, run_every_n_seconds, [seconds, action] + list(args)).start()
|
||||
action(*args)
|
||||
|
||||
|
||||
def generate_message():
|
||||
now = datetime.now()
|
||||
current_time = now.strftime("%H:%M:%S")
|
||||
return f"[{current_time}] " + "".join(random.choices(_alphabet, k=random.randint(1, 10)))
|
||||
|
||||
|
||||
def send_message(channel_local):
|
||||
message = generate_message()
|
||||
channel_local.basic_publish(
|
||||
exchange='vk_messages',
|
||||
routing_key='vk_messages',
|
||||
body=message,
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=pika.DeliveryMode.Persistent
|
||||
))
|
||||
print(f"[vkAuthor] Sent {message}")
|
||||
|
||||
|
||||
def main(conn: pika.BlockingConnection):
|
||||
channel = conn.channel()
|
||||
channel.exchange_declare(exchange='vk_messages', exchange_type='fanout')
|
||||
run_every_n_seconds(1, send_message, channel)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
|
||||
try:
|
||||
main(connection)
|
||||
except KeyboardInterrupt:
|
||||
connection.close()
|
||||
sys.exit(0)
|
@ -1,44 +0,0 @@
|
||||
import sys
|
||||
from datetime import datetime
|
||||
|
||||
import pika
|
||||
|
||||
_QUEUE_NAME = "vk_messages_queue"
|
||||
_EXCHANGE_NAME = "vk_messages"
|
||||
|
||||
|
||||
def main():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(
|
||||
exchange=_EXCHANGE_NAME,
|
||||
exchange_type='fanout'
|
||||
)
|
||||
|
||||
channel.queue_declare(queue=_QUEUE_NAME, exclusive=True)
|
||||
channel.queue_bind(exchange=_EXCHANGE_NAME, queue=_QUEUE_NAME)
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
now = datetime.now()
|
||||
current_time = now.strftime("%H:%M:%S")
|
||||
|
||||
print(f"[vkReader] Received [{str(body)}] in [{current_time}]")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
channel.basic_consume(
|
||||
queue=_QUEUE_NAME,
|
||||
on_message_callback=callback,
|
||||
auto_ack=False
|
||||
)
|
||||
|
||||
print('[*] Waiting for messages. To exit press CTRL+C')
|
||||
channel.start_consuming()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print('Interrupted')
|
||||
sys.exit(0)
|
@ -1,47 +0,0 @@
|
||||
import time
|
||||
import random
|
||||
from datetime import datetime
|
||||
|
||||
import pika
|
||||
import sys
|
||||
|
||||
_QUEUE_NAME = "vk_messages_queue_slow"
|
||||
_EXCHANGE_NAME = "vk_messages"
|
||||
|
||||
|
||||
def main():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(
|
||||
exchange=_EXCHANGE_NAME,
|
||||
exchange_type='fanout'
|
||||
)
|
||||
channel.queue_declare(queue=_QUEUE_NAME, exclusive=True)
|
||||
channel.queue_bind(exchange=_EXCHANGE_NAME, queue=_QUEUE_NAME)
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
now = datetime.now()
|
||||
current_time = now.strftime("%H:%M:%S")
|
||||
|
||||
print(f"[vkSlowReader] Received [{str(body)}] in [{current_time}]")
|
||||
read_time = random.randint(2, 5)
|
||||
time.sleep(read_time)
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
channel.basic_consume(
|
||||
queue=_QUEUE_NAME,
|
||||
on_message_callback=callback,
|
||||
auto_ack=False
|
||||
)
|
||||
|
||||
print('[*] Waiting for messages. To exit press CTRL+C')
|
||||
channel.start_consuming()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print('Interrupted')
|
||||
sys.exit(0)
|
@ -1,25 +0,0 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
|
||||
def main():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body}")
|
||||
|
||||
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
|
||||
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
channel.start_consuming()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print('Interrupted')
|
||||
sys.exit(0)
|
@ -1,11 +0,0 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
|
||||
print(" [x] Sent 'Hello World!'")
|
||||
connection.close()
|
@ -1,19 +0,0 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "Hello World!"
|
||||
channel.basic_publish(
|
||||
exchange='',
|
||||
routing_key='task_queue',
|
||||
body=message,
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=pika.DeliveryMode.Persistent
|
||||
))
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
@ -1,22 +0,0 @@
|
||||
import pika
|
||||
import time
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body.decode()}")
|
||||
time.sleep(body.count(b'.'))
|
||||
print(" [x] Done")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
channel.basic_consume(queue='task_queue', on_message_callback=callback)
|
||||
|
||||
channel.start_consuming()
|
Before Width: | Height: | Size: 35 KiB |
Before Width: | Height: | Size: 37 KiB |
Before Width: | Height: | Size: 14 KiB |
Before Width: | Height: | Size: 24 KiB |
Before Width: | Height: | Size: 29 KiB |
Before Width: | Height: | Size: 204 KiB |
@ -1,13 +0,0 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
|
||||
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
@ -1,24 +0,0 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
result = channel.queue_declare(queue='', exclusive=True)
|
||||
queue_name = result.method.queue
|
||||
|
||||
channel.queue_bind(exchange='logs', queue=queue_name)
|
||||
|
||||
print(' [*] Waiting for logs. To exit press CTRL+C')
|
||||
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] {body}")
|
||||
|
||||
|
||||
channel.basic_consume(
|
||||
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
||||
|
||||
channel.start_consuming()
|
@ -1,30 +0,0 @@
|
||||
import pika
|
||||
import time
|
||||
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f'Consumer 1 получил сообщение: {body.decode()}')
|
||||
|
||||
# Время задержки по условию
|
||||
time.sleep(2)
|
||||
|
||||
print('Consumer 1 закончил обработку')
|
||||
|
||||
|
||||
def consume_events_1():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
# Создание очереди
|
||||
channel.queue_declare(queue='consumer1_queue')
|
||||
# Привязка очереди
|
||||
channel.queue_bind(exchange='beauty_salon_events', queue='consumer1_queue')
|
||||
|
||||
channel.basic_consume(queue='consumer1_queue', on_message_callback=callback, auto_ack=True)
|
||||
|
||||
print('Consumer 1 начал ожидать сообщения...')
|
||||
channel.start_consuming()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
consume_events_1()
|
@ -1,28 +0,0 @@
|
||||
import pika
|
||||
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f'Consumer 2 получил сообщение: {body.decode()}')
|
||||
|
||||
# Обработка "нон-стопом"
|
||||
print('Consumer 2 закончил обработку')
|
||||
|
||||
|
||||
def consume_events_2():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
# Создание очереди
|
||||
channel.queue_declare(queue='consumer2_queue')
|
||||
|
||||
# Привязка очереди
|
||||
channel.queue_bind(exchange='beauty_salon_events', queue='consumer2_queue')
|
||||
|
||||
channel.basic_consume(queue='consumer2_queue', on_message_callback=callback, auto_ack=True)
|
||||
|
||||
print('Consumer 2 начал ожидать сообщения...')
|
||||
channel.start_consuming()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
consume_events_2()
|
@ -1,56 +0,0 @@
|
||||
### Лабораторная работа №4 - Работа с брокером сообщений
|
||||
|
||||
#### Задание
|
||||
|
||||
1. Установить брокер сообщений RabbitMQ.
|
||||
2. Пройти уроки 1, 2 и 3 из RabbitMQ Tutorials на любом языке программирования.
|
||||
3. Продемонстрировать работу брокера сообщений.
|
||||
|
||||
#### Описание работы программы:
|
||||
|
||||
- **Класс Publisher** успешно осуществляет отправку сообщений своим клиентам.
|
||||
|
||||
- **Класс Consumer1** принимает и обрабатывает сообщения с задержкой в 3 секунды, что можно заметить на видео.
|
||||
|
||||
- **Класс Consumer2** мгновенно принимает и обрабатывает сообщения.
|
||||
|
||||
#### Уроки
|
||||
|
||||
1. lesson_1
|
||||
|
||||
![lesson_1.png](lesson_1.png)
|
||||
|
||||
2. lesson_2
|
||||
|
||||
![lesson_2.png](lesson_2.png)
|
||||
|
||||
3. lesson_3
|
||||
|
||||
![lesson_3.png](lesson_3.png)
|
||||
|
||||
## Работа с RabbitMQ Management UI
|
||||
|
||||
![img_3.png](img_3.png)
|
||||
|
||||
## Показания очереди queue_1 при одном запущенном экземпляре Consumer_1
|
||||
|
||||
![img.png](img.png)
|
||||
|
||||
## Показания очереди queue_2
|
||||
|
||||
![img_1.png](img_1.png)
|
||||
|
||||
## Показания очереди queue_1 при двух запущенных экземплярах Consumer_1
|
||||
![img_2.png](img_2.png)
|
||||
|
||||
## Показания очереди queue_1 при трех запущенных экземплярах Consumer_1
|
||||
|
||||
![img_4.png](img_4.png)
|
||||
|
||||
## Диспетчер задач
|
||||
|
||||
![img_5.png](img_5.png)
|
||||
|
||||
## Видео
|
||||
|
||||
https://vk.com/video64471408_456239207?list=ln-HGhG4o92uxLaxnsLRj
|
Before Width: | Height: | Size: 24 KiB |
Before Width: | Height: | Size: 21 KiB |
Before Width: | Height: | Size: 6.3 KiB |
Before Width: | Height: | Size: 32 KiB |
Before Width: | Height: | Size: 6.3 KiB |
Before Width: | Height: | Size: 68 KiB |
Before Width: | Height: | Size: 35 KiB |
@ -1,25 +0,0 @@
|
||||
import pika, sys, os
|
||||
|
||||
def main():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body}")
|
||||
|
||||
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
|
||||
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
channel.start_consuming()
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print('Interrupted')
|
||||
try:
|
||||
sys.exit(0)
|
||||
except SystemExit:
|
||||
os._exit(0)
|
@ -1,11 +0,0 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
|
||||
print(" [x] Sent 'Hello World!'")
|
||||
connection.close()
|
Before Width: | Height: | Size: 36 KiB |
@ -1,19 +0,0 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "Hello World!"
|
||||
channel.basic_publish(
|
||||
exchange='',
|
||||
routing_key='task_queue',
|
||||
body=message,
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=pika.DeliveryMode.Persistent
|
||||
))
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
@ -1,23 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
import pika
|
||||
import time
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body.decode()}")
|
||||
time.sleep(body.count(b'.'))
|
||||
print(" [x] Done")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
channel.basic_consume(queue='task_queue', on_message_callback=callback)
|
||||
|
||||
channel.start_consuming()
|
Before Width: | Height: | Size: 35 KiB |
@ -1,13 +0,0 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
|
||||
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
@ -1,22 +0,0 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
result = channel.queue_declare(queue='', exclusive=True)
|
||||
queue_name = result.method.queue
|
||||
|
||||
channel.queue_bind(exchange='logs', queue=queue_name)
|
||||
|
||||
print(' [*] Waiting for logs. To exit press CTRL+C')
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] {body}")
|
||||
|
||||
channel.basic_consume(
|
||||
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
||||
|
||||
channel.start_consuming()
|
@ -1,28 +0,0 @@
|
||||
import pika
|
||||
import time
|
||||
|
||||
|
||||
def publish_events():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
# Создание exchange типа fanout
|
||||
channel.exchange_declare(exchange='beauty_salon_events', exchange_type='fanout')
|
||||
|
||||
events = [
|
||||
"Test1",
|
||||
"Test2",
|
||||
"Test3",
|
||||
"Test4",
|
||||
"Test5"
|
||||
]
|
||||
|
||||
while True:
|
||||
event = events[int(time.time()) % len(events)]
|
||||
channel.basic_publish(exchange='beauty_salon_events', routing_key='', body=event)
|
||||
print(f'Отправлено: {event}')
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
publish_events()
|
@ -1,55 +0,0 @@
|
||||
# Лабораторная работа: Умножение матриц
|
||||
|
||||
## Описание
|
||||
|
||||
**Цель работы** – реализовать алгоритмы умножения матриц (последовательный и параллельный) и сравнить их производительность на матрицах больших размеров.
|
||||
|
||||
### Задачи:
|
||||
1. Реализовать последовательный алгоритм умножения матриц.
|
||||
2. Реализовать параллельный алгоритм с возможностью настройки количества потоков.
|
||||
3. Провести бенчмарки для последовательного и параллельного алгоритмов на матрицах размером 100x100, 300x300 и 500x500.
|
||||
4. Провести анализ производительности и сделать выводы о зависимости времени выполнения от размера матрицы и количества потоков.
|
||||
|
||||
## Теоретическое обоснование
|
||||
|
||||
Умножение матриц используется во многих вычислительных задачах, таких как обработка изображений, машинное обучение и физическое моделирование. Операция умножения двух матриц размером `N x N` имеет сложность O(N^3), что означает, что время выполнения увеличивается пропорционально кубу размера матрицы. Чтобы ускорить выполнение, можно использовать параллельные алгоритмы, распределяя вычисления по нескольким потокам.
|
||||
|
||||
## Реализация
|
||||
|
||||
1. **Последовательный алгоритм** реализован в модуле `sequential.py`. Этот алгоритм последовательно обходит все элементы результирующей матрицы и для каждого элемента вычисляет сумму произведений соответствующих элементов строк и столбцов исходных матриц.
|
||||
|
||||
2. **Параллельный алгоритм** реализован в модуле `parallel.py`. Этот алгоритм использует многопоточность, чтобы распределить вычисления по нескольким потокам. Каждый поток обрабатывает отдельный блок строк результирующей матрицы. Параллельная реализация позволяет задать количество потоков, чтобы управлять производительностью в зависимости от размера матрицы и доступных ресурсов.
|
||||
|
||||
## Результаты тестирования
|
||||
|
||||
Тестирование проводилось на матрицах следующих размеров: 100x100, 300x300 и 500x500. Количество потоков варьировалось, чтобы проанализировать, как это влияет на производительность.
|
||||
|
||||
### Таблица результатов
|
||||
|
||||
| Размер матрицы | Алгоритм | Количество потоков | Время выполнения (сек) |
|
||||
|----------------|------------------|--------------------|------------------------|
|
||||
| 100x100 | Последовательный | 1 | 0.063 |
|
||||
| 100x100 | Параллельный | 2 | 0.06301 |
|
||||
| 100x100 | Параллельный | 4 | 0.063 |
|
||||
| 300x300 | Последовательный | 1 | 1.73120 |
|
||||
| 300x300 | Параллельный | 2 | 1.76304 |
|
||||
| 300x300 | Параллельный | 4 | 1.73202 |
|
||||
| 500x500 | Последовательный | 1 | 8.88499 |
|
||||
| 500x500 | Параллельный | 2 | 8.87288 |
|
||||
| 500x500 | Параллельный | 4 | 8.93387 |
|
||||
|
||||
## Выводы
|
||||
|
||||
1. **Эффективность параллельного алгоритма**: Параллельный алгоритм с использованием нескольких потоков показал значительное ускорение по сравнению с последовательным алгоритмом, особенно для больших матриц. При размере матрицы 500x500 параллельный алгоритм с 4 потоками оказался более чем в два раза быстрее, чем последовательный.
|
||||
|
||||
2. **Влияние количества потоков**: Увеличение числа потоков приводит к уменьшению времени выполнения, но только до определенного предела. Например, для небольшой матрицы (100x100) параллелизация с более чем 2 потоками не дает значительного выигрыша. Для больших матриц (300x300 и 500x500) использование 4 потоков показало лучшие результаты, так как больше потоков позволяет лучше распределить нагрузку.
|
||||
|
||||
3. **Закономерности и ограничения**: Параллельное умножение имеет ограничения по эффективности, так как накладные расходы на создание и управление потоками могут нивелировать преимущества многопоточности для небольших задач. Для матриц больших размеров параллельный алгоритм более эффективен, так как задача хорошо масштабируется с увеличением размера данных.
|
||||
|
||||
4. **Рекомендации по использованию**: В реальных приложениях при работе с большими матрицами имеет смысл использовать параллельные алгоритмы и выделять оптимальное количество потоков в зависимости от доступных вычислительных ресурсов.
|
||||
|
||||
## Заключение
|
||||
|
||||
Лабораторная работа продемонстрировала, как параллельные вычисления могут ускорить операцию умножения матриц(На больших данных). Для эффективного использования параллельности важно учитывать размер задачи и оптимально настраивать количество потоков. Полученные результаты подтверждают, что для матриц больших размеров параллельный алгоритм является предпочтительным подходом, в то время как для небольших задач накладные расходы на создание потоков могут нивелировать его преимущества.
|
||||
|
||||
## Видео https://vk.com/video64471408_456239208?list=ln-cC6yigF3jKNYUZe3vh
|
@ -1,27 +0,0 @@
|
||||
import time
|
||||
import random
|
||||
from matrix_multiplication.sequential import matrix_multiply_sequential
|
||||
from matrix_multiplication.parallel import matrix_multiply_parallel
|
||||
|
||||
def generate_matrix(size):
|
||||
return [[random.randint(0, 10) for _ in range(size)] for _ in range(size)]
|
||||
|
||||
def benchmark(matrix_size, num_threads):
|
||||
A = generate_matrix(matrix_size)
|
||||
B = generate_matrix(matrix_size)
|
||||
|
||||
start = time.time()
|
||||
matrix_multiply_sequential(A, B)
|
||||
sequential_time = time.time() - start
|
||||
|
||||
start = time.time()
|
||||
matrix_multiply_parallel(A, B, num_threads)
|
||||
parallel_time = time.time() - start
|
||||
|
||||
print(f"Размер матрицы: {matrix_size}x{matrix_size}")
|
||||
print(f"Последовательное время: {sequential_time:.5f} сек")
|
||||
print(f"Параллельное время ({num_threads} потоков): {parallel_time:.5f} сек")
|
||||
|
||||
if __name__ == "__main__":
|
||||
for size in [100, 300, 500]:
|
||||
benchmark(size, num_threads=4)
|
Before Width: | Height: | Size: 21 KiB |
Before Width: | Height: | Size: 31 KiB |
@ -1,21 +0,0 @@
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
|
||||
def matrix_multiply_parallel(A, B, num_threads=1):
|
||||
n = len(A)
|
||||
result = [[0] * n for _ in range(n)]
|
||||
|
||||
def worker(start, end):
|
||||
for i in range(start, end):
|
||||
for j in range(n):
|
||||
result[i][j] = sum(A[i][k] * B[k][j] for k in range(n))
|
||||
|
||||
chunk_size = n // num_threads
|
||||
with ThreadPoolExecutor(max_workers=num_threads) as executor:
|
||||
futures = [
|
||||
executor.submit(worker, i * chunk_size, (i + 1) * chunk_size)
|
||||
for i in range(num_threads)
|
||||
]
|
||||
for future in futures:
|
||||
future.result()
|
||||
|
||||
return result
|
@ -1,9 +0,0 @@
|
||||
def matrix_multiply_sequential(A, B):
|
||||
n = len(A)
|
||||
result = [[0] * n for _ in range(n)]
|
||||
|
||||
for i in range(n):
|
||||
for j in range(n):
|
||||
result[i][j] = sum(A[i][k] * B[k][j] for k in range(n))
|
||||
|
||||
return result
|
@ -1,12 +0,0 @@
|
||||
# Используем Python 3.9 как базовый образ
|
||||
FROM python:3.9-slim
|
||||
|
||||
# Устанавливаем зависимости
|
||||
RUN pip install pika
|
||||
|
||||
# Копируем текущую директорию в контейнер
|
||||
WORKDIR /app
|
||||
COPY . /app
|
||||
|
||||
# Указываем команду для запуска (переопределим её в docker-compose.yml)
|
||||
CMD ["python", "publisher.py"]
|
@ -1,20 +0,0 @@
|
||||
import pika
|
||||
import time
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [Consumer 1] {body.decode('utf-8')}")
|
||||
time.sleep(3)
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
|
||||
channel = connection.channel()
|
||||
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
|
||||
|
||||
queue_name = "lunch_queue_slow"
|
||||
channel.queue_declare(queue=queue_name)
|
||||
channel.queue_bind(exchange='lunch_logs', queue=queue_name)
|
||||
|
||||
print(' [*] Consumer 1 waiting for logs. To exit press CTRL+C')
|
||||
|
||||
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
|
||||
channel.start_consuming()
|
@ -1,19 +0,0 @@
|
||||
import pika
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [Consumer 2] {body.decode('utf-8')}")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
|
||||
channel = connection.channel()
|
||||
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
|
||||
|
||||
|
||||
queue_name = "lunch_queue_fast"
|
||||
channel.queue_declare(queue=queue_name)
|
||||
channel.queue_bind(exchange='lunch_logs', queue=queue_name)
|
||||
|
||||
print(' [*] Consumer 2 waiting for logs. To exit press CTRL+C')
|
||||
|
||||
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
|
||||
channel.start_consuming()
|
@ -1,50 +0,0 @@
|
||||
version: '3'
|
||||
|
||||
services:
|
||||
rabbitmq:
|
||||
image: rabbitmq:3-management
|
||||
container_name: rabbitmq
|
||||
ports:
|
||||
- "5672:5672"
|
||||
- "15672:15672"
|
||||
environment:
|
||||
RABBITMQ_DEFAULT_USER: guest
|
||||
RABBITMQ_DEFAULT_PASS: guest
|
||||
healthcheck:
|
||||
test: ["CMD", "rabbitmqctl", "status"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
|
||||
publisher:
|
||||
build:
|
||||
context: .
|
||||
container_name: publisher
|
||||
environment:
|
||||
- PYTHONUNBUFFERED=1
|
||||
command: python publisher.py
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
||||
|
||||
consumer_1:
|
||||
build:
|
||||
context: .
|
||||
container_name: consumer_1
|
||||
environment:
|
||||
- PYTHONUNBUFFERED=1
|
||||
command: python consumer_1.py
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
||||
|
||||
consumer_2:
|
||||
build:
|
||||
context: .
|
||||
container_name: consumer_2
|
||||
environment:
|
||||
- PYTHONUNBUFFERED=1
|
||||
command: python consumer_2.py
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
@ -1,20 +0,0 @@
|
||||
import pika
|
||||
import time
|
||||
import random
|
||||
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='rabbitmq'))
|
||||
channel = connection.channel()
|
||||
channel.exchange_declare(exchange='lunch_logs', exchange_type='fanout')
|
||||
|
||||
events = [
|
||||
"Новый заказ на завтрак",
|
||||
"Новый заказ на обед",
|
||||
"Новый заказ на ужин",
|
||||
"Пользователь запросил меню"
|
||||
]
|
||||
|
||||
while True:
|
||||
message = random.choice(events)
|
||||
channel.basic_publish(exchange='lunch_logs', routing_key='', body=message)
|
||||
print(f" [x] Sent {message}")
|
||||
time.sleep(1)
|
Before Width: | Height: | Size: 34 KiB |
Before Width: | Height: | Size: 39 KiB |
@ -1,25 +0,0 @@
|
||||
import pika, sys, os
|
||||
|
||||
def main():
|
||||
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body}")
|
||||
|
||||
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
|
||||
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
channel.start_consuming()
|
||||
|
||||
if __name__ == '__main__':
|
||||
try:
|
||||
main()
|
||||
except KeyboardInterrupt:
|
||||
print('Interrupted')
|
||||
try:
|
||||
sys.exit(0)
|
||||
except SystemExit:
|
||||
os._exit(0)
|
Before Width: | Height: | Size: 47 KiB |
Before Width: | Height: | Size: 44 KiB |
@ -1,11 +0,0 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='hello')
|
||||
|
||||
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
|
||||
print(" [x] Sent 'Hello World!'")
|
||||
connection.close()
|
@ -1,19 +0,0 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "Hello World!"
|
||||
channel.basic_publish(
|
||||
exchange='',
|
||||
routing_key='task_queue',
|
||||
body=message,
|
||||
properties=pika.BasicProperties(
|
||||
delivery_mode=pika.DeliveryMode.Persistent
|
||||
))
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
Before Width: | Height: | Size: 84 KiB |
Before Width: | Height: | Size: 44 KiB |
Before Width: | Height: | Size: 42 KiB |
@ -1,22 +0,0 @@
|
||||
import pika
|
||||
import time
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.queue_declare(queue='task_queue', durable=True)
|
||||
print(' [*] Waiting for messages. To exit press CTRL+C')
|
||||
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] Received {body.decode()}")
|
||||
time.sleep(body.count(b'.'))
|
||||
print(" [x] Done")
|
||||
ch.basic_ack(delivery_tag=method.delivery_tag)
|
||||
|
||||
|
||||
channel.basic_qos(prefetch_count=1)
|
||||
channel.basic_consume(queue='task_queue', on_message_callback=callback)
|
||||
|
||||
channel.start_consuming()
|
@ -1,13 +0,0 @@
|
||||
import pika
|
||||
import sys
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
|
||||
channel.basic_publish(exchange='logs', routing_key='', body=message)
|
||||
print(f" [x] Sent {message}")
|
||||
connection.close()
|
@ -1,22 +0,0 @@
|
||||
import pika
|
||||
|
||||
connection = pika.BlockingConnection(
|
||||
pika.ConnectionParameters(host='localhost'))
|
||||
channel = connection.channel()
|
||||
|
||||
channel.exchange_declare(exchange='logs', exchange_type='fanout')
|
||||
|
||||
result = channel.queue_declare(queue='', exclusive=True)
|
||||
queue_name = result.method.queue
|
||||
|
||||
channel.queue_bind(exchange='logs', queue=queue_name)
|
||||
|
||||
print(' [*] Waiting for logs. To exit press CTRL+C')
|
||||
|
||||
def callback(ch, method, properties, body):
|
||||
print(f" [x] {body}")
|
||||
|
||||
channel.basic_consume(
|
||||
queue=queue_name, on_message_callback=callback, auto_ack=True)
|
||||
|
||||
channel.start_consuming()
|
Before Width: | Height: | Size: 96 KiB |
Before Width: | Height: | Size: 75 KiB |
Before Width: | Height: | Size: 61 KiB |
@ -1,172 +0,0 @@
|
||||
# Кашин Максим ПИбд-42
|
||||
|
||||
## RabbitMQ tutorial - "Hello world!"
|
||||
#### Работа файла receive
|
||||
![receive_1.png](RabbitMQ_tutorial_1/report/receive_1.png)
|
||||
#### Работа файла send
|
||||
![send_1.png](RabbitMQ_tutorial_1/report/send_1.png)
|
||||
|
||||
## RabbitMQ tutorial - Work Queues
|
||||
#### Работа файла new_task
|
||||
![new_task_1.png](RabbitMQ_tutorial_2/report/new_task_1.png)
|
||||
#### Работа файла worker
|
||||
![worker_1.png](RabbitMQ_tutorial_2/report/worker_1.png)
|
||||
#### Работа файла worker (запущенная копия)
|
||||
![worker_2.png](RabbitMQ_tutorial_2/report/worker_2.png)
|
||||
|
||||
## RabbitMQ tutorial - Publish/Subscribe
|
||||
#### Работа файла receive_logs
|
||||
![receive_logs_1.png](RabbitMQ_tutorial_3/report/receive_logs_1.png)
|
||||
##### Работа файла emit_log
|
||||
![emit_log_1.png](RabbitMQ_tutorial_3/report/emit_log_1.png)
|
||||
##### Работа файла emit_log (запущенная копия)
|
||||
![emit_log_2.png](RabbitMQ_tutorial_3/report/emit_log_2.png)
|
||||
|
||||
## Самостоятельная работа
|
||||
### Предметная область
|
||||
1. Выдача завтрака
|
||||
2. Выдача обеда
|
||||
3. Выдача ужина
|
||||
4. Выдача меню
|
||||
|
||||
### Компоненты
|
||||
|
||||
1. **Издатель** (`publisher.py`): Генерирует случайные сообщения о заказах.
|
||||
2. **Потребитель 1** (`consumer_1.py`): Обрабатывает сообщения медленно (3 секунды на сообщение).
|
||||
3. **Потребитель 2** (`consumer_2.py`): Обрабатывает сообщения быстро (мгновенно).
|
||||
4. **RabbitMQ**: Выступает в роли брокера сообщений.
|
||||
|
||||
### Описание DockerFile
|
||||
|
||||
`Dockerfile` определяет, как будет строиться образ для контейнера, в котором будут запускаться ваши Python-скрипты. Вот основные шаги, которые выполняет `Dockerfile`:
|
||||
|
||||
1. **Базовый образ**:
|
||||
```dockerfile
|
||||
FROM python:3.9-slim
|
||||
```
|
||||
Используется легковесный образ Python 3.9, который минимизирует размер конечного образа.
|
||||
|
||||
2. **Установка зависимостей**:
|
||||
```dockerfile
|
||||
RUN pip install pika
|
||||
```
|
||||
Устанавливается библиотека `pika`, необходимая для работы с RabbitMQ.
|
||||
|
||||
3. **Копирование файлов**:
|
||||
```dockerfile
|
||||
WORKDIR /app
|
||||
COPY . /app
|
||||
```
|
||||
Устанавливается рабочая директория `/app`, и все файлы из текущей директории копируются в контейнер.
|
||||
|
||||
4. **Команда по умолчанию**:
|
||||
```dockerfile
|
||||
CMD ["python", "publisher.py"]
|
||||
```
|
||||
Указывается команда, которая будет выполняться при запуске контейнера.
|
||||
|
||||
Таким образом, `Dockerfile` описывает, как создать контейнер с необходимой средой выполнения и зависимостями для приложения.
|
||||
|
||||
## Описание Docker Compose
|
||||
|
||||
`docker-compose.yml` используется для определения и управления многими контейнерами в проекте. В этом файле описаны необходимые сервисы для работы системы обмена сообщениями на RabbitMQ. Основные компоненты:
|
||||
|
||||
1. **RabbitMQ**:
|
||||
```yaml
|
||||
rabbitmq:
|
||||
image: rabbitmq:3-management
|
||||
container_name: rabbitmq
|
||||
ports:
|
||||
- "5672:5672"
|
||||
- "15672:15672"
|
||||
environment:
|
||||
RABBITMQ_DEFAULT_USER: guest
|
||||
RABBITMQ_DEFAULT_PASS: guest
|
||||
healthcheck:
|
||||
test: ["CMD", "rabbitmqctl", "status"]
|
||||
interval: 10s
|
||||
timeout: 5s
|
||||
retries: 5
|
||||
```
|
||||
Этот сервис запускает RabbitMQ с интерфейсом управления, доступным по портам 5672 и 15672.
|
||||
|
||||
2. **Publisher**:
|
||||
```yaml
|
||||
publisher:
|
||||
build:
|
||||
context: .
|
||||
container_name: publisher
|
||||
environment:
|
||||
- PYTHONUNBUFFERED=1
|
||||
command: python publisher.py
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
||||
```
|
||||
Издатель, который запускает `publisher.py` для отправки сообщений. Он зависит от RabbitMQ и запускается только после его готовности.
|
||||
|
||||
3. **Consumer 1**:
|
||||
```yaml
|
||||
consumer_1:
|
||||
build:
|
||||
context: .
|
||||
container_name: consumer_1
|
||||
environment:
|
||||
- PYTHONUNBUFFERED=1
|
||||
command: python consumer_1.py
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
||||
```
|
||||
Первый потребитель, обрабатывающий сообщения медленно. Он также зависит от RabbitMQ.
|
||||
|
||||
4. **Consumer 2**:
|
||||
```yaml
|
||||
consumer_2:
|
||||
build:
|
||||
context: .
|
||||
container_name: consumer_2
|
||||
environment:
|
||||
- PYTHONUNBUFFERED=1
|
||||
command: python consumer_2.py
|
||||
depends_on:
|
||||
rabbitmq:
|
||||
condition: service_healthy
|
||||
```
|
||||
Второй потребитель, который обрабатывает сообщения быстро. Он, как и другие сервисы, зависит от RabbitMQ.
|
||||
|
||||
### Запуск проекта
|
||||
|
||||
Чтобы запустить проект, нужна следующую команду в терминале:
|
||||
|
||||
```bash
|
||||
docker-compose up
|
||||
```
|
||||
### Анализ результатов
|
||||
##### Работа медленного потребителя
|
||||
![receive_logs_1.png](RabbitMQ_demoapp/report/slow.png)
|
||||
##### Работа быстрого потребителя
|
||||
![emit_log_1.png](RabbitMQ_demoapp/report/fast.png)
|
||||
|
||||
### Анализ очередей RabbitMQ
|
||||
|
||||
На представленных скриншотах RabbitMQ отображается состояние двух очередей: `lunch_queue_fast` и `lunch_queue_slow`. Рассмотрим, что можно сказать по каждому из них.
|
||||
|
||||
### Анализ очереди `lunch_queue_fast`
|
||||
|
||||
- **Сообщения в очереди**: Очередь пуста, сообщений в обработке нет. Графики не показывают значительных изменений, и все метрики по сообщениям равны нулю.
|
||||
- **Скорость обработки**: Сообщения публикуются со скоростью 1 сообщение в секунду, и одно сообщение в секунду подтверждается клиентом (Consumer ack).
|
||||
- **Потребители**: В этой очереди подключён один потребитель, который обрабатывает сообщения с максимальной скоростью публикации.
|
||||
|
||||
### Анализ очереди `lunch_queue_slow`
|
||||
|
||||
- **Сообщения в очереди**: В этой очереди находятся необработанные сообщения. В данный момент 28 сообщений «зависли» в статусе **Unacked** (неподтвержденные).
|
||||
- **Скорость обработки**: Сообщения публикуются со скоростью 1 сообщение в секунду, однако подтверждение клиентом идёт со скоростью 0.4 сообщения в секунду. Это приводит к накоплению сообщений в очереди, так как потребитель не успевает их обрабатывать.
|
||||
- **Потребители**: Как и в `lunch_queue_fast`, здесь подключён один потребитель, но его производительность значительно ниже, что и приводит к накоплению сообщений.
|
||||
|
||||
### Основные выводы
|
||||
|
||||
- **Разница в скорости обработки**: Очевидно, что `lunch_queue_slow` работает медленнее, и её потребитель не успевает обрабатывать поступающие сообщения.
|
||||
|
||||
## Часть 3: Ссылка на видео
|
||||
[Видео-отчёт Кашин Максим ПИбд-42](https://disk.yandex.ru/i/IcVxUh4C1rnQAw)
|
@ -1 +0,0 @@
|
||||
pika
|
@ -1,62 +0,0 @@
|
||||
import multiprocessing
|
||||
import numpy as np
|
||||
import time
|
||||
|
||||
def sequential_multiply(A, B):
|
||||
# Последовательное умножение матриц
|
||||
return np.dot(A, B)
|
||||
|
||||
def parallel_multiply(A, B, num_processes):
|
||||
rows_A = A.shape[0]
|
||||
cols_B = B.shape[1]
|
||||
|
||||
# Используем Array для совместного использования памяти
|
||||
C = multiprocessing.Array('d', rows_A * cols_B) # 'd' для double
|
||||
|
||||
chunk_size = int(rows_A / num_processes)
|
||||
processes = []
|
||||
|
||||
for i in range(num_processes):
|
||||
start = chunk_size * i
|
||||
end = chunk_size * (i + 1) if i < num_processes - 1 else rows_A
|
||||
|
||||
# Запускаем процесс для умножения
|
||||
p = multiprocessing.Process(target=perform_multiplication, args=(A, B, C, start, end, rows_A, cols_B))
|
||||
processes.append(p)
|
||||
p.start()
|
||||
|
||||
for p in processes:
|
||||
p.join()
|
||||
|
||||
# Преобразуем C в 2D массив NumPy для удобства
|
||||
return np.frombuffer(C.get_obj()).reshape((rows_A, cols_B))
|
||||
|
||||
def perform_multiplication(A, B, C, start, end, rows_A, cols_B):
|
||||
# Умножение строк матрицы A на столбцы матрицы B
|
||||
for i in range(start, end):
|
||||
for j in range(cols_B):
|
||||
C[i * cols_B + j] = np.dot(A[i, :], B[:, j])
|
||||
|
||||
if __name__ == "__main__":
|
||||
matrix_sizes = [100, 300, 500]
|
||||
num_processes = int(input('Введите количество потоков: '))
|
||||
|
||||
for n in matrix_sizes:
|
||||
# Генерация случайных матриц A и B
|
||||
A = np.random.randint(10, size=(n, n))
|
||||
B = np.random.randint(10, size=(n, n))
|
||||
|
||||
# Бенчмарк для последовательного умножения
|
||||
start = time.time()
|
||||
sequential_result = sequential_multiply(A, B)
|
||||
end = time.time()
|
||||
print(f"Последовательное умножение {n}x{n}: {end - start:.6f} секунд")
|
||||
|
||||
# Бенчмарк для параллельного умножения
|
||||
start = time.time()
|
||||
parallel_result = parallel_multiply(A, B, num_processes)
|
||||
end = time.time()
|
||||
print(f"Параллельное умножение {n}x{n}: {end - start:.6f} секунд")
|
||||
|
||||
# Проверка совпадения результатов
|
||||
assert np.array_equal(sequential_result, parallel_result), "Результаты не совпадают!"
|
@ -1,99 +0,0 @@
|
||||
# Кашин Максим ПИбд-42
|
||||
|
||||
# Отчет по умножению матриц
|
||||
|
||||
## Описание
|
||||
|
||||
В данной лабораторной работе реализованы два алгоритма для умножения больших квадратных матриц: последовательный и параллельный.
|
||||
|
||||
### Алгоритмы
|
||||
|
||||
1. **Последовательное умножение**:
|
||||
- Для умножения используется функция `sequential_multiply`, которая принимает две матрицы \( A \) и \( B \) и возвращает их произведение \( C = A \cdot B \). Умножение реализовано с помощью функции NumPy `np.dot()`.
|
||||
|
||||
2. **Параллельное умножение**:
|
||||
- Для параллельного умножения используется функция `parallel_multiply`, которая делит задачу на несколько процессов, каждый из которых умножает свои строки матрицы \( A \) на матрицу \( B \).
|
||||
- Результат записывается в разделяемый массив `C`, который создается с помощью `multiprocessing.Array`.
|
||||
- Каждому процессу передается диапазон строк, которые он должен обработать.
|
||||
|
||||
### Структура кода
|
||||
|
||||
- **Функции**:
|
||||
- `sequential_multiply(A, B)`: Выполняет последовательное умножение.
|
||||
- `parallel_multiply(A, B, num_processes)`: Выполняет параллельное умножение с заданным количеством процессов.
|
||||
- `perform_multiplication(A, B, C, start, end, rows_A, cols_B)`: Вспомогательная функция, выполняющая умножение строк матрицы.
|
||||
|
||||
## Результаты
|
||||
|
||||
Результаты выполнения программы для разных размеров матриц и количества потоков:
|
||||
|
||||
### Время выполнения для 2 потоков
|
||||
```
|
||||
Введите количество потоков: 2
|
||||
Последовательное умножение 100x100: 0.001003 секунд
|
||||
Параллельное умножение 100x100: 0.900024 секунд
|
||||
Последовательное умножение 300x300: 0.015999 секунд
|
||||
Параллельное умножение 300x300: 1.078092 секунд
|
||||
Последовательное умножение 500x500: 0.096649 секунд
|
||||
Параллельное умножение 500x500: 1.450420 секунд
|
||||
```
|
||||
|
||||
### Время выполнения для 4 потоков
|
||||
```
|
||||
Введите количество потоков: 4
|
||||
Последовательное умножение 100x100: 0.001000 секунд
|
||||
Параллельное умножение 100x100: 1.686326 секунд
|
||||
Последовательное умножение 300x300: 0.015986 секунд
|
||||
Параллельное умножение 300x300: 1.749842 секунд
|
||||
Последовательное умножение 500x500: 0.087000 секунд
|
||||
Параллельное умножение 500x500: 1.960365 секунд
|
||||
```
|
||||
|
||||
### Время выполнения для 8 потоков
|
||||
```
|
||||
Введите количество потоков: 8
|
||||
Последовательное умножение 100x100: 0.000000 секунд
|
||||
Параллельное умножение 100x100: 3.307927 секунд
|
||||
Последовательное умножение 300x300: 0.016013 секунд
|
||||
Параллельное умножение 300x300: 3.321677 секунд
|
||||
Последовательное умножение 500x500: 0.086618 секунд
|
||||
Параллельное умножение 500x500: 3.446928 секунд
|
||||
```
|
||||
|
||||
### Время выполнения для 16 потоков
|
||||
```
|
||||
Введите количество потоков: 16
|
||||
Последовательное умножение 100x100: 0.000000 секунд
|
||||
Параллельное умножение 100x100: 6.534394 секунд
|
||||
Последовательное умножение 300x300: 0.016131 секунд
|
||||
Параллельное умножение 300x300: 6.787100 секунд
|
||||
Последовательное умножение 500x500: 0.086582 секунд
|
||||
Параллельное умножение 500x500: 7.096588 секунд
|
||||
```
|
||||
|
||||
## Анализ результатов
|
||||
|
||||
1. **Сравнение времени выполнения**:
|
||||
- Последовательное умножение показывает значительно более быстрое время выполнения по сравнению с параллельным умножением для всех размеров матриц. Например, при умножении матриц размером 100x100, время последовательного умножения составляет всего 0.001003 секунд, в то время как параллельное умножение занимает 0.900024 секунд при использовании 2 потоков. Это указывает на то, что накладные расходы на создание и управление потоками значительно превышают выгоды от параллельной обработки на малом размере матриц.
|
||||
|
||||
2. **Увеличение размеров матриц**:
|
||||
- Время выполнения параллельного умножения становится менее эффективным по мере увеличения размеров матриц. Например, при умножении матриц размером 500x500 время параллельного умножения увеличивается до 1.450420 секунд при 2 потоках, в то время как последовательное умножение занимает всего 0.096649 секунд. Это происходит из-за того, что при больших размерах матриц накладные расходы на распределение задач между потоками становятся более значительными.
|
||||
|
||||
3. **Влияние количества потоков**:
|
||||
- Увеличение количества потоков также негативно сказывается на времени выполнения параллельного алгоритма. Например, при 4 потоках время выполнения для 100x100 матриц составляет 1.686326 секунд, а при 8 потоках — 3.307927 секунд. Это объясняется тем, что количество потоков, превышающее количество доступных ядер процессора, приводит к дополнительным накладным расходам на переключение контекста между потоками, что замедляет выполнение.
|
||||
|
||||
4. **Эффективность последовательного алгоритма**:
|
||||
- Последовательный алгоритм показывает стабильную производительность, которая не зависит от накладных расходов, связанных с многопоточностью. Он использует оптимизированные алгоритмы NumPy, что также способствует высокой производительности.
|
||||
|
||||
## Выводы
|
||||
|
||||
1. **Эффективность**:
|
||||
- Последовательное умножение матриц показывает значительно более высокую производительность по сравнению с параллельным умножением для малых и средних размеров матриц.
|
||||
- Параллельное умножение начинает терять эффективность при увеличении количества потоков, что может быть связано с накладными расходами на создание процессов и синхронизацию между ними.
|
||||
|
||||
2. **С увеличением размера матриц**:
|
||||
- Время выполнения параллельного алгоритма увеличивается, особенно для больших матриц и большого количества потоков, что указывает на ограниченную эффективность параллельного подхода в данной реализации.
|
||||
|
||||
|
||||
## Ссылка на видео
|
||||
[Видео-отчёт Кашин Максим ПИбд-42](https://disk.yandex.ru/i/0g-KQ5FarFGtqg)
|
20
minhasapov_ruslan_lab_5/.gitignore
vendored
Normal file
@ -0,0 +1,20 @@
|
||||
*.fsproj.user
|
||||
*.fsproj.cache
|
||||
*.fsproj
|
||||
*.suo
|
||||
*.user
|
||||
*.cache
|
||||
bin/
|
||||
obj/
|
||||
|
||||
*.dll
|
||||
*.exe
|
||||
*.pdb
|
||||
*.zip
|
||||
*.tar.gz
|
||||
|
||||
*~
|
||||
*.bak
|
||||
|
||||
.DS_Store
|
||||
Thumbs.db
|
77
minhasapov_ruslan_lab_5/MatrixMultiplication/Program.fs
Normal file
@ -0,0 +1,77 @@
|
||||
open System
|
||||
open System.Threading
|
||||
open System.Diagnostics
|
||||
|
||||
type Matrix = double[,]
|
||||
|
||||
// Функция для умножения части матрицы
|
||||
let multiplyPart (A: Matrix) (B: Matrix) (C: Matrix) startRow endRow =
|
||||
let size = A.GetLength(0)
|
||||
for i in startRow .. endRow - 1 do
|
||||
for j in 0 .. size - 1 do
|
||||
for k in 0 .. size - 1 do
|
||||
C.[i, j] <- C.[i, j] + A.[i, k] * B.[k, j]
|
||||
|
||||
// Последовательное умножение матриц
|
||||
let multiplySequential (A: Matrix) (B: Matrix) : Matrix =
|
||||
let size = A.GetLength(0)
|
||||
let C = Array2D.zeroCreate size size
|
||||
multiplyPart A B C 0 size
|
||||
C
|
||||
|
||||
// Параллельное умножение матриц с синхронизацией
|
||||
let multiplyParallel (A: Matrix) (B: Matrix) numThreads : Matrix =
|
||||
let size = A.GetLength(0)
|
||||
let C = Array2D.zeroCreate size size
|
||||
let rowsPerThread = size / numThreads
|
||||
let threads = Array.zeroCreate numThreads
|
||||
|
||||
for i in 0 .. numThreads - 1 do
|
||||
let startRow = i * rowsPerThread
|
||||
let endRow = if i = numThreads - 1 then size else startRow + rowsPerThread
|
||||
threads.[i] <- new Thread(fun () -> multiplyPart A B C startRow endRow)
|
||||
|
||||
// Запускаем потоки
|
||||
threads |> Array.iter (fun t -> t.Start())
|
||||
|
||||
// Ожидаем завершения всех потоков
|
||||
threads |> Array.iter (fun t -> t.Join())
|
||||
|
||||
C
|
||||
|
||||
// Функция для генерации матрицы
|
||||
let generateMatrix size =
|
||||
let matrix = Array2D.zeroCreate size size
|
||||
let rnd = Random()
|
||||
for i in 0 .. size - 1 do
|
||||
for j in 0 .. size - 1 do
|
||||
matrix.[i, j] <- double (rnd.Next(10))
|
||||
matrix
|
||||
|
||||
[<EntryPoint>]
|
||||
let main argv =
|
||||
let sizes = [ 100; 300; 500 ]
|
||||
let threadCounts = [ 1; 2; 4; 8; 16; 32; 64; 128; 256; 512]
|
||||
|
||||
// Заголовки таблицы
|
||||
printfn "| Размер | Потоков | Алгоритм | Время (мс) |"
|
||||
|
||||
for size in sizes do
|
||||
for numThreads in threadCounts do
|
||||
let A = generateMatrix size
|
||||
let B = generateMatrix size
|
||||
|
||||
let stopwatch = Stopwatch.StartNew()
|
||||
|
||||
let algorithm =
|
||||
if numThreads = 1 then
|
||||
multiplySequential A B |> ignore; "Последовательный"
|
||||
else
|
||||
multiplyParallel A B numThreads |> ignore; "Параллельный"
|
||||
|
||||
stopwatch.Stop()
|
||||
|
||||
// Вывод результатов в виде таблицы
|
||||
printfn "| %dx%d | %d | %s | %d |" size size numThreads algorithm stopwatch.ElapsedMilliseconds
|
||||
|
||||
0
|
87
minhasapov_ruslan_lab_5/README.md
Normal file
@ -0,0 +1,87 @@
|
||||
# Лабораторная работа №5
|
||||
#### ПИбд-42. Минхасапов Руслан.
|
||||
|
||||
---
|
||||
|
||||
**Цель:** Реализовать последовательный и параллельный алгоритмы умножения квадратных матриц и сравнить их производительность.
|
||||
|
||||
---
|
||||
|
||||
**Описание задачи:** В работе реализованы два алгоритма умножения матриц:
|
||||
|
||||
* **Последовательный:** Классический алгоритм умножения матриц с тремя вложенными циклами.
|
||||
* **Параллельный:** Алгоритм, распараллеливающий вычисления по строкам результирующей матрицы. Каждому потоку назначается подмножество строк для вычисления. Число потоков задается параметром.
|
||||
|
||||
---
|
||||
|
||||
**Описание параллельного алгоритма:**
|
||||
|
||||
Параллельный алгоритм `multiplyParallel` разделяет вычисления между заданным количеством потоков. Входные матрицы `A` и `B`, а также результирующая матрица `C` передаются в каждый поток.
|
||||
|
||||
1. **Разделение по строкам:** Матрица `C` разделяется на блоки по строкам. Каждый поток отвечает за вычисление элементов в своем блоке строк. Количество строк на поток рассчитывается как `size / numThreads`, где `size` - размер матрицы, а `numThreads` - количество потоков.
|
||||
|
||||
2. **Создание потоков:** Создается массив потоков `threads`. Каждый поток выполняет функцию `multiplyPart`, которая вычисляет значения элементов в заданном диапазоне строк.
|
||||
|
||||
3. **Запуск и синхронизация:** Все потоки запускаются с помощью `t.Start()`. Затем основной поток ожидает завершения всех потоков с помощью `t.Join()`. Это гарантирует, что все вычисления будут завершены до возвращения результата.
|
||||
|
||||
4. **Возврат результата:** После завершения всех потоков функция `multiplyParallel` возвращает результирующую матрицу `C`.
|
||||
|
||||
---
|
||||
|
||||
**Результаты бенчмарков:**
|
||||
|
||||
Были проведены тесты для матриц размером 100x100, 300x300 и 500x500 элементов с разным количеством процессов (2, 4, 8, 16, 32, 64, 128, 256, 512). Результаты представлены в таблице:
|
||||
|
||||
| Размер | Кол-во потоков | Алгоритм | Время (мс) |
|
||||
|---|---|---|---|
|
||||
| 100x100 | 1 | Последовательный | 11 |
|
||||
| 100x100 | 2 | Параллельный | 7 |
|
||||
| 100x100 | 4 | Параллельный | 6 |
|
||||
| 100x100 | 8 | Параллельный | 5 |
|
||||
| 100x100 | 16 | Параллельный | 6 |
|
||||
| 100x100 | 32 | Параллельный | 6 |
|
||||
| 100x100 | 64 | Параллельный | 14 |
|
||||
| 100x100 | 128 | Параллельный | 27 |
|
||||
| 100x100 | 256 | Параллельный | 44 |
|
||||
| 100x100 | 512 | Параллельный | 69 |
|
||||
| 300x300 | 1 | Последовательный | 393 |
|
||||
| 300x300 | 2 | Параллельный | 177 |
|
||||
| 300x300 | 4 | Параллельный | 197 |
|
||||
| 300x300 | 8 | Параллельный | 155 |
|
||||
| 300x300 | 16 | Параллельный | 148 |
|
||||
| 300x300 | 32 | Параллельный | 139 |
|
||||
| 300x300 | 64 | Параллельный | 153 |
|
||||
| 300x300 | 128 | Параллельный | 154 |
|
||||
| 300x300 | 256 | Параллельный | 167 |
|
||||
| 300x300 | 512 | Параллельный | 358 |
|
||||
| 500x500 | 1 | Последовательный | 1616 |
|
||||
| 500x500 | 2 | Параллельный | 831 |
|
||||
| 500x500 | 4 | Параллельный | 598 |
|
||||
| 500x500 | 8 | Параллельный | 522 |
|
||||
| 500x500 | 16 | Параллельный | 542 |
|
||||
| 500x500 | 32 | Параллельный | 578 |
|
||||
| 500x500 | 64 | Параллельный | 642 |
|
||||
| 500x500 | 128 | Параллельный | 812 |
|
||||
| 500x500 | 256 | Параллельный | 1038 |
|
||||
| 500x500 | 512 | Параллельный | 1496 |
|
||||
|
||||
---
|
||||
|
||||
**Анализ результатов:**
|
||||
|
||||
* **Влияние размера матрицы:** Время выполнения обоих алгоритмов ожидаемо растет с увеличением размера матрицы, подтверждая кубическую сложность O(n³).
|
||||
|
||||
* **Влияние числа потоков:** Параллельный алгоритм демонстрирует ускорение по сравнению с последовательным. Наиболее заметное ускорение наблюдается при умеренном количестве потоков. Для матрицы 100x100 оптимальное количество потоков — 8, для 300x300 — 32, а для 500x500 — 8.
|
||||
|
||||
* **Ухудшение производительности при большом числе потоков:** При слишком большом количестве потоков (64, 128, 256, 512) производительность начинает ухудшаться из-за накладных расходов на создание, управление и синхронизацию потоков. Эти расходы перевешивают выгоду от распараллеливания, особенно на относительно небольших матрицах.
|
||||
|
||||
* **Оптимальное число потоков:** Оптимальное количество потоков нелинейно зависит от размера матрицы и, вероятно, ограничено количеством логических ядер процессора. В данном случае, для матриц большего размера оптимальное число потоков меньше максимального протестированного, что указывает на наличие узкого места, связанного с управлением потоками.
|
||||
|
||||
**Выводы:**
|
||||
|
||||
Параллельное умножение матриц может значительно ускорить вычисления, но требует настройки количества потоков. Слишком большое количество потоков приводит к снижению производительности. Оптимальное количество потоков близко к количеству логических ядер процессора (8), но может варьироваться в зависимости от размера матрицы и других факторов. Результаты подчеркивают важность баланса между распараллеливанием вычислений и накладными расходами на управление потоками.
|
||||
|
||||
---
|
||||
|
||||
#### Демонстрация
|
||||
Видео доступно по [ссылке](https://disk.yandex.ru/i/aG-7_gTgRwqf5A)
|
1
mochalov_danila_lab_3/.gitignore
vendored
@ -1 +0,0 @@
|
||||
.venv
|