Compare commits

...

21 Commits

Author SHA1 Message Date
fed96d5b86 del: сервисы брокера
они уже есь
2024-11-19 23:42:12 +04:00
9b770d131a Merge branch 'dev' into greenhouseCRUD 2024-11-19 23:35:40 +04:00
c34926e1ec Merge branch 'dev' of https://git.is.ulstu.ru/mfnefd/Cucumber into dev 2024-11-19 23:33:28 +04:00
d03313041c Merge branch 'devManager' into dev 2024-11-19 23:33:16 +04:00
c230d86404 add: сервис брокера, сущности брокера 2024-11-19 23:23:21 +04:00
07c5ea7853 Merge pull request 'brokerService' (#6) from brokerService into dev
Reviewed-on: #6
2024-11-19 19:46:42 +04:00
5e73961ad5 Add Valve Controller 2024-11-19 19:36:05 +04:00
e8a1a8385b Merge branch 'dev' into brokerService 2024-11-13 15:42:26 +04:00
7c310d21f7 Add broker service 2024-11-13 15:24:47 +04:00
the
5fa9c76b99 test changes 2024-11-13 01:14:32 +04:00
the
c032253699 фикс2 2024-11-13 01:00:16 +04:00
the
83aec339c9 фикс 2024-11-13 00:53:31 +04:00
the
b009ebdd0c АААААААААААА 2024-11-13 00:36:31 +04:00
the
73961934f0 СТОЛЬКО МУЧЕНИЙ РАДИ ОДНОЙ СТРОЧКИ, ЕЩЁ И НЕТВОРК НА САМОМ ДЕЛЕ НЕ РАБОТАЕТ 2024-11-13 00:31:21 +04:00
the
ffef39d409 СТОЛЬКО МУЧЕНИЙ РАДИ ОДНОЙ СТРОЧКИ 2024-11-13 00:30:33 +04:00
the
57e05aba90 kafka network unfix 2024-11-12 22:34:06 +04:00
the
ce3f3a4dc6 kafka network fix 2024-11-12 22:31:57 +04:00
the
4747f975c5 kafka network 2024-11-12 18:03:45 +04:00
the
f82c8daa92 detector webhooks 2024-11-12 17:04:24 +04:00
the
b7f4aa3f9f webhook fix 2024-11-12 16:19:37 +04:00
the
7f5262575e webhook test 2024-11-12 16:18:14 +04:00
20 changed files with 328 additions and 136 deletions

View File

@ -7,6 +7,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.6.1" />
<PackageReference Include="FluentValidation.AspNetCore" Version="11.3.0" /> <PackageReference Include="FluentValidation.AspNetCore" Version="11.3.0" />
<PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="6.0.4" /> <PackageReference Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="6.0.4" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.14" /> <PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.14" />

View File

@ -0,0 +1,42 @@
using Cloud.Requests;
using Cloud.Services;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Mvc;
using System.ComponentModel.DataAnnotations;
using System.Text.Json;
namespace Cloud.Controllers
{
[Authorize]
[ApiController]
[Route("api")]
public class ValveController : ControllerBase
{
//Контроллер вентиля
private readonly ProducerService _producerService;
public ValveController(ProducerService producerService)
{
_producerService = producerService;
}
[HttpPost("farm/{farmId}/greenhouse/{ghId}/watering")]
public async Task<IActionResult> interactValve([FromBody] ValveRequest request, int farmId, int ghId)
{
var kafkaRequest = new
{
FarmId = farmId,
GreenHouseId = ghId,
SomeAction = request.Action,
};
var message = JsonSerializer.Serialize(kafkaRequest);
return Ok(kafkaRequest);
/*await _producerService.ProduceMessageAsync("ValvesHeatersRequest", message);
return Ok($"Valve status is {request.Action}");*/
}
}
}

9
Cloud/Enums/ValveEnum.cs Normal file
View File

@ -0,0 +1,9 @@
namespace Cloud.Enums
{
public enum ValveEnum
{
Open,
Close,
Auto
}
}

View File

@ -9,6 +9,7 @@ using Cloud.Validation;
using StackExchange.Redis; using StackExchange.Redis;
using Cloud.Services.Broker.Implement.Kafka; using Cloud.Services.Broker.Implement.Kafka;
using Cloud.Services.Broker; using Cloud.Services.Broker;
using Cloud.Services;
var builder = WebApplication.CreateBuilder(args); var builder = WebApplication.CreateBuilder(args);
@ -21,6 +22,15 @@ builder.Services.AddSingleton<IConnectionMultiplexer>(sp =>
return ConnectionMultiplexer.Connect(configuration); return ConnectionMultiplexer.Connect(configuration);
}); });
//Kafka producer service
builder.Services.AddSingleton<ProducerService, ProducerService>();
//Kafka consumer service
builder.Services.AddSingleton<ConsumerService, ConsumerService>();
//Add the BackgroundWorkerService
builder.Services.AddHostedService<BackgroundWorkerService>();
//Jwt configuration //Jwt configuration
var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get<string>(); var jwtIssuer = builder.Configuration.GetSection("Jwt:Issuer").Get<string>();
var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get<string>(); var jwtKey = builder.Configuration.GetSection("Jwt:Key").Get<string>();
@ -60,6 +70,7 @@ builder.Services.AddFluentValidationClientsideAdapters();
builder.Services.AddValidatorsFromAssemblyContaining<LoginValidator>(); builder.Services.AddValidatorsFromAssemblyContaining<LoginValidator>();
builder.Services.AddValidatorsFromAssemblyContaining<RegisterValidator>(); builder.Services.AddValidatorsFromAssemblyContaining<RegisterValidator>();
builder.Services.AddValidatorsFromAssemblyContaining<FarmValidator>(); builder.Services.AddValidatorsFromAssemblyContaining<FarmValidator>();
builder.Services.AddValidatorsFromAssemblyContaining<ValveValidator>();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle // Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer(); builder.Services.AddEndpointsApiExplorer();

View File

@ -0,0 +1,7 @@
namespace Cloud.Requests
{
public class ValveRequest
{
public string Action { get; set; }
}
}

View File

@ -1,7 +1,9 @@
using Cloud.Services.Broker.Support;
namespace Cloud.Services.Broker namespace Cloud.Services.Broker
{ {
public interface IBrokerConsumer public interface IBrokerConsumer
{ {
// TODO: добавить методы для получения данных T? WaitMessage<T>(string topic) where T : IBrokerResponse;
} }
} }

View File

@ -2,7 +2,7 @@ using Cloud.Services.Broker.Support;
namespace Cloud.Services.Broker namespace Cloud.Services.Broker
{ {
public interface IBrokerProdurcer public interface IBrokerProducer
{ {
Task ProduceAsync(string topic, Command command); Task ProduceAsync(string topic, Command command);
} }

View File

@ -4,7 +4,7 @@ namespace Cloud.Services.Broker
{ {
public interface IBrokerService public interface IBrokerService
{ {
Task<CommandResult> Produce(Command command); Task Produce(Command command);
Task<T> Consume<T>(string topic); T? Consume<T>(string topic) where T : IBrokerResponse;
} }
} }

View File

@ -0,0 +1,51 @@
using Cloud.Services.Broker.Support;
using Confluent.Kafka;
using System.Text.Json;
namespace Cloud.Services.Broker.Implement.Kafka
{
public class KafkaConsumer : IBrokerConsumer
{
private readonly IConsumer<string, string> _consumer;
public KafkaConsumer(IConfiguration config)
{
var consumerConfig = new ConsumerConfig()
{
BootstrapServers = config["Kafka:BootstrapServers"],
GroupId = config["Kafka:GroupId"],
AutoOffsetReset = AutoOffsetReset.Earliest,
};
_consumer = new ConsumerBuilder<string, string>(consumerConfig).Build();
}
public T? WaitMessage<T>(string topic)
where T : IBrokerResponse
{
try
{
_consumer.Subscribe(topic);
var consumeResult = _consumer.Consume(TimeSpan.FromMilliseconds(1000));
if (consumeResult == null)
{
// No message received from Kafka within the specified timeout.
return default;
}
return JsonSerializer.Deserialize<T>(consumeResult.Message.Value);
}
catch (Exception ex)
{
throw;
}
finally
{
_consumer.Close();
}
}
}
}

View File

@ -0,0 +1,28 @@
using Cloud.Services.Broker.Support;
using Confluent.Kafka;
namespace Cloud.Services.Broker.Implement.Kafka
{
public class KafkaProducer : IBrokerProducer
{
private readonly IProducer<Guid, Command> _producer;
public KafkaProducer(IConfiguration configuration)
{
var producerConfig = new ProducerConfig
{
BootstrapServers = configuration["Kafka:BootstrapServers"]
};
//Build the Producer
_producer = new ProducerBuilder<Guid, Command>(producerConfig).Build();
}
public async Task ProduceAsync(string topic, Command command)
{
var message = new Message<Guid, Command> { Key = Guid.NewGuid(), Value = command };
//Produce the Message
await _producer.ProduceAsync(topic, message);
}
}
}

View File

@ -4,14 +4,19 @@ namespace Cloud.Services.Broker.Implement.Kafka
{ {
public class KafkaService : IBrokerService public class KafkaService : IBrokerService
{ {
public Task<T> Consume<T>(string topic) private readonly KafkaProducer _producer;
private readonly KafkaConsumer _consumer;
public KafkaService(IConfiguration configuration)
{ {
throw new NotImplementedException(); _producer = new KafkaProducer(configuration);
_consumer = new KafkaConsumer(configuration);
} }
public Task<CommandResult> Produce(Command command) public T? Consume<T>(string topic)
{ where T : IBrokerResponse => _consumer.WaitMessage<T>(topic);
throw new NotImplementedException();
} public async Task Produce(Command command)
=> await _producer.ProduceAsync("commands", command);
} }
} }

View File

@ -1,9 +1,10 @@
using System.Text.Json;
namespace Cloud.Services.Broker.Support namespace Cloud.Services.Broker.Support
{ {
public class Command public class Command
{ {
public int Id { get; set; } public Guid GreenhouseId { get; set; }
public int GreenhouseId { get; set; }
public string CommandName { get; set; } = null!; public string CommandName { get; set; } = null!;
} }
} }

View File

@ -1,6 +1,6 @@
namespace Cloud.Services.Broker.Support namespace Cloud.Services.Broker.Support
{ {
public class CommandResult public class CommandResult : IBrokerResponse
{ {
public int CommandId { get; set; } public int CommandId { get; set; }
public int GreenhouseId { get; set; } public int GreenhouseId { get; set; }

View File

@ -1,6 +1,6 @@
namespace Cloud.Services.Broker.Support namespace Cloud.Services.Broker.Support
{ {
public class GreenhouseInfo public class GreenhouseInfo : IBrokerResponse
{ {
public int Id { get; set; } public int Id { get; set; }
public int PercentWater { get; set; } public int PercentWater { get; set; }

View File

@ -0,0 +1,6 @@
namespace Cloud.Services.Broker.Support
{
public interface IBrokerResponse
{
}
}

View File

@ -0,0 +1,16 @@
using Cloud.Enums;
using Cloud.Requests;
using FluentValidation;
namespace Cloud.Validation
{
public class ValveValidator : AbstractValidator<ValveRequest>
{
public ValveValidator() {
RuleFor(request => request.Action)
.NotEmpty().WithMessage("Action can't be empty").
IsEnumName(typeof (ValveEnum)).WithMessage("Action is not correct");
}
}
}

View File

@ -5,6 +5,10 @@
"Microsoft.AspNetCore": "Warning" "Microsoft.AspNetCore": "Warning"
} }
}, },
"Kafka": {
"BootstrapServers": "localhost:9092",
"GroupId": "ValvesHeaters"
},
"AllowedHosts": "*", "AllowedHosts": "*",
"Jwt": { "Jwt": {
"Key": "m7TyhE20s0dVtUDAr9EnFdPZnAG8maxgBTaiW5j6kO6RQhWDAGxYmXyu0suDnE0o", "Key": "m7TyhE20s0dVtUDAr9EnFdPZnAG8maxgBTaiW5j6kO6RQhWDAGxYmXyu0suDnE0o",

View File

@ -1,13 +1,12 @@
from random import random
from turtledemo.penrose import start
from kafka import KafkaProducer, KafkaConsumer
import kafka
import socket
from json import dumps, loads
import time import time
import random as rnd import random as rnd
from flask import Flask
import requests
import threading
app = Flask(__name__)
class Detector: class Detector:
def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower): def __init__(self, id, moistureThresholdUpper, moistureThresholdLower, tempThresholdUpper, tempThresholdLower):
self.id = id self.id = id
@ -18,31 +17,23 @@ class Detector:
self.moisture = 0 self.moisture = 0
self.temp = 0 self.temp = 0
self.producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
client_id=f'detector{self.id}',
value_serializer=lambda v: dumps(v).encode('utf-8')
)
def sendData(self):
message = {'id' : self.id,
'moisture': self.moisture,
'temperature' : self.temp }
self.producer.send('dataDetectors', message)
def cycle(self): def cycle(self):
self.moisture += rnd.random() / 100 self.moisture += rnd.random() / 100
self.temp += (rnd.random() - 0.5) / 100 self.temp += (rnd.random() - 0.5) / 100
def sendData(self):
data = {"moisture": self.moisture,
"temp": self.temp}
requests.post(f"http://127.0.0.1:20002/webhook?id={self.id}", json=data)
detector1 = Detector(1, 0.6, 0.2, 40, 20) detector1 = Detector(1, 0.6, 0.2, 40, 20)
detector2 = Detector(2, 0.7, 0.3, 40, 20)
detector3 = Detector(3, 0.9, 0.6, 40, 20)
detectors = [detector1, detector2, detector3] detectors = [detector1]
while True: if __name__ =="__main__":
for detector in detectors: while True:
detector.cycle() for detector in detectors:
detector.sendData() detector.cycle()
time.sleep(1) detector.sendData()
time.sleep(1)

View File

@ -2,20 +2,20 @@ from kafka import KafkaProducer, KafkaConsumer
import kafka import kafka
import socket import socket
from json import dumps, loads from json import dumps, loads
from flask import Flask, request
import time import time
from enum import Enum from enum import Enum
from GreenhouseDetector.detector import Detector import threading
class Status(Enum): app = Flask(__name__)
UNKNOWN = -1
OFF = 0
ON = 1
def start_manager():
return
class Manager: class Manager:
def __init__(self, id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed", def __init__(self, _id: int, moisture: float = 0, temp: float = 20, isAutoOn: bool = False, valve_state: str = "closed",
heater_state: str = "off"): heater_state: str = "off"):
self.id = id self._id = _id
self.moisture = moisture self.moisture = moisture
self.temp = temp self.temp = temp
self.isAutoOn = isAutoOn self.isAutoOn = isAutoOn
@ -24,49 +24,29 @@ class Manager:
self.dataPublisher = KafkaProducer( self.dataPublisher = KafkaProducer(
bootstrap_servers=['localhost:9092'], bootstrap_servers=['localhost:9092'],
client_id=f'manager{id}_producer', client_id=f'manager{self._id}_producer',
value_serializer=lambda v: dumps(v).encode('utf-8') value_serializer=lambda v: dumps(v).encode('utf-8')
) )
# self.detectorConsumer = KafkaConsumer(
# 'dataDetectors',
# bootstrap_servers=['localhost:9092'],
# auto_offset_reset='earliest',
# enable_auto_commit=True,
# consumer_timeout_ms=1000,
#group_id=f'manager{id}',
# value_deserializer=lambda x: loads(x.decode('utf-8'))
#)
self.controllerConsumer = KafkaConsumer( self.controllerConsumer = KafkaConsumer(
'commands', 'commands',
bootstrap_servers=['localhost:9092'], bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest', auto_offset_reset='earliest',
enable_auto_commit=True, enable_auto_commit=True,
consumer_timeout_ms=2000, consumer_timeout_ms=2000,
group_id=f'manager{id}', group_id=f'manager{self._id}',
value_deserializer=lambda x: loads(x.decode('utf-8')) value_deserializer=lambda x: loads(x.decode('utf-8'))
) )
self.controllerConsumerResponse = KafkaProducer( self.controllerConsumerResponse = KafkaProducer(
bootstrap_servers=['localhost:9092'], bootstrap_servers=['localhost:9092'],
client_id=f'manager{id}_producer', client_id=f'manager{self._id}_producer',
value_serializer=lambda v: dumps(v).encode('utf-8') value_serializer=lambda v: dumps(v).encode('utf-8')
) )
def update(self):
for message in self.detectorConsumer:
print(f"Manager {self.id} received message: ")
print(message.value)
self.moisture = message.value['moisture']
self.temp = message.value['temperature']
print("Updating info...\n")
self.sendData()
def sendData(self): def sendData(self):
print("sending data...") print("sending data...")
message = { message = {
'id': self.id, 'id': self._id,
'moisture': self.moisture, 'moisture': self.moisture,
'temp': self.temp, 'temp': self.temp,
'valveStatus': str(self.valve_state), 'valveStatus': str(self.valve_state),
@ -76,6 +56,8 @@ class Manager:
print(message) print(message)
self.dataPublisher.send('data', message) self.dataPublisher.send('data', message)
self.dataPublisher.flush()
def toggle_device(self, device, request_id, greenhouse_id): def toggle_device(self, device, request_id, greenhouse_id):
@ -122,23 +104,33 @@ class Manager:
for tp, msgs in messages.items(): for tp, msgs in messages.items():
for message in msgs: for message in msgs:
print(f"Manager {self.id} received message: ") print(f"Manager {self._id} received message: ")
print(message.value) print(message.value)
self.request_id = message.value['request_id'] self.request_id = message.value['request_id']
self.greenhouse_id = message.value['greenhouse_id'] self.greenhouse_id = message.value['greenhouse_id']
self.command = message.value['command'] self.command = message.value['command']
self.toggle_device(self.command, self.request_id, self.greenhouse_id) self.toggle_device(self.command, self.request_id, self.greenhouse_id)
@app.route(f'/webhook', methods=['POST'])
def webhook():
print("received webhook", request.args.get('id'))
for manager in managers:
print()
if int(request.args.get('id')) == manager._id and request.method == 'POST':
print("Data received from Webhook is", request.json)
body = request.json
for key, value in body.items():
setattr(manager, key, value)
manager1 = Manager(id=1) manager.sendData()
return f"Webhook received for manager {manager._id}"
return "Webhook ignored"
t1 = threading.Thread(target=start_manager)
manager1 = Manager(_id=1)
managers = [manager1] managers = [manager1]
if __name__ == "__main__":
while True: threading.Thread(target=lambda: app.run(host="0.0.0.0", port=20002, debug=True, use_reloader=False)).start()
time.sleep(5)
manager1.sendData()
for manager in managers:
manager.getCommand()

View File

@ -1,3 +1,12 @@
networks:
vpn:
name: kafkaVPN
driver: bridge
ipam:
config:
- subnet: "192.168.2.0/24"
gateway: "192.168.2.1"
services: services:
cloud: cloud:
build: ./Cloud/ build: ./Cloud/
@ -20,64 +29,81 @@ services:
redis: redis:
image: 'redis:latest' image: 'redis:latest'
ports: ports:
- '6379:6379' - '6379:6379'
volumes: volumes:
- 'cloud-redis:/data' - 'cloud-redis:/data'
healthcheck: healthcheck:
test: test:
- CMD - CMD
- redis-cli - redis-cli
- ping - ping
retries: 3 retries: 3
timeout: 5s timeout: 5s
kafka: zookeeper:
image: confluentinc/cp-kafka:7.4.0 networks:
ports: - vpn
- 9092:9092 image: confluentinc/cp-zookeeper:7.4.0
- 9997:9997 environment:
expose: ZOOKEEPER_CLIENT_PORT: 2181
- 29092:29092 ZOOKEEPER_TICK_TIME: 2000
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- zookeeper
init-kafka:
image: confluentinc/cp-kafka:7.4.0
depends_on:
- kafka
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic dataDetectors --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:29092 --list
"
kafka-ui:
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports: ports:
- 8080:8080 - 2181:2181
kafka:
networks:
vpn:
ipv4_address: 192.168.2.10
image: confluentinc/cp-kafka:7.4.0
ports:
- 9092:9092
- 9997:9997
expose:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENERS: HOST://0.0.0.0:9092,DOCKER://0.0.0.0:29092
KAFKA_ADVERTISED_LISTENERS: HOST://192.168.1.5:9092,DOCKER://kafka:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: DOCKER:PLAINTEXT,HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: DOCKER
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_LOG_FLUSH_INTERVAL_MESSAGES: 10000
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
depends_on:
- zookeeper
init-kafka:
networks:
- vpn
image: confluentinc/cp-kafka:7.4.0
depends_on: depends_on:
- kafka - kafka
environment: entrypoint: [ '/bin/sh', '-c' ]
KAFKA_CLUSTERS_0_NAME: local command: |
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092 "
KAFKA_CLUSTERS_0_METRICS_PORT: 9997 # blocks until kafka is reachable
kafka-topics --bootstrap-server kafka:29092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic commands --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic data --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka:29092 --create --if-not-exists --topic response --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka:29092 --list
"
kafka-ui:
networks:
- vpn
container_name: kafka-ui
image: provectuslabs/kafka-ui:latest
ports:
- 8080:8080
depends_on:
- kafka
environment:
KAFKA_CLUSTERS_0_NAME: local
KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092
KAFKA_CLUSTERS_0_METRICS_PORT: 9997
volumes: volumes:
postgres_data: postgres_data:
driver: local driver: local