Compare commits

..

1 Commits
main ... lw5

Author SHA1 Message Date
2f859b0495 lw5 2024-10-24 23:11:42 +04:00
14 changed files with 2135 additions and 30 deletions

View File

@ -5,8 +5,20 @@
</component>
<component name="ChangeListManager">
<list default="true" id="67d3f0b7-2fd3-4b40-a135-0347e30e6207" name="Changes" comment="add log file">
<change afterPath="$PROJECT_DIR$/minio/src/main/java/com/spring/minio/config/KafkaConsumerConfig.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/minio/src/main/java/com/spring/minio/consumer/Consumer.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/minio/src/main/java/com/spring/minio/properties/KafkaProperties.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/report/src/main/java/com/spring/report/config/KafkaProducerConfig.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/report/src/main/java/com/spring/report/kafka/Producer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/docker-compose.yml" beforeDir="false" afterPath="$PROJECT_DIR$/docker-compose.yml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/minio/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/minio/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/minio/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/minio/pom.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/minio/src/main/resources/application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/minio/src/main/resources/application.yml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/report/logs/application.log" beforeDir="false" afterPath="$PROJECT_DIR$/report/logs/application.log" afterDir="false" />
<change beforePath="$PROJECT_DIR$/report/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/report/pom.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/report/src/main/java/com/spring/report/controller/VisitorController.java" beforeDir="false" afterPath="$PROJECT_DIR$/report/src/main/java/com/spring/report/controller/VisitorController.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/report/src/main/resources/application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/report/src/main/resources/application.yml" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@ -36,14 +48,15 @@
"ASKED_ADD_EXTERNAL_FILES": "true",
"Docker.docker-compose.yml: Compose Deployment.executor": "Run",
"RunOnceActivity.ShowReadmeOnStart": "true",
"git-widget-placeholder": "main",
"git-widget-placeholder": "lw5",
"kotlin-language-version-configured": "true",
"last_opened_file_path": "D:/java/dcaa-mcs/report/src/main/resources",
"last_opened_file_path": "D:/java/DemoTaskMediasoft(2)",
"node.js.detected.package.eslint": "true",
"node.js.detected.package.tslint": "true",
"node.js.selected.package.eslint": "(autodetect)",
"node.js.selected.package.tslint": "(autodetect)",
"nodejs_package_manager_path": "npm",
"settings.editor.selected.configurable": "bigdataide_conn_settings",
"vue.rearranger.settings.migration": "true"
}
}]]></component>
@ -95,6 +108,7 @@
<workItem from="1729199229123" duration="2034000" />
<workItem from="1729271426421" duration="377000" />
<workItem from="1729706571244" duration="12931000" />
<workItem from="1729789297391" duration="5967000" />
</task>
<task id="LOCAL-00001" summary="add minio">
<option name="closed" value="true" />

View File

@ -86,10 +86,39 @@ services:
backend:
aliases:
- "myminio"
zookeeper:
container_name: zookeeper
image: confluentinc/cp-zookeeper:latest
depends_on:
- myminio
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- '2181:2181'
networks:
backend:
aliases:
- "zookeeper"
kafka:
container_name: kafka
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- '9092:9092'
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
backend:
aliases:
- "kafka"
networks:
backend:
driver: bridge
volumes:
minio-storage:
driver: bridge

View File

@ -5,8 +5,20 @@
</component>
<component name="ChangeListManager">
<list default="true" id="4c468ddb-7fbe-40b8-bf97-16a0c944e8c6" name="Changes" comment="">
<change afterPath="$PROJECT_DIR$/src/main/java/com/spring/minio/config/KafkaConsumerConfig.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/main/java/com/spring/minio/consumer/Consumer.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/src/main/java/com/spring/minio/properties/KafkaProperties.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/../report/src/main/java/com/spring/report/config/KafkaProducerConfig.java" afterDir="false" />
<change afterPath="$PROJECT_DIR$/../report/src/main/java/com/spring/report/kafka/Producer.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../docker-compose.yml" beforeDir="false" afterPath="$PROJECT_DIR$/../docker-compose.yml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/src/main/resources/application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/resources/application.yml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../report/logs/application.log" beforeDir="false" afterPath="$PROJECT_DIR$/../report/logs/application.log" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../report/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../report/pom.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../report/src/main/java/com/spring/report/controller/VisitorController.java" beforeDir="false" afterPath="$PROJECT_DIR$/../report/src/main/java/com/spring/report/controller/VisitorController.java" afterDir="false" />
<change beforePath="$PROJECT_DIR$/../report/src/main/resources/application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/../report/src/main/resources/application.yml" afterDir="false" />
</list>
<option name="SHOW_DIALOG" value="false" />
<option name="HIGHLIGHT_CONFLICTS" value="true" />
@ -16,18 +28,18 @@
<component name="FileTemplateManagerImpl">
<option name="RECENT_TEMPLATES">
<list>
<option value="Class" />
<option value="Dockerfile" />
<option value="Class" />
</list>
</option>
</component>
<component name="Git.Settings">
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." />
</component>
<component name="KubernetesApiPersistence"><![CDATA[{}]]></component>
<component name="KubernetesApiProvider"><![CDATA[{
"isMigrated": true
}]]></component>
<component name="KubernetesApiPersistence">{}</component>
<component name="KubernetesApiProvider">{
&quot;isMigrated&quot;: true
}</component>
<component name="MavenImportPreferences">
<option name="generalSettings">
<MavenGeneralSettings>
@ -61,7 +73,7 @@
"RunOnceActivity.ShowReadmeOnStart": "true",
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
"Spring Boot.MinioApplication.executor": "Run",
"git-widget-placeholder": "main",
"git-widget-placeholder": "lw5",
"kotlin-language-version-configured": "true",
"last_opened_file_path": "D:/учеба/диплом/protege",
"node.js.detected.package.eslint": "true",
@ -94,7 +106,9 @@
</method>
</configuration>
<configuration default="true" type="docker-deploy" factoryName="docker-compose.yml" temporary="true">
<deployment type="docker-compose.yml" />
<deployment type="docker-compose.yml">
<settings />
</deployment>
<method v="2" />
</configuration>
<configuration name="docker-compose.yml.minio: Compose Deployment" type="docker-deploy" factoryName="docker-compose.yml" temporary="true" server-name="Docker">
@ -141,6 +155,7 @@
<workItem from="1729271678576" duration="138000" />
<workItem from="1729432602672" duration="627000" />
<workItem from="1729705852820" duration="15757000" />
<workItem from="1729795093249" duration="1693000" />
</task>
<servers />
</component>

View File

@ -60,6 +60,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@ -0,0 +1,43 @@
package com.spring.minio.config;
import com.spring.minio.properties.KafkaProperties;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import java.util.HashMap;
import java.util.Map;
@EnableKafka
@Configuration
@RequiredArgsConstructor
public class KafkaConsumerConfig {
private final KafkaProperties kafkaProperties;
private ConsumerFactory<String, byte[]> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapAddress());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}

View File

@ -0,0 +1,30 @@
package com.spring.minio.consumer;
import com.spring.minio.config.MinioAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
@Slf4j
@Service
@RequiredArgsConstructor
public class Consumer {
private final MinioAdapter minioAdapter;
/**
* Слушатель Kafka, который принимает сообщения из заданного топика и обрабатывает их.
*
* @param message Сообщение, полученное из Kafka.
*/
@KafkaListener(topics = "test_topic", containerFactory = "kafkaListenerContainerFactory")
public void listenGroupTopic(byte[] message) {
log.info("Received message");
try {
minioAdapter.uploadFile("user1", "visitors_report.csv", message);
} catch (Exception e) {
log.error("Exception: ", e);
}
}
}

View File

@ -0,0 +1,22 @@
package com.spring.minio.properties;
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Configuration
@ConfigurationProperties(prefix = "app.kafka")
@Getter
@Setter
public class KafkaProperties {
/**
* Адреса серверов Kafka, к которым будет подключаться потребитель.
*/
private String bootstrapAddress;
/**
* Идентификатор группы потребителей.
*/
private String groupId;
}

View File

@ -4,4 +4,9 @@ minio:
access:
name: role_for_spring
secret: 123456789
url: "http://minio:9000"
url: "http://minio:9000"
app:
kafka:
bootstrapAddress: kafka:9092
groupId: group1

File diff suppressed because it is too large Load Diff

View File

@ -62,6 +62,10 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>

View File

@ -0,0 +1,51 @@
package com.spring.report.config;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
import static com.spring.report.kafka.Producer.TEST_TOPIC;
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.bootstrapAddress}")
private String SERVER;
private ProducerFactory<String, byte[]> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic testTopic() {
return new NewTopic(TEST_TOPIC, 1, (short) 1);
}
}

View File

@ -2,6 +2,7 @@ package com.spring.report.controller;
import com.opencsv.CSVWriter;
import com.spring.report.entity.Visitor;
import com.spring.report.kafka.Producer;
import com.spring.report.repository.VisitorRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@ -22,6 +23,7 @@ import org.springframework.web.client.RestTemplate;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
@RestController
@ -31,7 +33,7 @@ import java.util.List;
public class VisitorController {
private final VisitorRepository visitorRepository;
public static final String CORRELATION_ID = "correlation-id";
private final RestTemplate restTemplate;
private final Producer producer;
@GetMapping
public ResponseEntity<List<Visitor>> report(@RequestHeader(CORRELATION_ID) String correlationId) {
@ -84,23 +86,10 @@ public class VisitorController {
}
private void uploadFileToMinio(File file, String corId) {
String url = "http://myminio:8081/upload";
try {
HttpHeaders headers = setHeaders(corId);
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
body.add("file", new FileSystemResource(file));
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, headers);
ResponseEntity<String> response = restTemplate.postForEntity(url, requestEntity, String.class);
if (response.getStatusCode() != HttpStatus.OK) {
log.error("Failed to upload file: {}", response.getStatusCode());
}
producer.sendEvent(Producer.TEST_TOPIC, Files.readAllBytes(file.toPath()));
} catch (Exception e) {
log.error("Error uploading file to MinIO", e);
log.error("Error uploading file to Kafka", e);
}
}
}

View File

@ -0,0 +1,33 @@
package com.spring.report.kafka;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.Assert;
import java.util.concurrent.CompletableFuture;
@Slf4j
@Service
@RequiredArgsConstructor
public class Producer {
private final KafkaTemplate<String, byte[]> kafkaTemplateByteArray;
public static final String TEST_TOPIC = "test_topic";
public void sendEvent(final String topic, final byte[] data) {
Assert.hasText(topic, "topic must not be blank");
Assert.notNull(data, "data not be null");
CompletableFuture<SendResult<String, byte[]>> future = kafkaTemplateByteArray.send(topic, data);
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Kafka send complete");
} else {
log.error("Kafka fail send", ex);
}
});
}
}

View File

@ -12,3 +12,6 @@ spring:
hibernate:
format_sql: true
dialect: org.hibernate.dialect.PostgreSQLDialect
kafka:
bootstrapAddress: kafka:9092