Compare commits
No commits in common. "fed96d5b86b42399c3921a1193e85f60a2d35b0c" and "53d93635fcbb15ee6769be2611ec5317af7bf4f4" have entirely different histories.
fed96d5b86
...
53d93635fc
@ -7,7 +7,6 @@
|
|||||||
</PropertyGroup>
|
</PropertyGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
<PackageReference Include="Confluent.Kafka" Version="2.6.1" />
|
|
||||||
<PackageReference Include="FluentValidation.AspNetCore" Version="11.3.0" />
|
<PackageReference Include="FluentValidation.AspNetCore" Version="11.3.0" />
|
||||||
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="6.0.4" />
|
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="6.0.4" />
|
||||||
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.14" />
|
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.14" />
|
||||||
|
@ -1,42 +0,0 @@
|
|||||||
using Cloud.Requests;
|
|
||||||
using Cloud.Services;
|
|
||||||
using Microsoft.AspNetCore.Authorization;
|
|
||||||
using Microsoft.AspNetCore.Mvc;
|
|
||||||
using System.ComponentModel.DataAnnotations;
|
|
||||||
using System.Text.Json;
|
|
||||||
|
|
||||||
namespace Cloud.Controllers
|
|
||||||
{
|
|
||||||
[Authorize]
|
|
||||||
[ApiController]
|
|
||||||
[Route("api")]
|
|
||||||
public class ValveController : ControllerBase
|
|
||||||
{
|
|
||||||
//Контроллер вентиля
|
|
||||||
|
|
||||||
private readonly ProducerService _producerService;
|
|
||||||
|
|
||||||
public ValveController(ProducerService producerService)
|
|
||||||
{
|
|
||||||
_producerService = producerService;
|
|
||||||
}
|
|
||||||
|
|
||||||
[HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")]
|
|
||||||
public async Task<IActionResult> interactValve([FromBody] ValveRequest request, int farmId, int ghId)
|
|
||||||
{
|
|
||||||
var kafkaRequest = new
|
|
||||||
{
|
|
||||||
FarmId = farmId,
|
|
||||||
GreenHouseId = ghId,
|
|
||||||
SomeAction = request.Action,
|
|
||||||
};
|
|
||||||
|
|
||||||
var message = JsonSerializer.Serialize(kafkaRequest);
|
|
||||||
return Ok(kafkaRequest);
|
|
||||||
|
|
||||||
/*await _producerService.ProduceMessageAsync("ValvesHeatersRequest", message);
|
|
||||||
|
|
||||||
return Ok($"Valve status is {request.Action}");*/
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,9 +0,0 @@
|
|||||||
namespace Cloud.Enums
|
|
||||||
{
|
|
||||||
public enum ValveEnum
|
|
||||||
{
|
|
||||||
Open,
|
|
||||||
Close,
|
|
||||||
Auto
|
|
||||||
}
|
|
||||||
}
|
|
@ -9,7 +9,6 @@ using Cloud.Validation;
|
|||||||
using StackExchange.Redis;
|
using StackExchange.Redis;
|
||||||
using Cloud.Services.Broker.Implement.Kafka;
|
using Cloud.Services.Broker.Implement.Kafka;
|
||||||
using Cloud.Services.Broker;
|
using Cloud.Services.Broker;
|
||||||
using Cloud.Services;
|
|
||||||
|
|
||||||
var builder = WebApplication.CreateBuilder(args);
|
var builder = WebApplication.CreateBuilder(args);
|
||||||
|
|
||||||
@ -22,15 +21,6 @@ builder.Services.AddSingleton<IConnectionMultiplexer>(sp =>
|
|||||||
return ConnectionMultiplexer.Connect(configuration);
|
return ConnectionMultiplexer.Connect(configuration);
|
||||||
});
|
});
|
||||||
|
|
||||||
//Kafka producer service
|
|
||||||
builder.Services.AddSingleton<ProducerService, ProducerService>();
|
|
||||||
|
|
||||||
//Kafka consumer service
|
|
||||||
builder.Services.AddSingleton<ConsumerService, ConsumerService>();
|
|
||||||
|
|
||||||
//Add the BackgroundWorkerService
|
|
||||||
builder.Services.AddHostedService<BackgroundWorkerService>();
|
|
||||||
|
|
||||||
//Jwt configuration
|
//Jwt configuration
|
||||||
var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get<string>();
|
var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get<string>();
|
||||||
var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get<string>();
|
var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get<string>();
|
||||||
@ -70,7 +60,6 @@ builder.Services.AddFluentValidationClientsideAdapters();
|
|||||||
builder.Services.AddValidatorsFromAssemblyContaining<LoginValidator>();
|
builder.Services.AddValidatorsFromAssemblyContaining<LoginValidator>();
|
||||||
builder.Services.AddValidatorsFromAssemblyContaining<RegisterValidator>();
|
builder.Services.AddValidatorsFromAssemblyContaining<RegisterValidator>();
|
||||||
builder.Services.AddValidatorsFromAssemblyContaining<FarmValidator>();
|
builder.Services.AddValidatorsFromAssemblyContaining<FarmValidator>();
|
||||||
builder.Services.AddValidatorsFromAssemblyContaining<ValveValidator>();
|
|
||||||
|
|
||||||
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
|
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
|
||||||
builder.Services.AddEndpointsApiExplorer();
|
builder.Services.AddEndpointsApiExplorer();
|
||||||
|
@ -1,7 +0,0 @@
|
|||||||
namespace Cloud.Requests
|
|
||||||
{
|
|
||||||
public class ValveRequest
|
|
||||||
{
|
|
||||||
public string Action { get; set; }
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,9 +1,7 @@
|
|||||||
using Cloud.Services.Broker.Support;
|
|
||||||
|
|
||||||
namespace Cloud.Services.Broker
|
namespace Cloud.Services.Broker
|
||||||
{
|
{
|
||||||
public interface IBrokerConsumer
|
public interface IBrokerConsumer
|
||||||
{
|
{
|
||||||
T? WaitMessage<T>(string topic) where T : IBrokerResponse;
|
// TODO: добавить методы для получения данных
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -2,7 +2,7 @@ using Cloud.Services.Broker.Support;
|
|||||||
|
|
||||||
namespace Cloud.Services.Broker
|
namespace Cloud.Services.Broker
|
||||||
{
|
{
|
||||||
public interface IBrokerProducer
|
public interface IBrokerProdurcer
|
||||||
{
|
{
|
||||||
Task ProduceAsync(string topic, Command command);
|
Task ProduceAsync(string topic, Command command);
|
||||||
}
|
}
|
@ -4,7 +4,7 @@ namespace Cloud.Services.Broker
|
|||||||
{
|
{
|
||||||
public interface IBrokerService
|
public interface IBrokerService
|
||||||
{
|
{
|
||||||
Task Produce(Command command);
|
Task<CommandResult> Produce(Command command);
|
||||||
T? Consume<T>(string topic) where T : IBrokerResponse;
|
Task<T> Consume<T>(string topic);
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,51 +0,0 @@
|
|||||||
|
|
||||||
using Cloud.Services.Broker.Support;
|
|
||||||
using Confluent.Kafka;
|
|
||||||
using System.Text.Json;
|
|
||||||
|
|
||||||
namespace Cloud.Services.Broker.Implement.Kafka
|
|
||||||
{
|
|
||||||
public class KafkaConsumer : IBrokerConsumer
|
|
||||||
{
|
|
||||||
private readonly IConsumer<string, string> _consumer;
|
|
||||||
|
|
||||||
public KafkaConsumer(IConfiguration config)
|
|
||||||
{
|
|
||||||
var consumerConfig = new ConsumerConfig()
|
|
||||||
{
|
|
||||||
BootstrapServers = config["Kafka:BootstrapServers"],
|
|
||||||
GroupId = config["Kafka:GroupId"],
|
|
||||||
AutoOffsetReset = AutoOffsetReset.Earliest,
|
|
||||||
};
|
|
||||||
|
|
||||||
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
|
|
||||||
}
|
|
||||||
|
|
||||||
public T? WaitMessage<T>(string topic)
|
|
||||||
where T : IBrokerResponse
|
|
||||||
{
|
|
||||||
try
|
|
||||||
{
|
|
||||||
_consumer.Subscribe(topic);
|
|
||||||
|
|
||||||
var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000));
|
|
||||||
|
|
||||||
if (consumeResult == null)
|
|
||||||
{
|
|
||||||
// No message received from Kafka within the specified timeout.
|
|
||||||
return default;
|
|
||||||
}
|
|
||||||
return JsonSerializer.Deserialize<T>(consumeResult.Message.Value);
|
|
||||||
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
finally
|
|
||||||
{
|
|
||||||
_consumer.Close();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,28 +0,0 @@
|
|||||||
using Cloud.Services.Broker.Support;
|
|
||||||
using Confluent.Kafka;
|
|
||||||
|
|
||||||
namespace Cloud.Services.Broker.Implement.Kafka
|
|
||||||
{
|
|
||||||
public class KafkaProducer : IBrokerProducer
|
|
||||||
{
|
|
||||||
private readonly IProducer<Guid, Command> _producer;
|
|
||||||
|
|
||||||
public KafkaProducer(IConfiguration configuration)
|
|
||||||
{
|
|
||||||
var producerConfig = new ProducerConfig
|
|
||||||
{
|
|
||||||
BootstrapServers = configuration["Kafka:BootstrapServers"]
|
|
||||||
};
|
|
||||||
|
|
||||||
//Build the Producer
|
|
||||||
_producer = new ProducerBuilder<Guid, Command>(producerConfig).Build();
|
|
||||||
}
|
|
||||||
public async Task ProduceAsync(string topic, Command command)
|
|
||||||
{
|
|
||||||
var message = new Message<Guid, Command> { Key = Guid.NewGuid(), Value = command };
|
|
||||||
|
|
||||||
//Produce the Message
|
|
||||||
await _producer.ProduceAsync(topic, message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -4,19 +4,14 @@ namespace Cloud.Services.Broker.Implement.Kafka
|
|||||||
{
|
{
|
||||||
public class KafkaService : IBrokerService
|
public class KafkaService : IBrokerService
|
||||||
{
|
{
|
||||||
private readonly KafkaProducer _producer;
|
public Task<T> Consume<T>(string topic)
|
||||||
private readonly KafkaConsumer _consumer;
|
|
||||||
|
|
||||||
public KafkaService(IConfiguration configuration)
|
|
||||||
{
|
{
|
||||||
_producer = new KafkaProducer(configuration);
|
throw new NotImplementedException();
|
||||||
_consumer = new KafkaConsumer(configuration);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public T? Consume<T>(string topic)
|
public Task<CommandResult> Produce(Command command)
|
||||||
where T : IBrokerResponse => _consumer.WaitMessage<T>(topic);
|
{
|
||||||
|
throw new NotImplementedException();
|
||||||
public async Task Produce(Command command)
|
}
|
||||||
=> await _producer.ProduceAsync("commands", command);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,10 +1,9 @@
|
|||||||
using System.Text.Json;
|
|
||||||
|
|
||||||
namespace Cloud.Services.Broker.Support
|
namespace Cloud.Services.Broker.Support
|
||||||
{
|
{
|
||||||
public class Command
|
public class Command
|
||||||
{
|
{
|
||||||
public Guid GreenhouseId { get; set; }
|
public int Id { get; set; }
|
||||||
|
public int GreenhouseId { get; set; }
|
||||||
public string CommandName { get; set; } = null!;
|
public string CommandName { get; set; } = null!;
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1,6 +1,6 @@
|
|||||||
namespace Cloud.Services.Broker.Support
|
namespace Cloud.Services.Broker.Support
|
||||||
{
|
{
|
||||||
public class CommandResult : IBrokerResponse
|
public class CommandResult
|
||||||
{
|
{
|
||||||
public int CommandId { get; set; }
|
public int CommandId { get; set; }
|
||||||
public int GreenhouseId { get; set; }
|
public int GreenhouseId { get; set; }
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
namespace Cloud.Services.Broker.Support
|
namespace Cloud.Services.Broker.Support
|
||||||
{
|
{
|
||||||
public class GreenhouseInfo : IBrokerResponse
|
public class GreenhouseInfo
|
||||||
{
|
{
|
||||||
public int Id { get; set; }
|
public int Id { get; set; }
|
||||||
public int PercentWater { get; set; }
|
public int PercentWater { get; set; }
|
||||||
|
@ -1,6 +0,0 @@
|
|||||||
namespace Cloud.Services.Broker.Support
|
|
||||||
{
|
|
||||||
public interface IBrokerResponse
|
|
||||||
{
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,16 +0,0 @@
|
|||||||
using Cloud.Enums;
|
|
||||||
using Cloud.Requests;
|
|
||||||
using FluentValidation;
|
|
||||||
|
|
||||||
namespace Cloud.Validation
|
|
||||||
{
|
|
||||||
public class ValveValidator : AbstractValidator<ValveRequest>
|
|
||||||
{
|
|
||||||
public ValveValidator() {
|
|
||||||
|
|
||||||
RuleFor(request => request.Action)
|
|
||||||
.NotEmpty().WithMessage("Action can't be empty").
|
|
||||||
IsEnumName(typeof (ValveEnum)).WithMessage("Action is not correct");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -5,10 +5,6 @@
|
|||||||
"Microsoft.AspNetCore": "Warning"
|
"Microsoft.AspNetCore": "Warning"
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"Kafka": {
|
|
||||||
"BootstrapServers": "localhost:9092",
|
|
||||||
"GroupId": "ValvesHeaters"
|
|
||||||
},
|
|
||||||
"AllowedHosts": "*",
|
"AllowedHosts": "*",
|
||||||
"Jwt": {
|
"Jwt": {
|
||||||
"Key": "m7TyhE20s0dVtUDAr9EnFdPZnAG8maxgBTaiW5j6kO6RQhWDAGxYmXyu0suDnE0o",
|
"Key": "m7TyhE20s0dVtUDAr9EnFdPZnAG8maxgBTaiW5j6kO6RQhWDAGxYmXyu0suDnE0o",
|
||||||
|
@ -1,12 +1,13 @@
|
|||||||
|
from random import random
|
||||||
|
from turtledemo.penrose import start
|
||||||
|
|
||||||
|
from kafka import KafkaProducer, KafkaConsumer
|
||||||
|
import kafka
|
||||||
|
import socket
|
||||||
|
from json import dumps, loads
|
||||||
import time
|
import time
|
||||||
import random as rnd
|
import random as rnd
|
||||||
|
|
||||||
from flask import Flask
|
|
||||||
import requests
|
|
||||||
import threading
|
|
||||||
|
|
||||||
app = Flask(__name__)
|
|
||||||
|
|
||||||
class Detector:
|
class Detector:
|
||||||
def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower):
|
def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower):
|
||||||
self.id = id
|
self.id = id
|
||||||
@ -17,23 +18,31 @@ class Detector:
|
|||||||
self.moisture = 0
|
self.moisture = 0
|
||||||
self.temp = 0
|
self.temp = 0
|
||||||
|
|
||||||
|
self.producer = KafkaProducer(
|
||||||
|
bootstrap_servers=['localhost:9092'],
|
||||||
|
client_id=f'detector{self.id}',
|
||||||
|
value_serializer=lambda v: dumps(v).encode('utf-8')
|
||||||
|
)
|
||||||
|
|
||||||
|
def sendData(self):
|
||||||
|
message = {'id' : self.id,
|
||||||
|
'moisture': self.moisture,
|
||||||
|
'temperature' : self.temp }
|
||||||
|
self.producer.send('dataDetectors', message)
|
||||||
|
|
||||||
def cycle(self):
|
def cycle(self):
|
||||||
self.moisture += rnd.random() / 100
|
self.moisture += rnd.random() / 100
|
||||||
self.temp += (rnd.random() - 0.5) / 100
|
self.temp += (rnd.random() - 0.5) / 100
|
||||||
|
|
||||||
def sendData(self):
|
|
||||||
data = {"moisture": self.moisture,
|
|
||||||
"temp": self.temp}
|
|
||||||
requests.post(f"http://127.0.0.1:20002/webhook?id={self.id}", json=data)
|
|
||||||
|
|
||||||
detector1 = Detector(1, 0.6, 0.2, 40, 20)
|
detector1 = Detector(1, 0.6, 0.2, 40, 20)
|
||||||
|
detector2 = Detector(2, 0.7, 0.3, 40, 20)
|
||||||
|
detector3 = Detector(3, 0.9, 0.6, 40, 20)
|
||||||
|
|
||||||
detectors = [detector1]
|
detectors = [detector1, detector2, detector3]
|
||||||
|
|
||||||
if __name__ =="__main__":
|
while True:
|
||||||
while True:
|
for detector in detectors:
|
||||||
for detector in detectors:
|
detector.cycle()
|
||||||
detector.cycle()
|
detector.sendData()
|
||||||
detector.sendData()
|
time.sleep(1)
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
|
@ -2,20 +2,20 @@ from kafka import KafkaProducer, KafkaConsumer
|
|||||||
import kafka
|
import kafka
|
||||||
import socket
|
import socket
|
||||||
from json import dumps, loads
|
from json import dumps, loads
|
||||||
from flask import Flask, request
|
|
||||||
import time
|
import time
|
||||||
from enum import Enum
|
from enum import Enum
|
||||||
import threading
|
from GreenhouseDetector.detector import Detector
|
||||||
|
|
||||||
app = Flask(__name__)
|
class Status(Enum):
|
||||||
|
UNKNOWN = -1
|
||||||
|
OFF = 0
|
||||||
|
ON = 1
|
||||||
|
|
||||||
def start_manager():
|
|
||||||
return
|
|
||||||
|
|
||||||
class Manager:
|
class Manager:
|
||||||
def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed",
|
def __init__(self, id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed",
|
||||||
heater_state: str = "off"):
|
heater_state: str = "off"):
|
||||||
self._id = _id
|
self.id = id
|
||||||
self.moisture = moisture
|
self.moisture = moisture
|
||||||
self.temp = temp
|
self.temp = temp
|
||||||
self.isAutoOn = isAutoOn
|
self.isAutoOn = isAutoOn
|
||||||
@ -24,29 +24,49 @@ class Manager:
|
|||||||
|
|
||||||
self.dataPublisher = KafkaProducer(
|
self.dataPublisher = KafkaProducer(
|
||||||
bootstrap_servers=['localhost:9092'],
|
bootstrap_servers=['localhost:9092'],
|
||||||
client_id=f'manager{self._id}_producer',
|
client_id=f'manager{id}_producer',
|
||||||
value_serializer=lambda v: dumps(v).encode('utf-8')
|
value_serializer=lambda v: dumps(v).encode('utf-8')
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# self.detectorConsumer = KafkaConsumer(
|
||||||
|
# 'dataDetectors',
|
||||||
|
# bootstrap_servers=['localhost:9092'],
|
||||||
|
# auto_offset_reset='earliest',
|
||||||
|
# enable_auto_commit=True,
|
||||||
|
# consumer_timeout_ms=1000,
|
||||||
|
#group_id=f'manager{id}',
|
||||||
|
# value_deserializer=lambda x: loads(x.decode('utf-8'))
|
||||||
|
#)
|
||||||
self.controllerConsumer = KafkaConsumer(
|
self.controllerConsumer = KafkaConsumer(
|
||||||
'commands',
|
'commands',
|
||||||
bootstrap_servers=['localhost:9092'],
|
bootstrap_servers=['localhost:9092'],
|
||||||
auto_offset_reset='earliest',
|
auto_offset_reset='earliest',
|
||||||
enable_auto_commit=True,
|
enable_auto_commit=True,
|
||||||
consumer_timeout_ms=2000,
|
consumer_timeout_ms=2000,
|
||||||
group_id=f'manager{self._id}',
|
group_id=f'manager{id}',
|
||||||
value_deserializer=lambda x: loads(x.decode('utf-8'))
|
value_deserializer=lambda x: loads(x.decode('utf-8'))
|
||||||
)
|
)
|
||||||
self.controllerConsumerResponse = KafkaProducer(
|
self.controllerConsumerResponse = KafkaProducer(
|
||||||
bootstrap_servers=['localhost:9092'],
|
bootstrap_servers=['localhost:9092'],
|
||||||
client_id=f'manager{self._id}_producer',
|
client_id=f'manager{id}_producer',
|
||||||
value_serializer=lambda v: dumps(v).encode('utf-8')
|
value_serializer=lambda v: dumps(v).encode('utf-8')
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def update(self):
|
||||||
|
for message in self.detectorConsumer:
|
||||||
|
|
||||||
|
print(f"Manager {self.id} received message: ")
|
||||||
|
print(message.value)
|
||||||
|
self.moisture = message.value['moisture']
|
||||||
|
self.temp = message.value['temperature']
|
||||||
|
print("Updating info...\n")
|
||||||
|
|
||||||
|
self.sendData()
|
||||||
|
|
||||||
def sendData(self):
|
def sendData(self):
|
||||||
print("sending data...")
|
print("sending data...")
|
||||||
message = {
|
message = {
|
||||||
'id': self._id,
|
'id': self.id,
|
||||||
'moisture': self.moisture,
|
'moisture': self.moisture,
|
||||||
'temp': self.temp,
|
'temp': self.temp,
|
||||||
'valveStatus': str(self.valve_state),
|
'valveStatus': str(self.valve_state),
|
||||||
@ -56,8 +76,6 @@ class Manager:
|
|||||||
|
|
||||||
print(message)
|
print(message)
|
||||||
self.dataPublisher.send('data', message)
|
self.dataPublisher.send('data', message)
|
||||||
self.dataPublisher.flush()
|
|
||||||
|
|
||||||
|
|
||||||
def toggle_device(self, device, request_id, greenhouse_id):
|
def toggle_device(self, device, request_id, greenhouse_id):
|
||||||
|
|
||||||
@ -104,33 +122,23 @@ class Manager:
|
|||||||
|
|
||||||
for tp, msgs in messages.items():
|
for tp, msgs in messages.items():
|
||||||
for message in msgs:
|
for message in msgs:
|
||||||
print(f"Manager {self._id} received message: ")
|
print(f"Manager {self.id} received message: ")
|
||||||
print(message.value)
|
print(message.value)
|
||||||
self.request_id = message.value['request_id']
|
self.request_id = message.value['request_id']
|
||||||
self.greenhouse_id = message.value['greenhouse_id']
|
self.greenhouse_id = message.value['greenhouse_id']
|
||||||
self.command = message.value['command']
|
self.command = message.value['command']
|
||||||
self.toggle_device(self.command, self.request_id, self.greenhouse_id)
|
self.toggle_device(self.command, self.request_id, self.greenhouse_id)
|
||||||
|
|
||||||
@app.route(f'/webhook', methods=['POST'])
|
|
||||||
def webhook():
|
|
||||||
print("received webhook", request.args.get('id'))
|
|
||||||
for manager in managers:
|
|
||||||
print()
|
|
||||||
if int(request.args.get('id')) == manager._id and request.method == 'POST':
|
|
||||||
print("Data received from Webhook is", request.json)
|
|
||||||
|
|
||||||
body = request.json
|
|
||||||
for key, value in body.items():
|
|
||||||
setattr(manager, key, value)
|
|
||||||
|
|
||||||
manager.sendData()
|
manager1 = Manager(id=1)
|
||||||
|
|
||||||
return f"Webhook received for manager {manager._id}"
|
|
||||||
return "Webhook ignored"
|
|
||||||
|
|
||||||
t1 = threading.Thread(target=start_manager)
|
|
||||||
manager1 = Manager(_id=1)
|
|
||||||
managers = [manager1]
|
managers = [manager1]
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
threading.Thread(target=lambda: app.run(host="0.0.0.0", port=20002, debug=True, use_reloader=False)).start()
|
while True:
|
||||||
|
time.sleep(5)
|
||||||
|
manager1.sendData()
|
||||||
|
for manager in managers:
|
||||||
|
|
||||||
|
manager.getCommand()
|
||||||
|
@ -1,12 +1,3 @@
|
|||||||
networks:
|
|
||||||
vpn:
|
|
||||||
name: kafkaVPN
|
|
||||||
driver: bridge
|
|
||||||
ipam:
|
|
||||||
config:
|
|
||||||
- subnet: "192.168.2.0/24"
|
|
||||||
gateway: "192.168.2.1"
|
|
||||||
|
|
||||||
services:
|
services:
|
||||||
cloud:
|
cloud:
|
||||||
build: ./Cloud/
|
build: ./Cloud/
|
||||||
@ -29,81 +20,64 @@ services:
|
|||||||
redis:
|
redis:
|
||||||
image: 'redis:latest'
|
image: 'redis:latest'
|
||||||
ports:
|
ports:
|
||||||
- '6379:6379'
|
- '6379:6379'
|
||||||
volumes:
|
volumes:
|
||||||
- 'cloud-redis:/data'
|
- 'cloud-redis:/data'
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test:
|
test:
|
||||||
- CMD
|
- CMD
|
||||||
- redis-cli
|
- redis-cli
|
||||||
- ping
|
- ping
|
||||||
retries: 3
|
retries: 3
|
||||||
timeout: 5s
|
timeout: 5s
|
||||||
zookeeper:
|
kafka:
|
||||||
networks:
|
image: confluentinc/cp-kafka:7.4.0
|
||||||
- vpn
|
ports:
|
||||||
image: confluentinc/cp-zookeeper:7.4.0
|
- 9092:9092
|
||||||
environment:
|
- 9997:9997
|
||||||
ZOOKEEPER_CLIENT_PORT: 2181
|
expose:
|
||||||
ZOOKEEPER_TICK_TIME: 2000
|
|
||||||
ports:
|
|
||||||
- 2181:2181
|
|
||||||
kafka:
|
|
||||||
networks:
|
|
||||||
vpn:
|
|
||||||
ipv4_address: 192.168.2.10
|
|
||||||
image: confluentinc/cp-kafka:7.4.0
|
|
||||||
ports:
|
|
||||||
- 9092:9092
|
|
||||||
- 9997:9997
|
|
||||||
expose:
|
|
||||||
- 29092:29092
|
- 29092:29092
|
||||||
environment:
|
environment:
|
||||||
KAFKA_BROKER_ID: 1
|
KAFKA_BROKER_ID: 1
|
||||||
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
|
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
|
||||||
KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092
|
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
|
||||||
KAFKA_ADVERTISED_LISTENERS: HOST://192.168.1.5:9092,DOCKER://kafka:29092
|
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
|
||||||
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT
|
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||||
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
|
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
||||||
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
|
||||||
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
|
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
|
||||||
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
|
depends_on:
|
||||||
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
|
- zookeeper
|
||||||
depends_on:
|
init-kafka:
|
||||||
- zookeeper
|
image: confluentinc/cp-kafka:7.4.0
|
||||||
init-kafka:
|
depends_on:
|
||||||
networks:
|
- kafka
|
||||||
- vpn
|
entrypoint: [ '/bin/sh', '-c' ]
|
||||||
image: confluentinc/cp-kafka:7.4.0
|
command: |
|
||||||
|
"
|
||||||
|
# blocks until kafka is reachable
|
||||||
|
kafka-topics --bootstrap-server kafka:29092 --list
|
||||||
|
|
||||||
|
echo -e 'Creating kafka topics'
|
||||||
|
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
|
||||||
|
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
|
||||||
|
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic dataDetectors --replication-factor 1 --partitions 1
|
||||||
|
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
|
||||||
|
|
||||||
|
echo -e 'Successfully created the following topics:'
|
||||||
|
kafka-topics --bootstrap-server kafka:29092 --list
|
||||||
|
"
|
||||||
|
kafka-ui:
|
||||||
|
container_name: kafka-ui
|
||||||
|
image: provectuslabs/kafka-ui:latest
|
||||||
|
ports:
|
||||||
|
- 8080:8080
|
||||||
depends_on:
|
depends_on:
|
||||||
- kafka
|
- kafka
|
||||||
entrypoint: [ '/bin/sh', '-c' ]
|
environment:
|
||||||
command: |
|
KAFKA_CLUSTERS_0_NAME: local
|
||||||
"
|
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
|
||||||
# blocks until kafka is reachable
|
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
|
||||||
kafka-topics --bootstrap-server kafka:29092 --list
|
|
||||||
|
|
||||||
echo -e 'Creating kafka topics'
|
|
||||||
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
|
|
||||||
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
|
|
||||||
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
|
|
||||||
|
|
||||||
echo -e 'Successfully created the following topics:'
|
|
||||||
kafka-topics --bootstrap-server kafka:29092 --list
|
|
||||||
"
|
|
||||||
kafka-ui:
|
|
||||||
networks:
|
|
||||||
- vpn
|
|
||||||
container_name: kafka-ui
|
|
||||||
image: provectuslabs/kafka-ui:latest
|
|
||||||
ports:
|
|
||||||
- 8080:8080
|
|
||||||
depends_on:
|
|
||||||
- kafka
|
|
||||||
environment:
|
|
||||||
KAFKA_CLUSTERS_0_NAME: local
|
|
||||||
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
|
|
||||||
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
|
|
||||||
volumes:
|
volumes:
|
||||||
postgres_data:
|
postgres_data:
|
||||||
driver: local
|
driver: local
|
||||||
|
Loading…
Reference in New Issue
Block a user