DAS_2024_1/vaksman_valeria_lab_3/document_server/main.py

115 lines
4.0 KiB
Python
Raw Normal View History

2024-10-11 19:19:17 +04:00
from fastapi import FastAPI, HTTPException
from fastapi.responses import RedirectResponse
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
from uuid import UUID, uuid4
import requests
app = FastAPI(title="Document service")
product_url = 'http://product_server:8009'
# Добавили автогенерацию уникального идентификатора
class Document(BaseModel):
uuid_: UUID = Field(default_factory=uuid4)
title: str | None
content: str | None
manager: str | None
# Наше хранилище данных
# Пару id указываем явно, остальные генерируются случайно
DATA_LIST: list[Document] = [
Document(uuid_="4a3f30c3-823f-4278-8f98-76177ad083bd", title="Document 1", content="Content 1", manager="Manager 1"),
Document(uuid_="9c7f1ecb-1aac-4337-bcd0-f863cc73aeb0", title="Document 2", content="Content 2", manager="Manager 2"),
Document(title="Document 3", content="Content 3", manager="Manager 3"),
Document(title="Document 4", content="Content 4", manager="Manager 4"),
]
@app.get("/")
async def redirect_to_docs():
return RedirectResponse(url="/docs")
@app.get("/documents/all/", tags=["Document"])
def all_documents():
return DATA_LIST
@app.get("/documents/{document_id}", tags=["Document"])
def get_document(document_id: UUID):
document = next((x for x in DATA_LIST if x.uuid_ == document_id), None)
if not document:
return HTTPException(status_code=404, detail="Документ не найден")
return document
@app.get("/documents/full/{document_id}", tags=["Document"])
def get_full_document(document_id: UUID):
document = next((x for x in DATA_LIST if x.uuid_ == document_id), None)
if not document:
return HTTPException(status_code=404, detail="Продукт не найден")
result_dict = document.model_dump()
if type(result_dict['uuid_']) is UUID:
products = requests.get(f"{product_url}/products/list/documents/{document_id}")
if not products:
return HTTPException(status_code=404, detail="Продукты не найдены")
result_dict["products"] = products.json()
return result_dict
@app.post("/documents/", tags=["Document"])
def create_document(title: str, content: str, manager: str):
document = Document(title=title, content=content, manager=manager)
try:
DATA_LIST.append(document)
return JSONResponse(content={"message": "Успешное создание документа"}, status_code=200)
except Exception as e:
return HTTPException(status_code=404, detail={"Неудачная попытка добавления документа": str(e)})
@app.put("/documents/{document_id}", tags=["Document"])
def update_document(document_id: UUID, title: str = None, content: str = None, manager: str = None):
document = get_document(document_id)
if type(document) is Document:
index = DATA_LIST.index(document)
if title:
DATA_LIST[index].title = title
if content:
DATA_LIST[index].content = content
if manager:
DATA_LIST[index].manager = manager
return JSONResponse(content={"message": "Успешное обновление документа"}, status_code=201)
else:
return HTTPException(status_code=404, detail=document)
@app.delete("/documents/{document_id}", tags=["Document"])
def delete_document(document_id: UUID):
document = get_document(document_id)
if type(document) is Document:
index = DATA_LIST.index(document)
print(index)
del DATA_LIST[index]
return JSONResponse(content={"message": "Успешное удаление документа"}, status_code=201)
return HTTPException(status_code=404, detail=document)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8008)