From c230d86404a3e07eb5c2ac3a89df1cb5f6c7edcd Mon Sep 17 00:00:00 2001 From: mfnefd Date: Tue, 19 Nov 2024 23:23:21 +0400 Subject: [PATCH] =?UTF-8?q?add:=20=D1=81=D0=B5=D1=80=D0=B2=D0=B8=D1=81=20?= =?UTF-8?q?=D0=B1=D1=80=D0=BE=D0=BA=D0=B5=D1=80=D0=B0,=20=D1=81=D1=83?= =?UTF-8?q?=D1=89=D0=BD=D0=BE=D1=81=D1=82=D0=B8=20=D0=B1=D1=80=D0=BE=D0=BA?= =?UTF-8?q?=D0=B5=D1=80=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Cloud/Cloud.csproj | 1 + Cloud/Services/Broker/IBrokerConsumer.cs | 4 +- ...IBrokerProdurcer.cs => IBrokerProducer.cs} | 2 +- Cloud/Services/Broker/IBrokerService.cs | 4 +- .../Broker/Implement/Kafka/KafkaConsumer.cs | 51 +++++++++++++++++++ .../Broker/Implement/Kafka/KafkaProducer.cs | 28 ++++++++++ .../Broker/Implement/Kafka/KafkaService.cs | 17 ++++--- Cloud/Services/Broker/Support/Command.cs | 7 +-- .../Services/Broker/Support/CommandResult.cs | 2 +- .../Services/Broker/Support/GreenhouseInfo.cs | 2 +- .../Broker/Support/IBrokerResponse.cs | 6 +++ 11 files changed, 109 insertions(+), 15 deletions(-) rename Cloud/Services/Broker/{IBrokerProdurcer.cs => IBrokerProducer.cs} (78%) create mode 100644 Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs create mode 100644 Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs create mode 100644 Cloud/Services/Broker/Support/IBrokerResponse.cs diff --git a/Cloud/Cloud.csproj b/Cloud/Cloud.csproj index 6a5fc81..042333f 100644 --- a/Cloud/Cloud.csproj +++ b/Cloud/Cloud.csproj @@ -7,6 +7,7 @@ + diff --git a/Cloud/Services/Broker/IBrokerConsumer.cs b/Cloud/Services/Broker/IBrokerConsumer.cs index a421f6d..7997ad0 100644 --- a/Cloud/Services/Broker/IBrokerConsumer.cs +++ b/Cloud/Services/Broker/IBrokerConsumer.cs @@ -1,7 +1,9 @@ +using Cloud.Services.Broker.Support; + namespace Cloud.Services.Broker { public interface IBrokerConsumer { - // TODO: добавить методы для получения данных + T? WaitMessage(string topic) where T : IBrokerResponse; } } \ No newline at end of file diff --git a/Cloud/Services/Broker/IBrokerProdurcer.cs b/Cloud/Services/Broker/IBrokerProducer.cs similarity index 78% rename from Cloud/Services/Broker/IBrokerProdurcer.cs rename to Cloud/Services/Broker/IBrokerProducer.cs index ac3b974..abed579 100644 --- a/Cloud/Services/Broker/IBrokerProdurcer.cs +++ b/Cloud/Services/Broker/IBrokerProducer.cs @@ -2,7 +2,7 @@ using Cloud.Services.Broker.Support; namespace Cloud.Services.Broker { - public interface IBrokerProdurcer + public interface IBrokerProducer { Task ProduceAsync(string topic, Command command); } diff --git a/Cloud/Services/Broker/IBrokerService.cs b/Cloud/Services/Broker/IBrokerService.cs index 910a25e..f19aeb4 100644 --- a/Cloud/Services/Broker/IBrokerService.cs +++ b/Cloud/Services/Broker/IBrokerService.cs @@ -4,7 +4,7 @@ namespace Cloud.Services.Broker { public interface IBrokerService { - Task Produce(Command command); - Task Consume(string topic); + Task Produce(Command command); + T? Consume(string topic) where T : IBrokerResponse; } } \ No newline at end of file diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs new file mode 100644 index 0000000..2b31499 --- /dev/null +++ b/Cloud/Services/Broker/Implement/Kafka/KafkaConsumer.cs @@ -0,0 +1,51 @@ + +using Cloud.Services.Broker.Support; +using Confluent.Kafka; +using System.Text.Json; + +namespace Cloud.Services.Broker.Implement.Kafka +{ + public class KafkaConsumer : IBrokerConsumer + { + private readonly IConsumer _consumer; + + public KafkaConsumer(IConfiguration config) + { + var consumerConfig = new ConsumerConfig() + { + BootstrapServers = config["Kafka:BootstrapServers"], + GroupId = config["Kafka:GroupId"], + AutoOffsetReset = AutoOffsetReset.Earliest, + }; + + _consumer = new ConsumerBuilder(consumerConfig).Build(); + } + + public T? WaitMessage(string topic) + where T : IBrokerResponse + { + try + { + _consumer.Subscribe(topic); + + var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000)); + + if (consumeResult == null) + { + // No message received from Kafka within the specified timeout. + return default; + } + return JsonSerializer.Deserialize(consumeResult.Message.Value); + + } + catch (Exception ex) + { + throw; + } + finally + { + _consumer.Close(); + } + } + } +} \ No newline at end of file diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs new file mode 100644 index 0000000..b301084 --- /dev/null +++ b/Cloud/Services/Broker/Implement/Kafka/KafkaProducer.cs @@ -0,0 +1,28 @@ +using Cloud.Services.Broker.Support; +using Confluent.Kafka; + +namespace Cloud.Services.Broker.Implement.Kafka +{ + public class KafkaProducer : IBrokerProducer + { + private readonly IProducer _producer; + + public KafkaProducer(IConfiguration configuration) + { + var producerConfig = new ProducerConfig + { + BootstrapServers = configuration["Kafka:BootstrapServers"] + }; + + //Build the Producer + _producer = new ProducerBuilder(producerConfig).Build(); + } + public async Task ProduceAsync(string topic, Command command) + { + var message = new Message { Key = Guid.NewGuid(), Value = command }; + + //Produce the Message + await _producer.ProduceAsync(topic, message); + } + } +} \ No newline at end of file diff --git a/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs b/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs index 44e099b..c0b90ba 100644 --- a/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs +++ b/Cloud/Services/Broker/Implement/Kafka/KafkaService.cs @@ -4,14 +4,19 @@ namespace Cloud.Services.Broker.Implement.Kafka { public class KafkaService : IBrokerService { - public Task Consume(string topic) + private readonly KafkaProducer _producer; + private readonly KafkaConsumer _consumer; + + public KafkaService(IConfiguration configuration) { - throw new NotImplementedException(); + _producer = new KafkaProducer(configuration); + _consumer = new KafkaConsumer(configuration); } - public Task Produce(Command command) - { - throw new NotImplementedException(); - } + public T? Consume(string topic) + where T : IBrokerResponse => _consumer.WaitMessage(topic); + + public async Task Produce(Command command) + => await _producer.ProduceAsync("commands", command); } } \ No newline at end of file diff --git a/Cloud/Services/Broker/Support/Command.cs b/Cloud/Services/Broker/Support/Command.cs index ef2a44b..2debae3 100644 --- a/Cloud/Services/Broker/Support/Command.cs +++ b/Cloud/Services/Broker/Support/Command.cs @@ -1,9 +1,10 @@ +using System.Text.Json; + namespace Cloud.Services.Broker.Support { - public class Command + public class Command { - public int Id { get; set; } - public int GreenhouseId { get; set; } + public Guid GreenhouseId { get; set; } public string CommandName { get; set; } = null!; } } \ No newline at end of file diff --git a/Cloud/Services/Broker/Support/CommandResult.cs b/Cloud/Services/Broker/Support/CommandResult.cs index 06d997d..e66004d 100644 --- a/Cloud/Services/Broker/Support/CommandResult.cs +++ b/Cloud/Services/Broker/Support/CommandResult.cs @@ -1,6 +1,6 @@ namespace Cloud.Services.Broker.Support { - public class CommandResult + public class CommandResult : IBrokerResponse { public int CommandId { get; set; } public int GreenhouseId { get; set; } diff --git a/Cloud/Services/Broker/Support/GreenhouseInfo.cs b/Cloud/Services/Broker/Support/GreenhouseInfo.cs index c5a2140..bea028b 100644 --- a/Cloud/Services/Broker/Support/GreenhouseInfo.cs +++ b/Cloud/Services/Broker/Support/GreenhouseInfo.cs @@ -1,6 +1,6 @@ namespace Cloud.Services.Broker.Support { - public class GreenhouseInfo + public class GreenhouseInfo : IBrokerResponse { public int Id { get; set; } public int PercentWater { get; set; } diff --git a/Cloud/Services/Broker/Support/IBrokerResponse.cs b/Cloud/Services/Broker/Support/IBrokerResponse.cs new file mode 100644 index 0000000..e13b65f --- /dev/null +++ b/Cloud/Services/Broker/Support/IBrokerResponse.cs @@ -0,0 +1,6 @@ +namespace Cloud.Services.Broker.Support +{ + public interface IBrokerResponse + { + } +} \ No newline at end of file