Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
2f859b0495 |
@ -5,8 +5,20 @@
|
|||||||
</component>
|
</component>
|
||||||
<component name="ChangeListManager">
|
<component name="ChangeListManager">
|
||||||
<list default="true" id="67d3f0b7-2fd3-4b40-a135-0347e30e6207" name="Changes" comment="add log file">
|
<list default="true" id="67d3f0b7-2fd3-4b40-a135-0347e30e6207" name="Changes" comment="add log file">
|
||||||
|
<change afterPath="$PROJECT_DIR$/minio/src/main/java/com/spring/minio/config/KafkaConsumerConfig.java" afterDir="false" />
|
||||||
|
<change afterPath="$PROJECT_DIR$/minio/src/main/java/com/spring/minio/consumer/Consumer.java" afterDir="false" />
|
||||||
|
<change afterPath="$PROJECT_DIR$/minio/src/main/java/com/spring/minio/properties/KafkaProperties.java" afterDir="false" />
|
||||||
|
<change afterPath="$PROJECT_DIR$/report/src/main/java/com/spring/report/config/KafkaProducerConfig.java" afterDir="false" />
|
||||||
|
<change afterPath="$PROJECT_DIR$/report/src/main/java/com/spring/report/kafka/Producer.java" afterDir="false" />
|
||||||
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/docker-compose.yml" beforeDir="false" afterPath="$PROJECT_DIR$/docker-compose.yml" afterDir="false" />
|
||||||
<change beforePath="$PROJECT_DIR$/minio/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/minio/.idea/workspace.xml" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/minio/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/minio/.idea/workspace.xml" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/minio/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/minio/pom.xml" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/minio/src/main/resources/application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/minio/src/main/resources/application.yml" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/report/logs/application.log" beforeDir="false" afterPath="$PROJECT_DIR$/report/logs/application.log" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/report/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/report/pom.xml" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/report/src/main/java/com/spring/report/controller/VisitorController.java" beforeDir="false" afterPath="$PROJECT_DIR$/report/src/main/java/com/spring/report/controller/VisitorController.java" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/report/src/main/resources/application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/report/src/main/resources/application.yml" afterDir="false" />
|
||||||
</list>
|
</list>
|
||||||
<option name="SHOW_DIALOG" value="false" />
|
<option name="SHOW_DIALOG" value="false" />
|
||||||
<option name="HIGHLIGHT_CONFLICTS" value="true" />
|
<option name="HIGHLIGHT_CONFLICTS" value="true" />
|
||||||
@ -36,14 +48,15 @@
|
|||||||
"ASKED_ADD_EXTERNAL_FILES": "true",
|
"ASKED_ADD_EXTERNAL_FILES": "true",
|
||||||
"Docker.docker-compose.yml: Compose Deployment.executor": "Run",
|
"Docker.docker-compose.yml: Compose Deployment.executor": "Run",
|
||||||
"RunOnceActivity.ShowReadmeOnStart": "true",
|
"RunOnceActivity.ShowReadmeOnStart": "true",
|
||||||
"git-widget-placeholder": "main",
|
"git-widget-placeholder": "lw5",
|
||||||
"kotlin-language-version-configured": "true",
|
"kotlin-language-version-configured": "true",
|
||||||
"last_opened_file_path": "D:/java/dcaa-mcs/report/src/main/resources",
|
"last_opened_file_path": "D:/java/DemoTaskMediasoft(2)",
|
||||||
"node.js.detected.package.eslint": "true",
|
"node.js.detected.package.eslint": "true",
|
||||||
"node.js.detected.package.tslint": "true",
|
"node.js.detected.package.tslint": "true",
|
||||||
"node.js.selected.package.eslint": "(autodetect)",
|
"node.js.selected.package.eslint": "(autodetect)",
|
||||||
"node.js.selected.package.tslint": "(autodetect)",
|
"node.js.selected.package.tslint": "(autodetect)",
|
||||||
"nodejs_package_manager_path": "npm",
|
"nodejs_package_manager_path": "npm",
|
||||||
|
"settings.editor.selected.configurable": "bigdataide_conn_settings",
|
||||||
"vue.rearranger.settings.migration": "true"
|
"vue.rearranger.settings.migration": "true"
|
||||||
}
|
}
|
||||||
}]]></component>
|
}]]></component>
|
||||||
@ -95,6 +108,7 @@
|
|||||||
<workItem from="1729199229123" duration="2034000" />
|
<workItem from="1729199229123" duration="2034000" />
|
||||||
<workItem from="1729271426421" duration="377000" />
|
<workItem from="1729271426421" duration="377000" />
|
||||||
<workItem from="1729706571244" duration="12931000" />
|
<workItem from="1729706571244" duration="12931000" />
|
||||||
|
<workItem from="1729789297391" duration="5967000" />
|
||||||
</task>
|
</task>
|
||||||
<task id="LOCAL-00001" summary="add minio">
|
<task id="LOCAL-00001" summary="add minio">
|
||||||
<option name="closed" value="true" />
|
<option name="closed" value="true" />
|
||||||
|
@ -86,10 +86,39 @@ services:
|
|||||||
backend:
|
backend:
|
||||||
aliases:
|
aliases:
|
||||||
- "myminio"
|
- "myminio"
|
||||||
|
zookeeper:
|
||||||
|
container_name: zookeeper
|
||||||
|
image: confluentinc/cp-zookeeper:latest
|
||||||
|
depends_on:
|
||||||
|
- myminio
|
||||||
|
environment:
|
||||||
|
ZOOKEEPER_CLIENT_PORT: 2181
|
||||||
|
ZOOKEEPER_TICK_TIME: 2000
|
||||||
|
ports:
|
||||||
|
- '2181:2181'
|
||||||
|
networks:
|
||||||
|
backend:
|
||||||
|
aliases:
|
||||||
|
- "zookeeper"
|
||||||
|
kafka:
|
||||||
|
container_name: kafka
|
||||||
|
image: confluentinc/cp-kafka:latest
|
||||||
|
depends_on:
|
||||||
|
- zookeeper
|
||||||
|
ports:
|
||||||
|
- '9092:9092'
|
||||||
|
environment:
|
||||||
|
KAFKA_BROKER_ID: 1
|
||||||
|
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
|
||||||
|
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
|
||||||
|
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
|
||||||
|
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
|
||||||
|
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
|
||||||
|
networks:
|
||||||
|
backend:
|
||||||
|
aliases:
|
||||||
|
- "kafka"
|
||||||
|
|
||||||
networks:
|
networks:
|
||||||
backend:
|
backend:
|
||||||
driver: bridge
|
driver: bridge
|
||||||
|
|
||||||
volumes:
|
|
||||||
minio-storage:
|
|
@ -5,8 +5,20 @@
|
|||||||
</component>
|
</component>
|
||||||
<component name="ChangeListManager">
|
<component name="ChangeListManager">
|
||||||
<list default="true" id="4c468ddb-7fbe-40b8-bf97-16a0c944e8c6" name="Changes" comment="">
|
<list default="true" id="4c468ddb-7fbe-40b8-bf97-16a0c944e8c6" name="Changes" comment="">
|
||||||
|
<change afterPath="$PROJECT_DIR$/src/main/java/com/spring/minio/config/KafkaConsumerConfig.java" afterDir="false" />
|
||||||
|
<change afterPath="$PROJECT_DIR$/src/main/java/com/spring/minio/consumer/Consumer.java" afterDir="false" />
|
||||||
|
<change afterPath="$PROJECT_DIR$/src/main/java/com/spring/minio/properties/KafkaProperties.java" afterDir="false" />
|
||||||
|
<change afterPath="$PROJECT_DIR$/../report/src/main/java/com/spring/report/config/KafkaProducerConfig.java" afterDir="false" />
|
||||||
|
<change afterPath="$PROJECT_DIR$/../report/src/main/java/com/spring/report/kafka/Producer.java" afterDir="false" />
|
||||||
<change beforePath="$PROJECT_DIR$/../.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../.idea/workspace.xml" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/../.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../.idea/workspace.xml" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/../docker-compose.yml" beforeDir="false" afterPath="$PROJECT_DIR$/../docker-compose.yml" afterDir="false" />
|
||||||
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
|
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/src/main/resources/application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/resources/application.yml" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/../report/logs/application.log" beforeDir="false" afterPath="$PROJECT_DIR$/../report/logs/application.log" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/../report/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../report/pom.xml" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/../report/src/main/java/com/spring/report/controller/VisitorController.java" beforeDir="false" afterPath="$PROJECT_DIR$/../report/src/main/java/com/spring/report/controller/VisitorController.java" afterDir="false" />
|
||||||
|
<change beforePath="$PROJECT_DIR$/../report/src/main/resources/application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/../report/src/main/resources/application.yml" afterDir="false" />
|
||||||
</list>
|
</list>
|
||||||
<option name="SHOW_DIALOG" value="false" />
|
<option name="SHOW_DIALOG" value="false" />
|
||||||
<option name="HIGHLIGHT_CONFLICTS" value="true" />
|
<option name="HIGHLIGHT_CONFLICTS" value="true" />
|
||||||
@ -16,18 +28,18 @@
|
|||||||
<component name="FileTemplateManagerImpl">
|
<component name="FileTemplateManagerImpl">
|
||||||
<option name="RECENT_TEMPLATES">
|
<option name="RECENT_TEMPLATES">
|
||||||
<list>
|
<list>
|
||||||
<option value="Class" />
|
|
||||||
<option value="Dockerfile" />
|
<option value="Dockerfile" />
|
||||||
|
<option value="Class" />
|
||||||
</list>
|
</list>
|
||||||
</option>
|
</option>
|
||||||
</component>
|
</component>
|
||||||
<component name="Git.Settings">
|
<component name="Git.Settings">
|
||||||
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." />
|
<option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." />
|
||||||
</component>
|
</component>
|
||||||
<component name="KubernetesApiPersistence"><![CDATA[{}]]></component>
|
<component name="KubernetesApiPersistence">{}</component>
|
||||||
<component name="KubernetesApiProvider"><![CDATA[{
|
<component name="KubernetesApiProvider">{
|
||||||
"isMigrated": true
|
"isMigrated": true
|
||||||
}]]></component>
|
}</component>
|
||||||
<component name="MavenImportPreferences">
|
<component name="MavenImportPreferences">
|
||||||
<option name="generalSettings">
|
<option name="generalSettings">
|
||||||
<MavenGeneralSettings>
|
<MavenGeneralSettings>
|
||||||
@ -61,7 +73,7 @@
|
|||||||
"RunOnceActivity.ShowReadmeOnStart": "true",
|
"RunOnceActivity.ShowReadmeOnStart": "true",
|
||||||
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
|
"SHARE_PROJECT_CONFIGURATION_FILES": "true",
|
||||||
"Spring Boot.MinioApplication.executor": "Run",
|
"Spring Boot.MinioApplication.executor": "Run",
|
||||||
"git-widget-placeholder": "main",
|
"git-widget-placeholder": "lw5",
|
||||||
"kotlin-language-version-configured": "true",
|
"kotlin-language-version-configured": "true",
|
||||||
"last_opened_file_path": "D:/учеба/диплом/protege",
|
"last_opened_file_path": "D:/учеба/диплом/protege",
|
||||||
"node.js.detected.package.eslint": "true",
|
"node.js.detected.package.eslint": "true",
|
||||||
@ -94,7 +106,9 @@
|
|||||||
</method>
|
</method>
|
||||||
</configuration>
|
</configuration>
|
||||||
<configuration default="true" type="docker-deploy" factoryName="docker-compose.yml" temporary="true">
|
<configuration default="true" type="docker-deploy" factoryName="docker-compose.yml" temporary="true">
|
||||||
<deployment type="docker-compose.yml" />
|
<deployment type="docker-compose.yml">
|
||||||
|
<settings />
|
||||||
|
</deployment>
|
||||||
<method v="2" />
|
<method v="2" />
|
||||||
</configuration>
|
</configuration>
|
||||||
<configuration name="docker-compose.yml.minio: Compose Deployment" type="docker-deploy" factoryName="docker-compose.yml" temporary="true" server-name="Docker">
|
<configuration name="docker-compose.yml.minio: Compose Deployment" type="docker-deploy" factoryName="docker-compose.yml" temporary="true" server-name="Docker">
|
||||||
@ -141,6 +155,7 @@
|
|||||||
<workItem from="1729271678576" duration="138000" />
|
<workItem from="1729271678576" duration="138000" />
|
||||||
<workItem from="1729432602672" duration="627000" />
|
<workItem from="1729432602672" duration="627000" />
|
||||||
<workItem from="1729705852820" duration="15757000" />
|
<workItem from="1729705852820" duration="15757000" />
|
||||||
|
<workItem from="1729795093249" duration="1693000" />
|
||||||
</task>
|
</task>
|
||||||
<servers />
|
<servers />
|
||||||
</component>
|
</component>
|
||||||
|
@ -60,6 +60,10 @@
|
|||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-logging</artifactId>
|
<artifactId>spring-boot-starter-logging</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.kafka</groupId>
|
||||||
|
<artifactId>spring-kafka</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-test</artifactId>
|
<artifactId>spring-boot-starter-test</artifactId>
|
||||||
|
@ -0,0 +1,43 @@
|
|||||||
|
package com.spring.minio.config;
|
||||||
|
|
||||||
|
import com.spring.minio.properties.KafkaProperties;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
|
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
|
||||||
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.annotation.EnableKafka;
|
||||||
|
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
|
||||||
|
import org.springframework.kafka.core.ConsumerFactory;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
@EnableKafka
|
||||||
|
@Configuration
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class KafkaConsumerConfig {
|
||||||
|
private final KafkaProperties kafkaProperties;
|
||||||
|
|
||||||
|
private ConsumerFactory<String, byte[]> consumerFactory() {
|
||||||
|
Map<String, Object> props = new HashMap<>();
|
||||||
|
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapAddress());
|
||||||
|
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
|
||||||
|
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getGroupId());
|
||||||
|
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
|
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
|
||||||
|
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "20971520");
|
||||||
|
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "20971520");
|
||||||
|
return new DefaultKafkaConsumerFactory<>(props);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
|
||||||
|
ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
|
||||||
|
new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
|
factory.setConsumerFactory(consumerFactory());
|
||||||
|
return factory;
|
||||||
|
}
|
||||||
|
}
|
30
minio/src/main/java/com/spring/minio/consumer/Consumer.java
Normal file
30
minio/src/main/java/com/spring/minio/consumer/Consumer.java
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package com.spring.minio.consumer;
|
||||||
|
|
||||||
|
import com.spring.minio.config.MinioAdapter;
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.kafka.annotation.KafkaListener;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class Consumer {
|
||||||
|
private final MinioAdapter minioAdapter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Слушатель Kafka, который принимает сообщения из заданного топика и обрабатывает их.
|
||||||
|
*
|
||||||
|
* @param message Сообщение, полученное из Kafka.
|
||||||
|
*/
|
||||||
|
@KafkaListener(topics = "test_topic", containerFactory = "kafkaListenerContainerFactory")
|
||||||
|
public void listenGroupTopic(byte[] message) {
|
||||||
|
log.info("Received message");
|
||||||
|
try {
|
||||||
|
minioAdapter.uploadFile("user1", "visitors_report.csv", message);
|
||||||
|
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("Exception: ", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,22 @@
|
|||||||
|
package com.spring.minio.properties;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
@ConfigurationProperties(prefix = "app.kafka")
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
public class KafkaProperties {
|
||||||
|
/**
|
||||||
|
* Адреса серверов Kafka, к которым будет подключаться потребитель.
|
||||||
|
*/
|
||||||
|
private String bootstrapAddress;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Идентификатор группы потребителей.
|
||||||
|
*/
|
||||||
|
private String groupId;
|
||||||
|
}
|
@ -4,4 +4,9 @@ minio:
|
|||||||
access:
|
access:
|
||||||
name: role_for_spring
|
name: role_for_spring
|
||||||
secret: 123456789
|
secret: 123456789
|
||||||
url: "http://minio:9000"
|
url: "http://minio:9000"
|
||||||
|
|
||||||
|
app:
|
||||||
|
kafka:
|
||||||
|
bootstrapAddress: kafka:9092
|
||||||
|
groupId: group1
|
File diff suppressed because it is too large
Load Diff
@ -62,6 +62,10 @@
|
|||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-logging</artifactId>
|
<artifactId>spring-boot-starter-logging</artifactId>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
<dependency>
|
||||||
|
<groupId>org.springframework.kafka</groupId>
|
||||||
|
<artifactId>spring-kafka</artifactId>
|
||||||
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.springframework.boot</groupId>
|
<groupId>org.springframework.boot</groupId>
|
||||||
<artifactId>spring-boot-starter-test</artifactId>
|
<artifactId>spring-boot-starter-test</artifactId>
|
||||||
|
@ -0,0 +1,51 @@
|
|||||||
|
package com.spring.report.config;
|
||||||
|
|
||||||
|
import org.apache.kafka.clients.admin.AdminClientConfig;
|
||||||
|
import org.apache.kafka.clients.admin.NewTopic;
|
||||||
|
import org.apache.kafka.clients.producer.ProducerConfig;
|
||||||
|
import org.apache.kafka.common.serialization.ByteArraySerializer;
|
||||||
|
import org.apache.kafka.common.serialization.StringSerializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
|
||||||
|
import org.springframework.kafka.core.KafkaAdmin;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.core.ProducerFactory;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static com.spring.report.kafka.Producer.TEST_TOPIC;
|
||||||
|
|
||||||
|
@Configuration
|
||||||
|
public class KafkaProducerConfig {
|
||||||
|
@Value("${kafka.bootstrapAddress}")
|
||||||
|
private String SERVER;
|
||||||
|
|
||||||
|
private ProducerFactory<String, byte[]> producerFactory() {
|
||||||
|
Map<String, Object> configProps = new HashMap<>();
|
||||||
|
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
|
||||||
|
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
|
||||||
|
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
|
||||||
|
configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "20971520");
|
||||||
|
return new DefaultKafkaProducerFactory<>(configProps);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaTemplate<String, byte[]> kafkaTemplate() {
|
||||||
|
return new KafkaTemplate<>(producerFactory());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public KafkaAdmin kafkaAdmin() {
|
||||||
|
Map<String, Object> configs = new HashMap<>();
|
||||||
|
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, SERVER);
|
||||||
|
return new KafkaAdmin(configs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public NewTopic testTopic() {
|
||||||
|
return new NewTopic(TEST_TOPIC, 1, (short) 1);
|
||||||
|
}
|
||||||
|
}
|
@ -2,6 +2,7 @@ package com.spring.report.controller;
|
|||||||
|
|
||||||
import com.opencsv.CSVWriter;
|
import com.opencsv.CSVWriter;
|
||||||
import com.spring.report.entity.Visitor;
|
import com.spring.report.entity.Visitor;
|
||||||
|
import com.spring.report.kafka.Producer;
|
||||||
import com.spring.report.repository.VisitorRepository;
|
import com.spring.report.repository.VisitorRepository;
|
||||||
import lombok.RequiredArgsConstructor;
|
import lombok.RequiredArgsConstructor;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@ -22,6 +23,7 @@ import org.springframework.web.client.RestTemplate;
|
|||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileWriter;
|
import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.nio.file.Files;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@RestController
|
@RestController
|
||||||
@ -31,7 +33,7 @@ import java.util.List;
|
|||||||
public class VisitorController {
|
public class VisitorController {
|
||||||
private final VisitorRepository visitorRepository;
|
private final VisitorRepository visitorRepository;
|
||||||
public static final String CORRELATION_ID = "correlation-id";
|
public static final String CORRELATION_ID = "correlation-id";
|
||||||
private final RestTemplate restTemplate;
|
private final Producer producer;
|
||||||
|
|
||||||
@GetMapping
|
@GetMapping
|
||||||
public ResponseEntity<List<Visitor>> report(@RequestHeader(CORRELATION_ID) String correlationId) {
|
public ResponseEntity<List<Visitor>> report(@RequestHeader(CORRELATION_ID) String correlationId) {
|
||||||
@ -84,23 +86,10 @@ public class VisitorController {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void uploadFileToMinio(File file, String corId) {
|
private void uploadFileToMinio(File file, String corId) {
|
||||||
String url = "http://myminio:8081/upload";
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
HttpHeaders headers = setHeaders(corId);
|
producer.sendEvent(Producer.TEST_TOPIC, Files.readAllBytes(file.toPath()));
|
||||||
headers.setContentType(MediaType.MULTIPART_FORM_DATA);
|
|
||||||
|
|
||||||
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
|
|
||||||
body.add("file", new FileSystemResource(file));
|
|
||||||
|
|
||||||
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, headers);
|
|
||||||
ResponseEntity<String> response = restTemplate.postForEntity(url, requestEntity, String.class);
|
|
||||||
|
|
||||||
if (response.getStatusCode() != HttpStatus.OK) {
|
|
||||||
log.error("Failed to upload file: {}", response.getStatusCode());
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("Error uploading file to MinIO", e);
|
log.error("Error uploading file to Kafka", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
33
report/src/main/java/com/spring/report/kafka/Producer.java
Normal file
33
report/src/main/java/com/spring/report/kafka/Producer.java
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
package com.spring.report.kafka;
|
||||||
|
|
||||||
|
import lombok.RequiredArgsConstructor;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.kafka.core.KafkaTemplate;
|
||||||
|
import org.springframework.kafka.support.SendResult;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
import org.springframework.util.Assert;
|
||||||
|
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
@Slf4j
|
||||||
|
@Service
|
||||||
|
@RequiredArgsConstructor
|
||||||
|
public class Producer {
|
||||||
|
private final KafkaTemplate<String, byte[]> kafkaTemplateByteArray;
|
||||||
|
public static final String TEST_TOPIC = "test_topic";
|
||||||
|
|
||||||
|
public void sendEvent(final String topic, final byte[] data) {
|
||||||
|
Assert.hasText(topic, "topic must not be blank");
|
||||||
|
Assert.notNull(data, "data not be null");
|
||||||
|
|
||||||
|
CompletableFuture<SendResult<String, byte[]>> future = kafkaTemplateByteArray.send(topic, data);
|
||||||
|
|
||||||
|
future.whenComplete((result, ex) -> {
|
||||||
|
if (ex == null) {
|
||||||
|
log.info("Kafka send complete");
|
||||||
|
} else {
|
||||||
|
log.error("Kafka fail send", ex);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
@ -12,3 +12,6 @@ spring:
|
|||||||
hibernate:
|
hibernate:
|
||||||
format_sql: true
|
format_sql: true
|
||||||
dialect: org.hibernate.dialect.PostgreSQLDialect
|
dialect: org.hibernate.dialect.PostgreSQLDialect
|
||||||
|
|
||||||
|
kafka:
|
||||||
|
bootstrapAddress: kafka:9092
|
Loading…
Reference in New Issue
Block a user