diff --git a/Cloud/Controllers/HeaterController.cs b/Cloud/Controllers/HeaterController.cs new file mode 100644 index 0000000..659a373 --- /dev/null +++ b/Cloud/Controllers/HeaterController.cs @@ -0,0 +1,41 @@ +using Cloud.Requests; +using Cloud.Services.Broker; +using Cloud.Services.Broker.Implement.Kafka; +using Cloud.Services.Broker.Support; +using Microsoft.AspNetCore.Authorization; +using Microsoft.AspNetCore.Mvc; + +namespace Cloud.Controllers +{ + [Authorize] + [ApiController] + [Route("api")] + public class HeaterController : Controller + { + //Контроллер вентиля + + private readonly IBrokerService _kafkaService; + + public HeaterController(KafkaService kafkaService) + { + _kafkaService = kafkaService; + } + + [HttpPost("farm/{farmId}/greenhouse/{ghId}/heating")] + public async Task interactValve([FromBody] HeaterRequest request, int farmId, int ghId) + { + var command = new Command + { + request_id = Guid.NewGuid(), + greenhouse_id = ghId, + farm_id = farmId, + command = "heater", + target_temp = request.Temperature + }; + + await _kafkaService.ProduceAsync("commands", command); + + return Ok($"Heater target temperature is set as {request.Temperature}"); + } + } +} diff --git a/Cloud/Controllers/ValveController.cs b/Cloud/Controllers/ValveController.cs index 9fc973a..4193321 100644 --- a/Cloud/Controllers/ValveController.cs +++ b/Cloud/Controllers/ValveController.cs @@ -1,8 +1,12 @@ -using Cloud.Requests; +using Cloud.Models; +using Cloud.Requests; using Cloud.Services; using Cloud.Services.Broker; +using Cloud.Services.Broker.Implement.Kafka; +using Cloud.Services.Broker.Support; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Mvc; +using System; using System.ComponentModel.DataAnnotations; using System.Text.Json; @@ -17,7 +21,7 @@ namespace Cloud.Controllers private readonly IBrokerService _kafkaService; - public ValveController(IBrokerService kafkaService) + public ValveController(KafkaService kafkaService) { _kafkaService = kafkaService; } @@ -25,19 +29,18 @@ namespace Cloud.Controllers [HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")] public async Task interactValve([FromBody] ValveRequest request, int farmId, int ghId) { - var kafkaRequest = new + var command = new Command { - FarmId = farmId, - GreenHouseId = ghId, - SomeAction = request.Action, + request_id = Guid.NewGuid(), + greenhouse_id = ghId, + farm_id = farmId, + command = "valve", + target_moisture = request.Moisture }; - - var message = JsonSerializer.Serialize(kafkaRequest); - return Ok(kafkaRequest); - /*await _kafkaService.ProduceAsync("ValvesHeatersRequest", message); + await _kafkaService.ProduceAsync("commands", command); - return Ok($"Valve status is {request.Action}");*/ + return Ok($"Valve target moisture is set as {request.Moisture}%"); } } } diff --git a/Cloud/Controllers/WebHookController.cs b/Cloud/Controllers/WebHookController.cs new file mode 100644 index 0000000..0610244 --- /dev/null +++ b/Cloud/Controllers/WebHookController.cs @@ -0,0 +1,58 @@ +using Cloud.Models; +using Cloud.Requests; +using Cloud.Services.Broker; +using Cloud.Services.Broker.Implement.Kafka; +using Cloud.Services.Broker.Support; +using Microsoft.AspNetCore.Mvc; +using System; + +namespace Cloud.Controllers +{ + [ApiController] + [Route("api/[controller]")] + public class WebhookController : ControllerBase + { + private readonly IBrokerService _kafkaService; + + public WebhookController (KafkaService kafkaService) + { + _kafkaService = kafkaService; + } + + [HttpPost] + public async Task ReceiveWebhook([FromBody] WebhookData payload) + { + if (payload == null) + { + return BadRequest("Invalid data."); + } + + if (payload.moisture >= payload.target_moisture && payload.valveStatus == "open") + { + var command = new Command + { + request_id = Guid.NewGuid(), + greenhouse_id = payload.greenhouse_id, + command = "valve", + }; + + await _kafkaService.ProduceAsync("commands", command); + } + + if (payload.temp >= payload.target_temp && payload.heaterStatus == "on") + { + var command = new Command + { + request_id = Guid.NewGuid(), + greenhouse_id = payload.greenhouse_id, + command = "heater", + }; + + await _kafkaService.ProduceAsync("commands", command); + } + + // Respond with a success status + return Ok(new { message = "Webhook received successfully", data = payload }); + } + } +} diff --git a/Cloud/Program.cs b/Cloud/Program.cs index 16d0c36..50bcf7b 100644 --- a/Cloud/Program.cs +++ b/Cloud/Program.cs @@ -71,6 +71,7 @@ builder.Services.AddValidatorsFromAssemblyContaining(); builder.Services.AddValidatorsFromAssemblyContaining(); builder.Services.AddValidatorsFromAssemblyContaining(); builder.Services.AddValidatorsFromAssemblyContaining(); +builder.Services.AddValidatorsFromAssemblyContaining(); // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle builder.Services.AddEndpointsApiExplorer(); diff --git a/Cloud/Requests/HeaterRequest.cs b/Cloud/Requests/HeaterRequest.cs new file mode 100644 index 0000000..abe79fa --- /dev/null +++ b/Cloud/Requests/HeaterRequest.cs @@ -0,0 +1,7 @@ +namespace Cloud.Requests +{ + public class HeaterRequest + { + public float Temperature { get; set; } + } +} diff --git a/Cloud/Requests/ValveRequest.cs b/Cloud/Requests/ValveRequest.cs index dee6a5a..401859f 100644 --- a/Cloud/Requests/ValveRequest.cs +++ b/Cloud/Requests/ValveRequest.cs @@ -2,6 +2,6 @@ { public class ValveRequest { - public string Action { get; set; } + public float Moisture { get; set; } } } diff --git a/Cloud/Requests/WebhookData.cs b/Cloud/Requests/WebhookData.cs new file mode 100644 index 0000000..76df591 --- /dev/null +++ b/Cloud/Requests/WebhookData.cs @@ -0,0 +1,15 @@ +namespace Cloud.Requests +{ + public class WebhookData + { + public int request_id { get; set; } + public int greenhouse_id { get; set; } + public float moisture { get; set; } + public float target_moisture { get; set; } + public string valveStatus { get; set; } + public float temp { get; set; } + public float target_temp { get; set; } + public string heaterStatus { get; set; } + public bool isAutoOn { get; set; } + } +} diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs index f29a4b8..1a47370 100644 --- a/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs +++ b/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs @@ -14,8 +14,8 @@ namespace Cloud.Services.Broker.Implement.Kafka public KafkaConsumer(IConfiguration config) { _config = config; - Console.WriteLine($"KafkaConsumer created. IP:" + _config["KAFKA_URL"]); - ChangeBrokerIp(_config["KAFKA_URL"]); + Console.WriteLine($"KafkaConsumer created. IP:" + _config["Kafka:BootstrapServers"]); + ChangeBrokerIp(_config["Kafka:BootstrapServers"]); } public IEnumerable? WaitMessages(string topic) diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs index 6a916bb..e583512 100644 --- a/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs +++ b/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs @@ -12,8 +12,8 @@ namespace Cloud.Services.Broker.Implement.Kafka public KafkaProducer(IConfiguration configuration) { _config = configuration; - Console.WriteLine($"KafkaProducer created. IP:" + _config["KAFKA_URL"]); - ChangeBrokerIp(_config["KAFKA_URL"]); + Console.WriteLine($"KafkaProducer created. IP:" + _config["Kafka:BootstrapServers"]); + ChangeBrokerIp(_config["Kafka:BootstrapServers"]); } public async Task ProduceAsync(string topic, Command command) { diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs index 8fc9174..d6c1c1b 100644 --- a/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs +++ b/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs @@ -19,12 +19,12 @@ namespace Cloud.Services.Broker.Implement.Kafka public async Task ProduceAsync(string topic, Command command) - => await _producer.ProduceAsync("commands", command); + => await _producer.ProduceAsync(topic, command); public void ChangeBrokerIp(string ip) { _consumer.ChangeBrokerIp(ip); _producer.ChangeBrokerIp(ip); } - } + } } \ No newline at end of file diff --git a/Cloud/Services/Broker/Support/Command.cs b/Cloud/Services/Broker/Support/Command.cs index 2debae3..7d33645 100644 --- a/Cloud/Services/Broker/Support/Command.cs +++ b/Cloud/Services/Broker/Support/Command.cs @@ -4,7 +4,11 @@ namespace Cloud.Services.Broker.Support { public class Command { - public Guid GreenhouseId { get; set; } - public string CommandName { get; set; } = null!; - } + public Guid request_id { get; set; } + public int greenhouse_id { get; set; } + public int? farm_id { get; set; } + public string command { get; set; } + public float? target_moisture{ get; set; } + public float? target_temp { get; set; } + } } \ No newline at end of file diff --git a/Cloud/Validation/HeaterValidator.cs b/Cloud/Validation/HeaterValidator.cs new file mode 100644 index 0000000..30ca923 --- /dev/null +++ b/Cloud/Validation/HeaterValidator.cs @@ -0,0 +1,15 @@ +using Cloud.Requests; +using FluentValidation; + +namespace Cloud.Validation +{ + public class HeaterValidator : AbstractValidator + { + public HeaterValidator() + { + + RuleFor(request => request.Temperature) + .NotNull().WithMessage("Temperature can't be empty"); + } + } +} diff --git a/Cloud/Validation/ValveValidator.cs b/Cloud/Validation/ValveValidator.cs index 4a19f58..58cb791 100644 --- a/Cloud/Validation/ValveValidator.cs +++ b/Cloud/Validation/ValveValidator.cs @@ -8,9 +8,8 @@ namespace Cloud.Validation { public ValveValidator() { - RuleFor(request => request.Action) - .NotEmpty().WithMessage("Action can't be empty"). - IsEnumName(typeof (ValveEnum)).WithMessage("Action is not correct"); + RuleFor(request => request.Moisture) + .NotNull().WithMessage("Moisture can't be empty"); } } } diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py index 8366baf..df06555 100644 --- a/GreenhouseManager/manager.py +++ b/GreenhouseManager/manager.py @@ -15,23 +15,19 @@ def start_manager(): return class Manager: - def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", - heater_state: str = "off"): + def __init__(self, _id: int, moisture: float = 0, target_moisture: float = 30, temp: float = 20, target_temp: float = 30, + isAutoOn: bool = False, valve_state: str = "closed", heater_state: str = "off"): KAFKA_URL = os.environ.get('KAFKA_URL') print("KAFKA_URL=", KAFKA_URL) self._id = _id self.moisture = moisture + self.target_moisture = target_moisture self.temp = temp + self.temp = target_temp self.isAutoOn = isAutoOn self.valve_state = valve_state self.heater_state = heater_state - self.dataPublisher = KafkaProducer( - bootstrap_servers=[KAFKA_URL], - client_id=f'manager{self._id}_producer', - value_serializer=lambda v: dumps(v).encode('utf-8') - ) - self.controllerConsumer = KafkaConsumer( 'commands', bootstrap_servers=[KAFKA_URL], @@ -47,22 +43,6 @@ class Manager: value_serializer=lambda v: dumps(v).encode('utf-8') ) - def sendData(self): - print("sending data...") - message = { - 'id': self._id, - 'moisture': self.moisture, - 'temp': self.temp, - 'valveStatus': str(self.valve_state), - 'heaterStatus': str(self.heater_state), - 'isAutoOn': self.isAutoOn - } - - print(message) - self.dataPublisher.send('data', message) - self.dataPublisher.flush() - - def toggle_device(self, device, request_id, greenhouse_id): if device == 'valve': @@ -87,11 +67,16 @@ class Manager: def send_status(self, request_id, greenhouse_id): - status = { + data = { 'request_id': request_id, 'greenhouse_id': greenhouse_id, - 'valve_state': self.valve_state, - 'heater_state': self.heater_state + 'moisture': self.moisture, + 'target_moisture': self.target_moisture, + 'valveStatus': self.valve_state, + 'temp': self.temp, + 'target_temp': self.target_temp, + 'heaterStatus': self.heater_state, + 'isAutoOn': self.isAutoOn } self.sendDataCommand(status) print("Updating info...\n") @@ -99,7 +84,16 @@ class Manager: def sendDataCommand(self, message): print("sending data...") - self.dataPublisher.send('response', message) + url = "https://localhost:7113/api/webhook" + headers = { + "Content-Type": "application/json" + } + + response = requests.post(url, json=message, headers=headers) + + # Check status code and content + prin("Status Code", response.status_code) + prin("Response Body", response.json()) def getCommand(self): messages = self.controllerConsumer.poll(timeout_ms=1000) @@ -113,25 +107,12 @@ class Manager: self.request_id = message.value['request_id'] self.greenhouse_id = message.value['greenhouse_id'] self.command = message.value['command'] + if message.value.get('target_moisture'): + self.target_moisture = message.value['target_moisture'] + if message.value.get('target_temp'): + self.target_temp = message.value['target_temp'] self.toggle_device(self.command, self.request_id, self.greenhouse_id) -@app.route(f'/webhook', methods=['POST']) -def webhook(): - print("received webhook", request.args.get('id')) - for manager in managers: - print() - if int(request.args.get('id')) == manager._id and request.method == 'POST': - print("Data received from Webhook is", request.json) - - body = request.json - for key, value in body.items(): - setattr(manager, key, value) - - manager.sendData() - - return f"Webhook received for manager {manager._id}" - return "Webhook ignored" - t1 = threading.Thread(target=start_manager) manager1 = Manager(_id=1) managers = [manager1]