use chrono::{NaiveDateTime, Utc}; use mongodb::Database; use mongodb::bson::oid::ObjectId; use mongodb::bson::{doc, Document, Bson}; use futures::stream::{TryStreamExt, StreamExt}; use mongodb::options::{FindOptions, AggregateOptions, FindOneOptions}; use rust_decimal::Decimal; use rust_decimal::prelude::{ToPrimitive, FromPrimitive}; use std::default; use std::sync::Arc; use crate::models::rent::*; use crate::storages::traits::RentRepository; use async_trait::async_trait; pub struct MongoRentRepository { pub database: Arc } #[async_trait] impl RentRepository for MongoRentRepository { async fn create(&self, rent: BindingRent) -> Result { let collection = self.database.collection::("clients"); let start_time = chrono::Utc::now(); let filter = doc! { "_id": ObjectId::parse_str(&rent.client_id).unwrap() }; let update = doc! { "$push": { "rents": { "_id": ObjectId::new(), "start_time": start_time, "time_amount": Bson::Null, "car_id": ObjectId::parse_str(&rent.car_id).unwrap() } } }; let result = collection.update_one(filter, update, None).await; match result { Ok(result) => { if result.modified_count == 1 { Ok(Rent {id: "".to_string(), start_time: Default::default(), time_amount: None, car_id: "".to_string(), client_id: "".to_string()}) } else { Err("Rent not found".to_string()) } }, Err(e) => Err(e.to_string()) } } async fn read(&self, id: String) -> Result { let collection = self.database.collection::("clients"); let filter = doc! { "rents._id": ObjectId::parse_str(&id).unwrap() }; let projection = doc! { "rents.$": 1 }; let options = FindOneOptions::builder().projection(projection).build(); let result = collection.find_one(filter, options).await; match result { Ok(Some(document)) => { let rent_document = document.get_array("rents").unwrap().get(0).unwrap().as_document().unwrap(); let client_id = document.get_object_id("_id").unwrap().to_hex(); let id = rent_document.get_object_id("_id").unwrap().to_hex(); let start_time = rent_document.get_datetime("start_time").unwrap(); let time_amount = rent_document.get_i32("time_amount").ok(); let car_id = rent_document.get_object_id("car_id").unwrap().to_hex(); let rent = Rent { id, start_time: NaiveDateTime::from_timestamp_millis(start_time.timestamp_millis()).unwrap(), time_amount, client_id, car_id }; Ok(rent) }, Ok(None) => Err("Rent not found".to_string()), Err(e) => Err(e.to_string()) } } async fn read_all(&self) -> Result, String> { let collection = self.database.collection::("clients"); let result = collection.find(None, None).await; match result { Ok(mut cursor) => { let mut rents = Vec::new(); while let Some(document) = cursor.next().await { let another = document.clone().unwrap(); let rents_array = another.get_array("rents").unwrap(); let client_id = document.unwrap().get_object_id("_id").unwrap().to_hex(); for rent_document in rents_array.iter().map(|x| x.as_document().unwrap()) { let id = rent_document.get_object_id("_id").unwrap().to_hex(); let start_time = rent_document.get_datetime("start_time").unwrap(); let time_amount = rent_document.get_i32("time_amount").ok(); let car_id = rent_document.get_object_id("car_id").unwrap().to_hex(); let rent = Rent { id, start_time: NaiveDateTime::from_timestamp_millis(start_time.timestamp_millis()).unwrap(), time_amount, client_id: client_id.clone(), car_id }; rents.push(rent); } } Ok(rents) }, Err(e) => Err(e.to_string()) } } async fn update(&self, id: String, rent: BindingRent) -> Result { let collection = self.database.collection::("clients"); let filter = doc! { "_id": ObjectId::parse_str(&rent.client_id).unwrap(), "rents._id": ObjectId::parse_str(&id).unwrap() }; let update = doc! { "$set": { "rents.$.time_amount": rent.time_amount } }; let result = collection.update_one(filter, update, None).await; match result { Ok(success) => self.read(success.upserted_id.unwrap().as_object_id().unwrap().to_hex()).await, Err(e) => Err(e.to_string()) } } async fn delete(&self, id: String) -> Result<(), String> { let collection = self.database.collection::("clients"); let rent = self.read(id.clone()).await.unwrap(); let filter = doc! { "_id": ObjectId::parse_str(rent.client_id.clone()).unwrap(), "rents._id": ObjectId::parse_str(id.clone()).unwrap() }; let result = collection.update_one(filter, doc! {"$pull": {"rents": {"_id": ObjectId::parse_str(id.clone()).unwrap()}}}, None).await; match result { Ok(_) => Ok(()), Err(e) => Err(e.to_string()) } } async fn end_rent(&self, id: String) -> Result<(), String> { let rent = self.read(id.clone()).await?; if rent.time_amount.is_some() { return Err("Rent is already ended".to_owned()); } let time_amount = (Utc::now().naive_local() - rent.start_time).num_minutes() as i32; let collection = self.database.collection::("clients"); let filter = doc! {"_id": ObjectId::parse_str(&rent.client_id).unwrap(), "rents._id": ObjectId::parse_str(&rent.id).unwrap()}; let update = doc! {"$set": {"rents.$.time_amount": time_amount}}; let result = collection.update_one(filter, update, None).await; match result { Ok(_) => Ok(()), Err(e) => Err(e.to_string()) } } }