diff --git a/.env b/.env
new file mode 100644
index 0000000..eafc6c7
--- /dev/null
+++ b/.env
@@ -0,0 +1,4 @@
+POSTGRES_USER="postgres"
+POSTGRES_PASSWORD="12345"
+POSTGRES_DB="main_database"
+DB_CONNECTION_STRING="Host=postgres:5432;Database=${POSTGRES_DB};Username=${POSTGRES_USER};Password=${POSTGRES_PASSWORD}"
\ No newline at end of file
diff --git a/Cloud/ApplicationContext.cs b/Cloud/ApplicationContext.cs
index 6a60cf1..681040d 100644
--- a/Cloud/ApplicationContext.cs
+++ b/Cloud/ApplicationContext.cs
@@ -6,6 +6,7 @@ public class ApplicationContext : DbContext
 {
     public DbSet<User> Users { get; set; } = null!;
     public DbSet<Farm> Farms { get; set; } = null!;
+    public DbSet<Greenhouse> Greenhouses { get; set; } = null!;
 
     public ApplicationContext(DbContextOptions<ApplicationContext> options)
         : base(options)
diff --git a/Cloud/Cloud.csproj b/Cloud/Cloud.csproj
index e74339d..042333f 100644
--- a/Cloud/Cloud.csproj
+++ b/Cloud/Cloud.csproj
@@ -7,7 +7,7 @@
 
-
+
 
diff --git a/Cloud/Controllers/GreengouseController.cs b/Cloud/Controllers/GreengouseController.cs
new file mode 100644
index 0000000..9f4c025
--- /dev/null
+++ b/Cloud/Controllers/GreengouseController.cs
@@ -0,0 +1,160 @@
+using Cloud.Models;
+using Cloud.Requests;
+using Cloud.Services.Broker;
+using Cloud.Services.Broker.Support;
+using Cloud.Services.Domain;
+using Microsoft.AspNetCore.Mvc;
+
+namespace Cloud.Controllers
+{
+    [ApiController]
+    [Route("api/farm/{farmId}/greenhouse")]
+    public class GreenhouseController : ControllerBase
+    {
+        private readonly IGreenhouseService _greenhouseService;
+
+        public GreenhouseController(IGreenhouseService greenhouseService)
+        {
+            _greenhouseService = greenhouseService;
+        }
+
+        /// <summary>
+        /// Returns the current state of all of the user's greenhouses
+        /// </summary>
+        /// <param name="farmId"></param>
+        /// <returns></returns>
+        [HttpGet]
+        public async Task<ActionResult<IEnumerable<GreenhouseInfo>>> GetAll(int farmId)
+        {
+            try
+            {
+                var greenhouses = await _greenhouseService.GetAll(farmId);
+                if (greenhouses == null) return NotFound("Greenhouses not found");
+                return Ok(greenhouses);
+            }
+            catch (Exception ex)
+            {
+                return BadRequest(ex.Message);
+            }
+        }
+
+        /// <summary>
+        /// Returns the current state of a specific greenhouse
+        /// </summary>
+        /// <param name="farmId"></param>
+        /// <param name="greenhouseId"></param>
+        /// <returns></returns>
+        [HttpGet("{greenhouseId}")]
+        public async Task<ActionResult<GreenhouseInfo>> Get(int farmId, int greenhouseId)
+        {
+            try
+            {
+                var greenhouse = await _greenhouseService.GetGreenhouseInfo(greenhouseId, farmId);
+                return Ok(greenhouse);
+            }
+            catch (Exception ex)
+            {
+                return BadRequest(ex.Message);
+            }
+        }
+
+        /// <summary>
+        /// Returns the saved greenhouse automation settings
+        /// </summary>
+        /// <param name="farmId"></param>
+        /// <param name="greenhouseId"></param>
+        /// <returns></returns>
+        [HttpGet("{greenhouseId}/settings")]
+        public async Task<ActionResult<Greenhouse>> GetGreenhouse(int farmId, int greenhouseId)
+        {
+            try
+            {
+                var greenhouse = await _greenhouseService.GetGreenhouse(greenhouseId);
+                if (greenhouse == null) return NotFound("Greenhouse not found");
+                return Ok(greenhouse);
+            }
+            catch (Exception ex)
+            {
+                return BadRequest(ex.Message);
+            }
+        }
+
+        /// <summary>
+        /// Saves greenhouse automation settings to the API database
+        /// </summary>
+        /// <param name="farmId"></param>
+        /// <param name="greenhouse"></param>
+        /// <returns></returns>
+        [HttpPost]
+        public async Task<ActionResult<Greenhouse>> SaveToDatabase(int farmId, GreenhouseRequest greenhouse)
+        {
+            try
+            {
+                var greenhouseEntity = new Greenhouse()
+                {
+                    FarmId = farmId,
+                    RecomendedTemperature = greenhouse.RecomendedTemperature,
+                    WateringMode = greenhouse.WateringMode,
+                    HeatingMode = greenhouse.HeatingMode
+                };
+
+                var result = await _greenhouseService.Create(greenhouseEntity);
+                return Ok(result);
+            }
+            catch (Exception ex)
+            {
+                return BadRequest(ex.Message);
+            }
+        }
+
+        /// <summary>
+        /// Updates greenhouse automation settings in the API database
+        /// </summary>
+        /// <param name="farmId">Farm ID</param>
+        /// <param name="greenhouseId">Greenhouse ID</param>
+        /// <param name="greenhouse">Data to update</param>
+        /// <returns>The updated Greenhouse object</returns>
+        [HttpPut("{greenhouseId}/settings")]
+        public async Task<ActionResult<Greenhouse>> Update(int farmId, int greenhouseId, GreenhouseRequest greenhouse)
+        {
+            try
+            {
+                var greenhouseEntity = new Greenhouse()
+                {
+                    Id = greenhouseId,
+                    FarmId = farmId,
+                    WateringMode = greenhouse.WateringMode,
+                    HeatingMode = greenhouse.HeatingMode,
+                    RecomendedTemperature = greenhouse.RecomendedTemperature
+                };
+
+                var result = await _greenhouseService.Update(greenhouseEntity);
+                return Ok(result);
+            }
+            catch (Exception ex)
+            {
+                return BadRequest(ex.Message);
+            }
+        }
+
+        /// <summary>
+        /// Deletes a greenhouse automation settings record from the API database
+        /// </summary>
+        /// <param name="farmId"></param>
+        /// <param name="greenhouseId"></param>
+        /// <returns></returns>
+        [HttpDelete("{greenhouseId}")]
+        public async Task<ActionResult> Delete(int farmId, int greenhouseId)
+        {
+            try
+            {
+                _ = await _greenhouseService.Delete(greenhouseId);
+                return Ok();
+            }
+            catch (Exception ex)
+            {
+                return BadRequest(ex.Message);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/Cloud/Controllers/ValveController.cs b/Cloud/Controllers/ValveController.cs
index dbc081a..9fc973a 100644
--- a/Cloud/Controllers/ValveController.cs
+++ b/Cloud/Controllers/ValveController.cs
@@ -1,5 +1,6 @@
 using Cloud.Requests;
 using Cloud.Services;
+using Cloud.Services.Broker;
 using Microsoft.AspNetCore.Authorization;
 using Microsoft.AspNetCore.Mvc;
 using System.ComponentModel.DataAnnotations;
@@ -14,11 +15,11 @@ namespace Cloud.Controllers
     {
         //Valve controller
-        private readonly ProducerService _producerService;
+        private readonly IBrokerService _kafkaService;
 
-        public ValveController(ProducerService producerService)
+        public ValveController(IBrokerService kafkaService)
         {
-            _producerService = producerService;
+            _kafkaService = kafkaService;
         }
 
         [HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")]
@@ -34,7 +35,7 @@ namespace Cloud.Controllers
             var message = JsonSerializer.Serialize(kafkaRequest);
 
             return Ok(kafkaRequest);
-            /*await _producerService.ProduceMessageAsync("ValvesHeatersRequest", message);
+            /*await _kafkaService.ProduceAsync("ValvesHeatersRequest", message);
             return Ok($"Valve status is {request.Action}");*/
         }
diff --git a/Cloud/Dockerfile b/Cloud/Dockerfile
index 084ab62..b4c2152 100644
--- a/Cloud/Dockerfile
+++ b/Cloud/Dockerfile
@@ -10,7 +10,7 @@ RUN adduser -u 5678 --disabled-password --gecos "" appuser && chown -R appuser /
 USER appuser
 
 FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build
-ARG configuration=Release
+ARG configuration=Development
 WORKDIR /src
 COPY ["Cloud.csproj", "."]
 RUN dotnet restore "./Cloud.csproj"
@@ -19,11 +19,12 @@ WORKDIR "/src/."
 RUN dotnet build "./Cloud.csproj" -c $configuration -o /app/build
 
 FROM build AS publish
-ARG configuration=Release
+ARG configuration=Development
 RUN dotnet publish "./Cloud.csproj" -c $configuration -o /app/publish /p:UseAppHost=false
 
 FROM base AS final
 WORKDIR /app
 COPY --from=publish /app/publish .
+
 ENTRYPOINT ["dotnet", "Cloud.dll"]
diff --git a/Cloud/Middlewares/DatabaseMiddleware.cs b/Cloud/Middlewares/DatabaseMiddleware.cs
new file mode 100644
index 0000000..88091bf
--- /dev/null
+++ b/Cloud/Middlewares/DatabaseMiddleware.cs
@@ -0,0 +1,30 @@
+using Microsoft.EntityFrameworkCore;
+
+namespace Cloud.Middlewares;
+
+public static class DatabaseMiddleware
+{
+    public static void AddDbConnectionService(this IServiceCollection services)
+    {
+        string connectionString = Environment.GetEnvironmentVariable("DB_CONNECTION_STRING")
"Host=localhost;Port=5438;Database=main_database;Username=postgres;Password=12345"; + + services.AddDbContext(options => + options.UseNpgsql(connectionString)); + + } + public static void MigrateDb(this IApplicationBuilder app) + { + try + { + using var scope = app.ApplicationServices.CreateScope(); + var context = scope.ServiceProvider.GetRequiredService(); + + context.Database.Migrate(); + } + catch (Exception ex) + { + Console.WriteLine(ex.Message); + } + } +} \ No newline at end of file diff --git a/Cloud/Models/Farm.cs b/Cloud/Models/Farm.cs index 5cc1ac8..48fd855 100644 --- a/Cloud/Models/Farm.cs +++ b/Cloud/Models/Farm.cs @@ -7,5 +7,6 @@ public int UserId { get; set; } public User? User { get; set; } public string RaspberryIP { get; set; } + List Greenhouses { get; set; } = new(); } } \ No newline at end of file diff --git a/Cloud/Models/Greenhouse.cs b/Cloud/Models/Greenhouse.cs new file mode 100644 index 0000000..ec2f54a --- /dev/null +++ b/Cloud/Models/Greenhouse.cs @@ -0,0 +1,14 @@ +using Cloud.Models.Support; + +namespace Cloud.Models +{ + public class Greenhouse + { + public int Id { get; set; } + public int RecomendedTemperature { get; set; } + public WateringMode WateringMode { get; set; } + public HeatingMode HeatingMode { get; set; } + public int FarmId { get; set; } + public Farm? Farm { get; set; } + } +} \ No newline at end of file diff --git a/Cloud/Models/Support/HeatingMode.cs b/Cloud/Models/Support/HeatingMode.cs new file mode 100644 index 0000000..81f082e --- /dev/null +++ b/Cloud/Models/Support/HeatingMode.cs @@ -0,0 +1,8 @@ +namespace Cloud.Models.Support +{ + public enum HeatingMode + { + Manual, + Auto + } +} \ No newline at end of file diff --git a/Cloud/Models/Support/WateringMode.cs b/Cloud/Models/Support/WateringMode.cs new file mode 100644 index 0000000..3e58fe4 --- /dev/null +++ b/Cloud/Models/Support/WateringMode.cs @@ -0,0 +1,8 @@ +namespace Cloud.Models.Support +{ + public enum WateringMode + { + Manual, + Auto + } +} \ No newline at end of file diff --git a/Cloud/Program.cs b/Cloud/Program.cs index 0a1ad0f..16d0c36 100644 --- a/Cloud/Program.cs +++ b/Cloud/Program.cs @@ -7,31 +7,31 @@ using FluentValidation; using FluentValidation.AspNetCore; using Cloud.Validation; using StackExchange.Redis; +using Cloud.Services.Broker.Implement.Kafka; +using Cloud.Services.Broker; using Cloud.Services; +using Cloud.Services.Domain.Implement; +using Cloud.Services.Domain; using Cloud.Services.Cache; +using Cloud.Support; +using System.Text.RegularExpressions; +using Cloud.Middlewares; var builder = WebApplication.CreateBuilder(args); // Add services to the container. +builder.Services.AddSingleton(); +builder.Services.AddTransient(); //Redis configuration +string redisUrl = Environment.GetEnvironmentVariable("REDIS_URL") ?? 
"localhost:6379"; builder.Services.AddSingleton(sp => { - var configuration = ConfigurationOptions.Parse("localhost:6379"); + var configuration = ConfigurationOptions.Parse(redisUrl); return ConnectionMultiplexer.Connect(configuration); }); - builder.Services.AddSingleton(); -//Kafka producer service -builder.Services.AddSingleton(); - -//Kafka consumer service -builder.Services.AddSingleton(); - -//Add the BackgroundWorkerService -builder.Services.AddHostedService(); - //Jwt configuration var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get(); var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get(); @@ -50,16 +50,15 @@ builder.Services.AddAuthentication(JwtBearerDefaults.AuthenticationScheme) IssuerSigningKey = new SymmetricSecurityKey(Encoding.UTF8.GetBytes(jwtKey)) }; }); - -builder.Services.AddDbContext(options => - options.UseNpgsql("Host=localhost;Port=5438;Database=main_database;Username=postgres;Password=12345")); - +// Настройка подключения к БД +builder.Services.AddDbConnectionService(); // Настройка CORS +string frontUrl = Environment.GetEnvironmentVariable("FRONT_URL") ?? "http://localhost:3000"; builder.Services.AddCors(options => { options.AddPolicy("AllowFrontendLocalhost", builder => { - builder.WithOrigins("http://localhost:3000") // фронтенд + builder.WithOrigins(frontUrl) // фронтенд .AllowAnyHeader() .AllowAnyMethod(); }); @@ -108,6 +107,7 @@ var app = builder.Build(); // Configure the HTTP request pipeline. if (app.Environment.IsDevelopment()) { + Console.WriteLine("Swagger enabled"); app.UseSwagger(); app.UseSwaggerUI(c => { @@ -121,6 +121,9 @@ app.UseHttpsRedirection(); // Включение CORS app.UseCors("AllowFrontendLocalhost"); +// Применение миграций +app.MigrateDb(); + app.UseAuthentication(); app.UseAuthorization(); diff --git a/Cloud/Requests/GreenhouseRequest.cs b/Cloud/Requests/GreenhouseRequest.cs new file mode 100644 index 0000000..7f2fe80 --- /dev/null +++ b/Cloud/Requests/GreenhouseRequest.cs @@ -0,0 +1,11 @@ +using Cloud.Models.Support; + +namespace Cloud.Requests +{ + public class GreenhouseRequest + { + public int RecomendedTemperature { get; set; } + public WateringMode WateringMode { get; set; } + public HeatingMode HeatingMode { get; set; } + } +} \ No newline at end of file diff --git a/Cloud/Services/BackgroundWorkerService.cs b/Cloud/Services/BackgroundWorkerService.cs deleted file mode 100644 index f7fbf34..0000000 --- a/Cloud/Services/BackgroundWorkerService.cs +++ /dev/null @@ -1,39 +0,0 @@ -namespace Cloud.Services -{ - public class BackgroundWorkerService : BackgroundService - { - public readonly ILogger _logger; - private readonly ConsumerService _consumerService; - - public BackgroundWorkerService(ILogger logger, ConsumerService consumer) - { - _logger = logger; - _consumerService = consumer; - } - - //Backghround Service, This will run continuously - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - try - { - while (!stoppingToken.IsCancellationRequested) - { - //_logger.LogInformation("Background Service is Runing at : {time}", DateTimeOffset.Now); - - string request = await _consumerService.WaitMessage("ValvesHeatersRequest"); //Consume the Kafka Message - - //After Consume the Order Request Can process the order - if (!string.IsNullOrEmpty(request)) - _logger.LogInformation("Valves-Heaters Request : {value}", request); - - - await Task.Delay(1000, stoppingToken); - } - } - catch (Exception ex) - { - _logger.LogError($"BackgroundWorkerService - Exception {ex}"); - } - } - } -} diff --git 
new file mode 100644
index 0000000..c152981
--- /dev/null
+++ b/Cloud/Services/Broker/IBrokerConsumer.cs
@@ -0,0 +1,9 @@
+using Cloud.Services.Broker.Support;
+
+namespace Cloud.Services.Broker
+{
+    public interface IBrokerConsumer
+    {
+        IEnumerable<T>? WaitMessages<T>(string topic) where T : IBrokerResponse;
+    }
+}
\ No newline at end of file
diff --git a/Cloud/Services/Broker/IBrokerProducer.cs b/Cloud/Services/Broker/IBrokerProducer.cs
new file mode 100644
index 0000000..abed579
--- /dev/null
+++ b/Cloud/Services/Broker/IBrokerProducer.cs
@@ -0,0 +1,9 @@
+using Cloud.Services.Broker.Support;
+
+namespace Cloud.Services.Broker
+{
+    public interface IBrokerProducer
+    {
+        Task ProduceAsync(string topic, Command command);
+    }
+}
\ No newline at end of file
diff --git a/Cloud/Services/Broker/IBrokerService.cs b/Cloud/Services/Broker/IBrokerService.cs
new file mode 100644
index 0000000..356157b
--- /dev/null
+++ b/Cloud/Services/Broker/IBrokerService.cs
@@ -0,0 +1,9 @@
+using Cloud.Services.Broker.Support;
+
+namespace Cloud.Services.Broker
+{
+    public interface IBrokerService : IBrokerProducer, IBrokerConsumer
+    {
+        void ChangeBrokerIp(string ip);
+    }
+}
\ No newline at end of file
diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs
new file mode 100644
index 0000000..f29a4b8
--- /dev/null
+++ b/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs
@@ -0,0 +1,93 @@
+
+using Cloud.Services.Broker.Support;
+using Confluent.Kafka;
+using System.Diagnostics;
+using System.Text.Json;
+
+namespace Cloud.Services.Broker.Implement.Kafka
+{
+    public class KafkaConsumer : IBrokerConsumer
+    {
+        private IConsumer<string, string> _consumer;
+        private readonly IConfiguration _config;
+
+        public KafkaConsumer(IConfiguration config)
+        {
+            _config = config;
+            Console.WriteLine($"KafkaConsumer created. IP:" + _config["KAFKA_URL"]);
+            ChangeBrokerIp(_config["KAFKA_URL"]);
+        }
+
+        public IEnumerable<T>? WaitMessages<T>(string topic)
+            where T : IBrokerResponse
+        {
+            List<T> res = new();
+            List<PartitionMetadata> partitions;
+
+            using var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = _config["KAFKA_URL"] }).Build();
+            var meta = adminClient.GetMetadata(TimeSpan.FromSeconds(20));
+            var currentTopic = meta.Topics.SingleOrDefault(t => t.Topic == topic)
+                ?? throw new Exception($"Topic {topic} not found");
+            partitions = currentTopic.Partitions;
+
+            _consumer.Subscribe(topic);
+            foreach (var partition in partitions)
+            {
+                var topicPartition = new TopicPartition(topic, partition.PartitionId);
+                _consumer.Assign(topicPartition);
+
+                T? message = _consume<T>();
+                if (message == null) return null;
+                res.Add(message);
+            }
+            _consumer.Unassign();
+            _consumer.Unsubscribe();
+            return res;
+        }
+
+        private T? _consume<T>() where T : IBrokerResponse
+        {
+            var sw = new Stopwatch();
+            sw.Start();
+            try
+            {
+                while (true)
+                {
+                    var consumeResult = _consumer.Consume(TimeSpan.FromMinutes(1));
+
+                    if (consumeResult?.Message?.Value == null)
+                    {
+                        // Time limit reached
+                        if (sw.Elapsed > TimeSpan.FromMinutes(1))
+                        {
+                            return default;
+                        }
+                        continue;
+                    }
+
+                    string jsonObj = consumeResult.Message.Value;
+                    return JsonSerializer.Deserialize<T>(jsonObj);
+                }
+            }
+            catch (Exception ex)
+            {
+                _consumer.Close();
+                throw;
+            }
+        }
+
+        public void ChangeBrokerIp(string ip)
+        {
+            var consumerConfig = new ConsumerConfig()
+            {
+                BootstrapServers = ip,
+                GroupId = _config["Kafka:GroupId"],
+                AutoOffsetReset = AutoOffsetReset.Earliest,
+            };
+            _consumer?.Close();
+            _consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
+        }
+    }
+}
\ No newline at end of file
diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs
new file mode 100644
index 0000000..6a916bb
--- /dev/null
+++ b/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs
@@ -0,0 +1,38 @@
+using System.Text.Json;
+using Cloud.Services.Broker.Support;
+using Confluent.Kafka;
+
+namespace Cloud.Services.Broker.Implement.Kafka
+{
+    public class KafkaProducer : IBrokerProducer
+    {
+        private IProducer<string, string> _producer;
+        private readonly IConfiguration _config;
+
+        public KafkaProducer(IConfiguration configuration)
+        {
+            _config = configuration;
+            Console.WriteLine($"KafkaProducer created. IP:" + _config["KAFKA_URL"]);
+            ChangeBrokerIp(_config["KAFKA_URL"]);
+        }
+
+        public async Task ProduceAsync(string topic, Command command)
+        {
+            var commandSerialized = JsonSerializer.Serialize(command);
+            var message = new Message<string, string> { Key = Guid.NewGuid().ToString(), Value = commandSerialized };
+
+            //Produce the message
+            await _producer.ProduceAsync(topic, message);
+        }
+
+        public void ChangeBrokerIp(string ip)
+        {
+            var producerConfig = new ProducerConfig
+            {
+                BootstrapServers = ip
+            };
+
+            //Build the producer
+            _producer = new ProducerBuilder<string, string>(producerConfig).Build();
+        }
+    }
+}
\ No newline at end of file
diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs
new file mode 100644
index 0000000..8fc9174
--- /dev/null
+++ b/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs
@@ -0,0 +1,30 @@
+using Cloud.Services.Broker.Support;
+
+namespace Cloud.Services.Broker.Implement.Kafka
+{
+    public class KafkaService : IBrokerService
+    {
+        private readonly KafkaProducer _producer;
+        private readonly KafkaConsumer _consumer;
+
+        public KafkaService(IConfiguration configuration)
+        {
+            _producer = new KafkaProducer(configuration);
+            _consumer = new KafkaConsumer(configuration);
+        }
+
+        public IEnumerable<T>? WaitMessages<T>(string topic)
+            where T : IBrokerResponse
+            => _consumer.WaitMessages<T>(topic);
+
+        public async Task ProduceAsync(string topic, Command command)
+            => await _producer.ProduceAsync("commands", command);
+
+        public void ChangeBrokerIp(string ip)
+        {
+            _consumer.ChangeBrokerIp(ip);
+            _producer.ChangeBrokerIp(ip);
+        }
+    }
+}
\ No newline at end of file
diff --git a/Cloud/Services/Broker/Support/Command.cs b/Cloud/Services/Broker/Support/Command.cs
new file mode 100644
index 0000000..2debae3
--- /dev/null
+++ b/Cloud/Services/Broker/Support/Command.cs
@@ -0,0 +1,10 @@
+using System.Text.Json;
+
+namespace Cloud.Services.Broker.Support
+{
+    public class Command
+    {
+        public Guid GreenhouseId { get; set; }
+        public string CommandName { get; set; } = null!;
+    }
+}
\ No newline at end of file
diff --git a/Cloud/Services/Broker/Support/CommandResult.cs b/Cloud/Services/Broker/Support/CommandResult.cs
new file mode 100644
index 0000000..e66004d
--- /dev/null
+++ b/Cloud/Services/Broker/Support/CommandResult.cs
@@ -0,0 +1,9 @@
+namespace Cloud.Services.Broker.Support
+{
+    public class CommandResult : IBrokerResponse
+    {
+        public int CommandId { get; set; }
+        public int GreenhouseId { get; set; }
+        public string ResultMessage { get; set; } = string.Empty;
+    }
+}
\ No newline at end of file
diff --git a/Cloud/Services/Broker/Support/GreenhouseInfo.cs b/Cloud/Services/Broker/Support/GreenhouseInfo.cs
new file mode 100644
index 0000000..bea028b
--- /dev/null
+++ b/Cloud/Services/Broker/Support/GreenhouseInfo.cs
@@ -0,0 +1,12 @@
+namespace Cloud.Services.Broker.Support
+{
+    public class GreenhouseInfo : IBrokerResponse
+    {
+        public int Id { get; set; }
+        public int PercentWater { get; set; }
+        public int SoilTemperature { get; set; }
+        public bool PumpStatus { get; set; }
+        public bool HeatingStatus { get; set; }
+        public bool AutoWateringStatus { get; set; }
+    }
+}
\ No newline at end of file
diff --git a/Cloud/Services/Broker/Support/IBrokerResponse.cs b/Cloud/Services/Broker/Support/IBrokerResponse.cs
new file mode 100644
index 0000000..e13b65f
--- /dev/null
+++ b/Cloud/Services/Broker/Support/IBrokerResponse.cs
@@ -0,0 +1,6 @@
+namespace Cloud.Services.Broker.Support
+{
+    public interface IBrokerResponse
+    {
+    }
+}
\ No newline at end of file
diff --git a/Cloud/Services/ConsumerService.cs b/Cloud/Services/ConsumerService.cs
deleted file mode 100644
index 82d5bb2..0000000
--- a/Cloud/Services/ConsumerService.cs
+++ /dev/null
@@ -1,51 +0,0 @@
-using Confluent.Kafka;
-
-namespace Cloud.Services
-{
-    public class ConsumerService
-    {
-        private IConsumer<string, string> _consumer;
-        private ConsumerConfig consumerConfig;
-
-        public ConsumerService(IConfiguration configuration)
-        {
-            consumerConfig = new ConsumerConfig
-            {
-                BootstrapServers = configuration["Kafka:BootstrapServers"],
-                GroupId = configuration["Kafka:GroupId"],
-                AutoOffsetReset = AutoOffsetReset.Earliest,
-            };
-
-            _consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
-        }
-
-        //Consume method
-        public async Task<string> WaitMessage(string topic)
-        {
-            try
-            {
-                _consumer.Subscribe(topic);
-
-                var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000));
-
-                if (consumeResult != null)
-                {
-                    return consumeResult.Message.Value;
-                }
-                else
-                {
-                    //No message received from Kafka within the specified timeout.
-                }
-                return "";
-
-            }
-            catch (Exception ex)
-            {
-                return "";
-            }
-            finally
-            {
-                _consumer.Close();
-            }
-        }
-    }
-}
diff --git a/Cloud/Services/Domain/IGreenhouseService.cs b/Cloud/Services/Domain/IGreenhouseService.cs
new file mode 100644
index 0000000..523af87
--- /dev/null
+++ b/Cloud/Services/Domain/IGreenhouseService.cs
@@ -0,0 +1,45 @@
+using Cloud.Models;
+using Cloud.Services.Broker.Support;
+
+namespace Cloud.Services.Domain;
+
+public interface IGreenhouseService
+{
+    /// <summary>
+    /// Returns the current state of a specific greenhouse from the broker
+    /// </summary>
+    /// <param name="id">Greenhouse ID</param>
+    /// <param name="farmId">Farm ID, i.e. the broker</param>
+    /// <returns>Current greenhouse data from the greenhouse manager</returns>
+    public Task<GreenhouseInfo?> GetGreenhouseInfo(int id, int farmId);
+    /// <summary>
+    /// Returns the saved greenhouse automation settings from the database
+    /// </summary>
+    /// <param name="id">Greenhouse ID</param>
+    /// <returns>Greenhouse automation settings</returns>
+    public Task<Greenhouse?> GetGreenhouse(int id);
+    /// <summary>
+    /// Returns the current data for all of the user's greenhouses from the broker
+    /// </summary>
+    /// <param name="farmId">Farm ID</param>
+    /// <returns>List of current greenhouse data</returns>
+    public Task<IEnumerable<GreenhouseInfo>?> GetAll(int farmId);
+    /// <summary>
+    /// Saves greenhouse automation settings to the database
+    /// </summary>
+    /// <param name="greenhouse">Greenhouse automation settings</param>
+    /// <returns>The created entity from the database</returns>
+    public Task<Greenhouse> Create(Greenhouse greenhouse);
+    /// <summary>
+    /// Updates greenhouse automation settings in the database
+    /// </summary>
+    /// <param name="greenhouse">New greenhouse automation settings</param>
+    /// <returns>The updated entity from the database</returns>
+    public Task<Greenhouse> Update(Greenhouse greenhouse);
+    /// <summary>
+    /// Deletes greenhouse automation settings from the database
+    /// </summary>
+    /// <param name="id">Automation settings ID</param>
+    /// <returns>The deleted entity</returns>
+    public Task<Greenhouse> Delete(int id);
+}
\ No newline at end of file
diff --git a/Cloud/Services/Domain/Implement/GreenhouseService.cs b/Cloud/Services/Domain/Implement/GreenhouseService.cs
new file mode 100644
index 0000000..f398a39
--- /dev/null
+++ b/Cloud/Services/Domain/Implement/GreenhouseService.cs
@@ -0,0 +1,67 @@
+using Cloud.Models;
+using Cloud.Services.Broker;
+using Cloud.Services.Broker.Support;
+using Microsoft.EntityFrameworkCore;
+
+namespace Cloud.Services.Domain.Implement;
+
+public class GreenhouseService : IGreenhouseService
+{
+    private readonly IBrokerService _brokerService;
+    private readonly ApplicationContext _context;
+
+    public GreenhouseService(IBrokerService brokerService, ApplicationContext context)
+    {
+        _context = context;
+        _brokerService = brokerService;
+    }
+
+    public async Task<Greenhouse> Create(Greenhouse greenhouse)
+    {
+        var res = await _context.Greenhouses.AddAsync(greenhouse);
+        await _context.SaveChangesAsync();
+        return res.Entity;
+    }
+
+    public async Task<Greenhouse> Delete(int id)
+    {
+        var greenhouse = await _context.Greenhouses.FirstOrDefaultAsync(x => x.Id == id);
+
+        _context.Greenhouses.Remove(greenhouse);
+        await _context.SaveChangesAsync();
+        return greenhouse;
+    }
+
+    public async Task<Greenhouse?> GetGreenhouse(int id)
+    {
+        return await _context.Greenhouses.FirstOrDefaultAsync(x => x.Id == id);
+    }
+
+    public async Task<Greenhouse> Update(Greenhouse greenhouse)
+    {
+        var res = _context.Greenhouses.Update(greenhouse);
+        await _context.SaveChangesAsync();
+        return res.Entity;
+    }
+
+    public async Task<IEnumerable<GreenhouseInfo>?> GetAll(int farmId)
+    {
+        // await _changeBrokerIp(farmId);
+        return _brokerService.WaitMessages<GreenhouseInfo>("data");
+    }
+
+    public async Task<GreenhouseInfo?> GetGreenhouseInfo(int id, int farmId)
+    {
+        // await _changeBrokerIp(farmId);
+        var infos = _brokerService.WaitMessages<GreenhouseInfo>("data");
+        return infos?.FirstOrDefault(x => x.Id == id);
+    }
+
+    private async Task _changeBrokerIp(int farmId)
+    {
+        var farm = await _context.Farms.FirstOrDefaultAsync(x => x.Id == farmId);
+        _brokerService.ChangeBrokerIp(farm.RaspberryIP);
+    }
+}
diff --git a/Cloud/Services/ProducerService.cs b/Cloud/Services/ProducerService.cs
deleted file mode 100644
index 073bdc6..0000000
--- a/Cloud/Services/ProducerService.cs
+++ /dev/null
@@ -1,33 +0,0 @@
-using Confluent.Kafka;
-
-namespace Cloud.Services
-{
-    public class ProducerService
-    {
-        private readonly IProducer<string, string> _producer;
-
-        public ProducerService(IConfiguration configuration)
-        {
-            var producerConfig = new ProducerConfig
-            {
-                BootstrapServers = configuration["Kafka:BootstrapServers"]
-            };
-
-            //Build the producer
-            _producer = new ProducerBuilder<string, string>(producerConfig).Build();
-        }
-
-        //Method to produce a message to a Kafka topic
-        public async Task ProduceMessageAsync(string topic, string value)
-        {
-            var kafkaMessage = new Message<string, string>
-            {
-                Key = Guid.NewGuid().ToString(),
-                Value = value
-            };
-
-            //Produce the message
-            await _producer.ProduceAsync(topic, kafkaMessage);
-        }
-    }
-}
diff --git a/Cloud/Support/NetworkSupport.cs b/Cloud/Support/NetworkSupport.cs
new file mode 100644
index 0000000..752d9a6
--- /dev/null
+++ b/Cloud/Support/NetworkSupport.cs
@@ -0,0 +1,26 @@
+namespace Cloud.Support;
+
+public static class NetworkSupport
+{
+    public static async Task CheckConnectionAsync(string address)
+    {
+        using var client = new HttpClient();
+        try
+        {
+            var response = await client.GetAsync(address);
+
+            if (response.IsSuccessStatusCode)
+            {
+                Console.WriteLine($"Соединение успешно проверено. Статус-код: {response.StatusCode}");
+            }
+            else
+            {
+                Console.WriteLine($"Соединение не удалось проверить. Статус-код: {response.StatusCode}. URL: {address}");
+            }
+        }
+        catch (HttpRequestException ex)
+        {
+            Console.WriteLine($"Ошибка при проверке соединения: {ex.Message}. URL: {address}");
+        }
+    }
+}
\ No newline at end of file
diff --git a/GreenhouseDetector/Dockerfile b/GreenhouseDetector/Dockerfile
new file mode 100644
index 0000000..7be5c76
--- /dev/null
+++ b/GreenhouseDetector/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.9-slim
+
+WORKDIR /app
+
+COPY requirements.txt .
+
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY GreenhouseDetector/detector.py .
+
+CMD ["python", "detector.py"]
\ No newline at end of file
diff --git a/GreenhouseDetector/detector.py b/GreenhouseDetector/detector.py
index d092373..4921dfe 100644
--- a/GreenhouseDetector/detector.py
+++ b/GreenhouseDetector/detector.py
@@ -1,3 +1,4 @@
+import os
 import time
 import random as rnd
 
@@ -5,10 +6,13 @@ from flask import Flask
 import requests
 import threading
 
+
 app = Flask(__name__)
 
 class Detector:
     def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower):
+        self.MANAGER_URL = os.environ.get('MANAGER_URL')
+        print("MANAGER_URL=", self.MANAGER_URL)
         self.id = id
         self.moistureThresholdUpper = moistureThresholdUpper
         self.moistureThresholdLower = moistureThresholdLower
@@ -24,7 +28,7 @@ class Detector:
 
     def sendData(self):
         data = {"moisture": self.moisture, "temp": self.temp}
-        requests.post(f"http://127.0.0.1:20002/webhook?id={self.id}", json=data)
+        requests.post(f"{self.MANAGER_URL}/webhook?id={self.id}", json=data)
 
 
 detector1 = Detector(1, 0.6, 0.2, 40, 20)
diff --git a/GreenhouseManager/Dockerfile b/GreenhouseManager/Dockerfile
new file mode 100644
index 0000000..ab31b66
--- /dev/null
+++ b/GreenhouseManager/Dockerfile
@@ -0,0 +1,11 @@
+FROM python:3.9-slim
+
+WORKDIR /app
+
+COPY requirements.txt .
+
+RUN pip install --no-cache-dir -r requirements.txt
+
+COPY GreenhouseManager/manager.py .
+
+CMD ["python", "manager.py"]
\ No newline at end of file
diff --git a/GreenhouseManager/manager.py b/GreenhouseManager/manager.py
index d9772d1..8366baf 100644
--- a/GreenhouseManager/manager.py
+++ b/GreenhouseManager/manager.py
@@ -1,3 +1,4 @@
+import os
 from kafka import KafkaProducer, KafkaConsumer
 import kafka
 import socket
@@ -7,6 +8,7 @@ import time
 from enum import Enum
 import threading
 
+
 app = Flask(__name__)
 
 def start_manager():
@@ -15,6 +17,8 @@ class Manager:
     def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", heater_state: str = "off"):
+        KAFKA_URL = os.environ.get('KAFKA_URL')
+        print("KAFKA_URL=", KAFKA_URL)
         self._id = _id
         self.moisture = moisture
         self.temp = temp
@@ -23,14 +27,14 @@ class Manager:
         self.heater_state = heater_state
 
         self.dataPublisher = KafkaProducer(
-            bootstrap_servers=['localhost:9092'],
+            bootstrap_servers=[KAFKA_URL],
             client_id=f'manager{self._id}_producer',
             value_serializer=lambda v: dumps(v).encode('utf-8')
         )
 
         self.controllerConsumer = KafkaConsumer(
             'commands',
-            bootstrap_servers=['localhost:9092'],
+            bootstrap_servers=[KAFKA_URL],
             auto_offset_reset='earliest',
             enable_auto_commit=True,
             consumer_timeout_ms=2000,
@@ -38,7 +42,7 @@ class Manager:
             value_deserializer=lambda x: loads(x.decode('utf-8'))
         )
         self.controllerConsumerResponse = KafkaProducer(
-            bootstrap_servers=['localhost:9092'],
+            bootstrap_servers=[KAFKA_URL],
             client_id=f'manager{self._id}_producer',
             value_serializer=lambda v: dumps(v).encode('utf-8')
         )
diff --git a/docker-compose.yml b/docker-compose.yml
index ede4fce..5587923 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -9,9 +9,18 @@ networks:
 services:
   cloud:
+    networks:
+      - vpn
     build: ./Cloud/
     ports:
       - "5124:5124"
+    environment:
+      ASPNETCORE_ENVIRONMENT: Development
+      DB_CONNECTION_STRING: ${DB_CONNECTION_STRING}
+      REDIS_URL: redis:6379
+      KAFKA_URL: kafka:29092
+      # Add when the frontend is ready!
+      # FRONT_URL: front:3000
     depends_on:
       - postgres
       - redis
@@ -19,13 +28,13 @@ services:
     image: postgres:14
     container_name: cucumber_database
     environment:
-      POSTGRES_USER: postgres
-      POSTGRES_PASSWORD: 12345
-      POSTGRES_DB: main_database
+      POSTGRES_USER: ${POSTGRES_USER}
+      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
+      POSTGRES_DB: ${POSTGRES_DB}
     ports:
       - "5438:5432"
     volumes:
-      - postgres_data:/var/lib/postgresql/data 
+      - postgres_data:/var/lib/postgresql/data
   redis:
     image: 'redis:latest'
    ports:
@@ -34,63 +43,63 @@ services:
       - 'cloud-redis:/data'
     healthcheck:
       test:
-      - CMD
-      - redis-cli
-      - ping
+        - CMD
+        - redis-cli
+        - ping
       retries: 3
       timeout: 5s
   zookeeper:
-   networks:
-     - vpn
-   image: confluentinc/cp-zookeeper:7.4.0
-   environment:
-     ZOOKEEPER_CLIENT_PORT: 2181
-     ZOOKEEPER_TICK_TIME: 2000
-   ports:
-     - 2181:2181
+    networks:
+      - vpn
+    image: confluentinc/cp-zookeeper:7.4.0
+    environment:
+      ZOOKEEPER_CLIENT_PORT: 2181
+      ZOOKEEPER_TICK_TIME: 2000
+    ports:
+      - 2181:2181
   kafka:
-   networks:
-     vpn:
-       ipv4_address: 192.168.2.10
-   image: confluentinc/cp-kafka:7.4.0
-   ports:
+    networks:
+      vpn:
+        ipv4_address: 192.168.2.10
+    image: confluentinc/cp-kafka:7.4.0
+    ports:
      - 9092:9092
      - 9997:9997
-   expose:
-     - 29092:29092
-   environment:
-     KAFKA_BROKER_ID: 1
-     KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-     KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092
-     KAFKA_ADVERTISED_LISTENERS: HOST://192.168.1.5:9092,DOCKER://kafka:29092
-     KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT
-     KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
-     KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
-     KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
-     KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
-     KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
-   depends_on:
+    expose:
+      - 29092:29092
+    environment:
+      KAFKA_BROKER_ID: 1
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092
+      KAFKA_ADVERTISED_LISTENERS: HOST://192.168.1.5:9092,DOCKER://kafka:29092
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
+      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+    depends_on:
      - zookeeper
   init-kafka:
-   networks:
-     - vpn
-   image: confluentinc/cp-kafka:7.4.0
-   depends_on:
-     - kafka
-   entrypoint: [ '/bin/sh', '-c' ]
-   command: |
-     "
-     # blocks until kafka is reachable
-     kafka-topics --bootstrap-server kafka:29092 --list
+    networks:
+      - vpn
+    image: confluentinc/cp-kafka:7.4.0
+    depends_on:
+      - kafka
+    entrypoint: [ '/bin/sh', '-c' ]
+    command: |
+      "
+      # blocks until kafka is reachable
+      kafka-topics --bootstrap-server kafka:29092 --list
 
-     echo -e 'Creating kafka topics'
-     kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
-     kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
-     kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
+      echo -e 'Creating kafka topics'
+      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
+      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
+      kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
 
-     echo -e 'Successfully created the following topics:'
-     kafka-topics --bootstrap-server kafka:29092 --list
-     "
+      echo -e 'Successfully created the following topics:'
+      kafka-topics --bootstrap-server kafka:29092 --list
+      "
   kafka-ui:
     networks:
       - vpn
@@ -104,8 +113,31 @@ services:
       KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
       KAFKA_CLUSTERS_0_METRICS_PORT: 9997
+  manager:
+    networks:
+      - vpn
+    build:
+      context: .
+      dockerfile: ./GreenhouseManager/Dockerfile
+    environment:
+      KAFKA_URL: kafka:29092
+    depends_on:
+      - kafka
+    expose:
+      - 20002
+  detector:
+    networks:
+      - vpn
+    build:
+      context: .
+      dockerfile: ./GreenhouseDetector/Dockerfile
+    environment:
+      MANAGER_URL: http://manager:20002
+    depends_on:
+      - manager
+
 volumes:
   postgres_data:
     driver: local
-  cloud-redis:
-    driver: local
\ No newline at end of file
+  cloud-redis:
+    driver: local
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..0145355
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,3 @@
+kafka-python~=2.0.2
+Flask~=3.0.3
+requests~=2.31.0
\ No newline at end of file