3 Commits

15 changed files with 284 additions and 191 deletions

View File

@@ -0,0 +1,41 @@
using Cloud.Requests;
using Cloud.Services.Broker;
using Cloud.Services.Broker.Implement.Kafka;
using Cloud.Services.Broker.Support;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
namespace Cloud.Controllers
{
[Authorize]
[ApiController]
[Route("api")]
public class HeaterController : Controller
{
//Контроллер вентиля
private readonly IBrokerService _kafkaService;
public HeaterController(KafkaService kafkaService)
{
_kafkaService = kafkaService;
}
[HttpPost("farm/{farmId}/greenhouse/{ghId}/heating")]
public async Task<IActionResult> interactValve([FromBody] HeaterRequest request, int farmId, int ghId)
{
var command = new Command
{
request_id = Guid.NewGuid(),
greenhouse_id = ghId,
farm_id = farmId,
command = "heater",
target_temp = request.Temperature
};
await _kafkaService.ProduceAsync("commands", command);
return Ok($"Heater target temperature is set as {request.Temperature}");
}
}
}

View File

@@ -1,8 +1,12 @@
using Cloud.Requests; using Cloud.Models;
using Cloud.Requests;
using Cloud.Services; using Cloud.Services;
using Cloud.Services.Broker; using Cloud.Services.Broker;
using Cloud.Services.Broker.Implement.Kafka;
using Cloud.Services.Broker.Support;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc; using Microsoft.AspNetCore.Mvc;
using System;
using System.ComponentModel.DataAnnotations; using System.ComponentModel.DataAnnotations;
using System.Text.Json; using System.Text.Json;
@@ -17,7 +21,7 @@ namespace Cloud.Controllers
private readonly IBrokerService _kafkaService; private readonly IBrokerService _kafkaService;
public ValveController(IBrokerService kafkaService) public ValveController(KafkaService kafkaService)
{ {
_kafkaService = kafkaService; _kafkaService = kafkaService;
} }
@@ -25,19 +29,18 @@ namespace Cloud.Controllers
[HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")] [HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")]
public async Task<IActionResult> interactValve([FromBody] ValveRequest request, int farmId, int ghId) public async Task<IActionResult> interactValve([FromBody] ValveRequest request, int farmId, int ghId)
{ {
var kafkaRequest = new var command = new Command
{ {
FarmId = farmId, request_id = Guid.NewGuid(),
GreenHouseId = ghId, greenhouse_id = ghId,
SomeAction = request.Action, farm_id = farmId,
command = "valve",
target_moisture = request.Moisture
}; };
var message = JsonSerializer.Serialize(kafkaRequest);
return Ok(kafkaRequest);
/*await _kafkaService.ProduceAsync("ValvesHeatersRequest", message); await _kafkaService.ProduceAsync("commands", command);
return Ok($"Valve status is {request.Action}");*/ return Ok($"Valve target moisture is set as {request.Moisture}%");
} }
} }
} }

View File

@@ -0,0 +1,58 @@
using Cloud.Models;
using Cloud.Requests;
using Cloud.Services.Broker;
using Cloud.Services.Broker.Implement.Kafka;
using Cloud.Services.Broker.Support;
using Microsoft.AspNetCore.Mvc;
using System;
namespace Cloud.Controllers
{
[ApiController]
[Route("api/[controller]")]
public class WebhookController : ControllerBase
{
private readonly IBrokerService _kafkaService;
public WebhookController (KafkaService kafkaService)
{
_kafkaService = kafkaService;
}
[HttpPost]
public async Task<IActionResult> ReceiveWebhook([FromBody] WebhookData payload)
{
if (payload == null)
{
return BadRequest("Invalid data.");
}
if (payload.moisture >= payload.target_moisture && payload.valveStatus == "open")
{
var command = new Command
{
request_id = Guid.NewGuid(),
greenhouse_id = payload.greenhouse_id,
command = "valve",
};
await _kafkaService.ProduceAsync("commands", command);
}
if (payload.temp >= payload.target_temp && payload.heaterStatus == "on")
{
var command = new Command
{
request_id = Guid.NewGuid(),
greenhouse_id = payload.greenhouse_id,
command = "heater",
};
await _kafkaService.ProduceAsync("commands", command);
}
// Respond with a success status
return Ok(new { message = "Webhook received successfully", data = payload });
}
}
}

View File

@@ -71,6 +71,7 @@ builder.Services.AddValidatorsFromAssemblyContaining<LoginValidator>();
builder.Services.AddValidatorsFromAssemblyContaining<RegisterValidator>(); builder.Services.AddValidatorsFromAssemblyContaining<RegisterValidator>();
builder.Services.AddValidatorsFromAssemblyContaining<FarmValidator>(); builder.Services.AddValidatorsFromAssemblyContaining<FarmValidator>();
builder.Services.AddValidatorsFromAssemblyContaining<ValveValidator>(); builder.Services.AddValidatorsFromAssemblyContaining<ValveValidator>();
builder.Services.AddValidatorsFromAssemblyContaining<HeaterValidator>();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer(); builder.Services.AddEndpointsApiExplorer();

View File

@@ -0,0 +1,7 @@
namespace Cloud.Requests
{
public class HeaterRequest
{
public float Temperature { get; set; }
}
}

View File

@@ -2,6 +2,6 @@
{ {
public class ValveRequest public class ValveRequest
{ {
public string Action { get; set; } public float Moisture { get; set; }
} }
} }

View File

@@ -0,0 +1,15 @@
namespace Cloud.Requests
{
public class WebhookData
{
public int request_id { get; set; }
public int greenhouse_id { get; set; }
public float moisture { get; set; }
public float target_moisture { get; set; }
public string valveStatus { get; set; }
public float temp { get; set; }
public float target_temp { get; set; }
public string heaterStatus { get; set; }
public bool isAutoOn { get; set; }
}
}

View File

@@ -14,8 +14,8 @@ namespace Cloud.Services.Broker.Implement.Kafka
public KafkaConsumer(IConfiguration config) public KafkaConsumer(IConfiguration config)
{ {
_config = config; _config = config;
Console.WriteLine($"KafkaConsumer created. IP:" + _config["KAFKA_URL"]); Console.WriteLine($"KafkaConsumer created. IP:" + _config["Kafka:BootstrapServers"]);
ChangeBrokerIp(_config["KAFKA_URL"]); ChangeBrokerIp(_config["Kafka:BootstrapServers"]);
} }
public IEnumerable<T>? WaitMessages<T>(string topic) public IEnumerable<T>? WaitMessages<T>(string topic)

View File

@@ -12,8 +12,8 @@ namespace Cloud.Services.Broker.Implement.Kafka
public KafkaProducer(IConfiguration configuration) public KafkaProducer(IConfiguration configuration)
{ {
_config = configuration; _config = configuration;
Console.WriteLine($"KafkaProducer created. IP:" + _config["KAFKA_URL"]); Console.WriteLine($"KafkaProducer created. IP:" + _config["Kafka:BootstrapServers"]);
ChangeBrokerIp(_config["KAFKA_URL"]); ChangeBrokerIp(_config["Kafka:BootstrapServers"]);
} }
public async Task ProduceAsync(string topic, Command command) public async Task ProduceAsync(string topic, Command command)
{ {

View File

@@ -19,12 +19,12 @@ namespace Cloud.Services.Broker.Implement.Kafka
public async Task ProduceAsync(string topic, Command command) public async Task ProduceAsync(string topic, Command command)
=> await _producer.ProduceAsync("commands", command); => await _producer.ProduceAsync(topic, command);
public void ChangeBrokerIp(string ip) public void ChangeBrokerIp(string ip)
{ {
_consumer.ChangeBrokerIp(ip); _consumer.ChangeBrokerIp(ip);
_producer.ChangeBrokerIp(ip); _producer.ChangeBrokerIp(ip);
} }
} }
} }

View File

@@ -4,7 +4,11 @@ namespace Cloud.Services.Broker.Support
{ {
public class Command public class Command
{ {
public Guid GreenhouseId { get; set; } public Guid request_id { get; set; }
public string CommandName { get; set; } = null!; public int greenhouse_id { get; set; }
} public int? farm_id { get; set; }
public string command { get; set; }
public float? target_moisture{ get; set; }
public float? target_temp { get; set; }
}
} }

View File

@@ -0,0 +1,15 @@
using Cloud.Requests;
using FluentValidation;
namespace Cloud.Validation
{
public class HeaterValidator : AbstractValidator<HeaterRequest>
{
public HeaterValidator()
{
RuleFor(request => request.Temperature)
.NotNull().WithMessage("Temperature can't be empty");
}
}
}

View File

@@ -8,9 +8,8 @@ namespace Cloud.Validation
{ {
public ValveValidator() { public ValveValidator() {
RuleFor(request => request.Action) RuleFor(request => request.Moisture)
.NotEmpty().WithMessage("Action can't be empty"). .NotNull().WithMessage("Moisture can't be empty");
IsEnumName(typeof (ValveEnum)).WithMessage("Action is not correct");
} }
} }
} }

View File

@@ -15,23 +15,19 @@ def start_manager():
return return
class Manager: class Manager:
def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", def __init__(self, _id: int, moisture: float = 0, target_moisture: float = 30, temp: float = 20, target_temp: float = 30,
heater_state: str = "off"): isAutoOn: bool = False, valve_state: str = "closed", heater_state: str = "off"):
KAFKA_URL = os.environ.get('KAFKA_URL') KAFKA_URL = os.environ.get('KAFKA_URL')
print("KAFKA_URL=", KAFKA_URL) print("KAFKA_URL=", KAFKA_URL)
self._id = _id self._id = _id
self.moisture = moisture self.moisture = moisture
self.target_moisture = target_moisture
self.temp = temp self.temp = temp
self.temp = target_temp
self.isAutoOn = isAutoOn self.isAutoOn = isAutoOn
self.valve_state = valve_state self.valve_state = valve_state
self.heater_state = heater_state self.heater_state = heater_state
self.dataPublisher = KafkaProducer(
bootstrap_servers=[KAFKA_URL],
client_id=f'manager{self._id}_producer',
value_serializer=lambda v: dumps(v).encode('utf-8')
)
self.controllerConsumer = KafkaConsumer( self.controllerConsumer = KafkaConsumer(
'commands', 'commands',
bootstrap_servers=[KAFKA_URL], bootstrap_servers=[KAFKA_URL],
@@ -47,22 +43,6 @@ class Manager:
value_serializer=lambda v: dumps(v).encode('utf-8') value_serializer=lambda v: dumps(v).encode('utf-8')
) )
def sendData(self):
print("sending data...")
message = {
'id': self._id,
'moisture': self.moisture,
'temp': self.temp,
'valveStatus': str(self.valve_state),
'heaterStatus': str(self.heater_state),
'isAutoOn': self.isAutoOn
}
print(message)
self.dataPublisher.send('data', message)
self.dataPublisher.flush()
def toggle_device(self, device, request_id, greenhouse_id): def toggle_device(self, device, request_id, greenhouse_id):
if device == 'valve': if device == 'valve':
@@ -87,11 +67,16 @@ class Manager:
def send_status(self, request_id, greenhouse_id): def send_status(self, request_id, greenhouse_id):
status = { data = {
'request_id': request_id, 'request_id': request_id,
'greenhouse_id': greenhouse_id, 'greenhouse_id': greenhouse_id,
'valve_state': self.valve_state, 'moisture': self.moisture,
'heater_state': self.heater_state 'target_moisture': self.target_moisture,
'valveStatus': self.valve_state,
'temp': self.temp,
'target_temp': self.target_temp,
'heaterStatus': self.heater_state,
'isAutoOn': self.isAutoOn
} }
self.sendDataCommand(status) self.sendDataCommand(status)
print("Updating info...\n") print("Updating info...\n")
@@ -99,7 +84,16 @@ class Manager:
def sendDataCommand(self, message): def sendDataCommand(self, message):
print("sending data...") print("sending data...")
self.dataPublisher.send('response', message) url = "https://localhost:7113/api/webhook"
headers = {
"Content-Type": "application/json"
}
response = requests.post(url, json=message, headers=headers)
# Check status code and content
print("Status Code", response.status_code)
print("Response Body", response.json())
def getCommand(self): def getCommand(self):
messages = self.controllerConsumer.poll(timeout_ms=1000) messages = self.controllerConsumer.poll(timeout_ms=1000)
@@ -113,25 +107,12 @@ class Manager:
self.request_id = message.value['request_id'] self.request_id = message.value['request_id']
self.greenhouse_id = message.value['greenhouse_id'] self.greenhouse_id = message.value['greenhouse_id']
self.command = message.value['command'] self.command = message.value['command']
if message.value.get('target_moisture'):
self.target_moisture = message.value['target_moisture']
if message.value.get('target_temp'):
self.target_temp = message.value['target_temp']
self.toggle_device(self.command, self.request_id, self.greenhouse_id) self.toggle_device(self.command, self.request_id, self.greenhouse_id)
@app.route(f'/webhook', methods=['POST'])
def webhook():
print("received webhook", request.args.get('id'))
for manager in managers:
print()
if int(request.args.get('id')) == manager._id and request.method == 'POST':
print("Data received from Webhook is", request.json)
body = request.json
for key, value in body.items():
setattr(manager, key, value)
manager.sendData()
return f"Webhook received for manager {manager._id}"
return "Webhook ignored"
t1 = threading.Thread(target=start_manager) t1 = threading.Thread(target=start_manager)
manager1 = Manager(_id=1) manager1 = Manager(_id=1)
managers = [manager1] managers = [manager1]

View File

@@ -8,122 +8,39 @@ networks:
gateway: "192.168.2.1" gateway: "192.168.2.1"
services: services:
# Ниже идет то, что в облаке cloud:
# cloud:
# networks:
# - vpn
# build: ./Cloud/
# ports:
# - "5124:5124"
# environment:
# ASPNETCORE_ENVIRONMENT: Development
# DB_CONNECTION_STRING: ${DB_CONNECTION_STRING}
# REDDIS_URL: redis:6379
# KAFKA_URL: kafka:29092
# # Добавить, когда будет фронт!
# # FRONT_URL: front:3000
# depends_on:
# - postgres
# - redis
# postgres:
# image: postgres:14
# container_name: cucumber_database
# environment:
# POSTGRES_USER: ${POSTGRES_USER}
# POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
# POSTGRES_DB: ${POSTGRES_DB}
# ports:
# - "5438:5432"
# volumes:
# - postgres_data:/var/lib/postgresql/data
# redis:
# image: 'redis:latest'
# ports:
# - '6379:6379'
# volumes:
# - 'cloud-redis:/data'
# healthcheck:
# test:
# - CMD
# - redis-cli
# - ping
# retries: 3
# timeout: 5s
# ------------------------------------------------
# Ниже идет то, что на ферме
# zookeeper:
# networks:
# - vpn
# image: confluentinc/cp-zookeeper:7.4.0
# environment:
# ZOOKEEPER_CLIENT_PORT: 2181
# ZOOKEEPER_TICK_TIME: 2000
# ports:
# - 2181:2181
# kafka:
# networks:
# vpn:
# ipv4_address: 192.168.2.10
# image: confluentinc/cp-kafka:7.4.0
# ports:
# - 9092:9092
# - 9997:9997
# expose:
# - 29092:29092
# environment:
# KAFKA_BROKER_ID: 1
# KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
# KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092
# KAFKA_ADVERTISED_LISTENERS: HOST://192.168.1.5:9092,DOCKER://kafka:29092
# KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT
# KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
# KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
# KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
# KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
# depends_on:
# - zookeeper
# init-kafka:
# networks:
# - vpn
# image: confluentinc/cp-kafka:7.4.0
# depends_on:
# - kafka
# entrypoint: [ '/bin/sh', '-c' ]
# command: |
# "
# # blocks until kafka is reachable
# kafka-topics --bootstrap-server kafka:29092 --list
# echo -e 'Creating kafka topics'
# kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
# kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
# kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
# echo -e 'Successfully created the following topics:'
# kafka-topics --bootstrap-server kafka:29092 --list
# "
# kafka-ui:
# networks:
# - vpn
# container_name: kafka-ui
# image: provectuslabs/kafka-ui:latest
# ports:
# - 8080:8080
# depends_on:
# - kafka
# environment:
# KAFKA_CLUSTERS_0_NAME: local
# KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
# KAFKA_CLUSTERS_0_METRICS_PORT: 9997
redis-farm:
networks: networks:
- vpn - vpn
build: ./Cloud/
ports:
- "5124:5124"
environment:
ASPNETCORE_ENVIRONMENT: Development
DB_CONNECTION_STRING: ${DB_CONNECTION_STRING}
REDDIS_URL: redis:6379
KAFKA_URL: kafka:29092
# Добавить, когда будет фронт!
# FRONT_URL: front:3000
depends_on:
- postgres
- redis
postgres:
image: postgres:14
container_name: cucumber_database
environment:
POSTGRES_USER: ${POSTGRES_USER}
POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
POSTGRES_DB: ${POSTGRES_DB}
ports:
- "5438:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: 'redis:latest' image: 'redis:latest'
ports: ports:
- '6380:6379' - '6379:6379'
volumes: volumes:
- 'farm-redis:/data' - 'cloud-redis:/data'
healthcheck: healthcheck:
test: test:
- CMD - CMD
@@ -131,15 +48,71 @@ services:
- ping - ping
retries: 3 retries: 3
timeout: 5s timeout: 5s
redis-insight: zookeeper:
networks: networks:
- vpn - vpn
image: redis/redisinsight:latest image: confluentinc/cp-zookeeper:7.4.0
restart: always environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports: ports:
- "5540:5540" - 2181:2181
volumes: kafka:
- redis-insight:/data networks:
vpn:
ipv4_address: 192.168.2.10
image: confluentinc/cp-kafka:7.4.0
ports:
- 9092:9092
- 9997:9997
expose:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: HOST://192.168.1.5:9092,DOCKER://kafka:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- zookeeper
init-kafka:
networks:
- vpn
image: confluentinc/cp-kafka:7.4.0
depends_on:
- kafka
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:29092 --list
"
kafka-ui:
networks:
- vpn
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
manager: manager:
networks: networks:
- vpn - vpn
@@ -168,7 +141,3 @@ volumes:
driver: local driver: local
cloud-redis: cloud-redis:
driver: local driver: local
farm-redis:
driver: local
redis-insight:
driver: local