提交 c5f1b99a 编写于 作者: 金永显's avatar 金永显

提交项目

上级 cda10041
package com.inodes.inodeskafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class ProjectApplication {
public static void main(String[] args) {
SpringApplication.run(ProjectApplication.class, args);
}
}
package com.inodes.inodeskafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* 链家
* @author jinyongxian
* @date 2022/4/25 9:53
*/
@Configuration
public class KafkaLianjiaConfig {
@Value("${spring.kafka.lianjia.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.lianjia.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.lianjia.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Bean
public KafkaTemplate<String, String> kafkaLianjiaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaLianjiaContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
\ No newline at end of file
package com.inodes.inodeskafka.config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
/**
* 测试环境
* @author jinyongxian
* @date 2022/4/25 9:45
*/
@EnableKafka
@Configuration
public class KafkaTestConfig {
@Value("${spring.kafka.test.bootstrap-servers}")
private String bootstrapServers;
@Value("${spring.kafka.test.consumer.group-id}")
private String groupId;
@Value("${spring.kafka.test.consumer.enable-auto-commit}")
private boolean enableAutoCommit;
@Bean
public KafkaTemplate<String, String> kafkaTestTemplate() {
return new KafkaTemplate<>(producerFactory());
}
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> kafkaTestContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
private ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.ACKS_CONFIG, "1"); // 不能写成 1
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
private Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
\ No newline at end of file
package com.inodes.inodeskafka.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger.web.OperationsSorter;
import springfox.documentation.swagger.web.TagsSorter;
import springfox.documentation.swagger.web.UiConfiguration;
import springfox.documentation.swagger.web.UiConfigurationBuilder;
import springfox.documentation.swagger2.annotations.EnableSwagger2WebMvc;
/**
* @author jinyongxian
* @date 2022/4/25 10:58
*/
@Configuration
@EnableSwagger2WebMvc
public class Swagger2Config {
@Bean
public Docket createRestApi() {
return new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo()).select()
// 为当前包路径
.apis(RequestHandlerSelectors.basePackage("com.inodes"))
.paths(PathSelectors.any())
.build();
}
// 构建 api文档的详细信息函数,注意这里的注解引用的是哪个
private ApiInfo apiInfo() {
return new ApiInfoBuilder()
// 页面标题
.title("kafka的Topic中转")
.description("kafka的Topic中转")
// 版本号
.version("1.0")
// 描述
.description("API在线文档").build();
}
@Bean
public UiConfiguration uiConfiguration(){
UiConfigurationBuilder uiConfigurationBuilder = UiConfigurationBuilder.builder();
uiConfigurationBuilder.operationsSorter(OperationsSorter.METHOD);
uiConfigurationBuilder.tagsSorter(TagsSorter.ALPHA);
uiConfigurationBuilder.filter(true);
uiConfigurationBuilder.displayRequestDuration(true);
return uiConfigurationBuilder.build();
}
}
package com.inodes.inodeskafka.controller;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
/**
* kafa转发测试
* @author jinyongxian
* @date 2019/12/27 16:09
* @description
*/
@Api(tags = "kafa转发测试")
@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {
@Autowired
private KafkaTemplate kafkaTestTemplate; //测试环境发送测试
@Autowired
private KafkaTemplate kafkaLianjiaTemplate;//链接结果返回测试
@Value("${kafkaSinan.topicProducer}")
private String topicProducer = ""; //测试环境发送测试
@Value("${kafkaSinan.topicResult}")
private String topicResult = ""; //链接结果返回测试
@ApiOperation(value = "测试环境,发送Topic测试,是否转发到链家相应Topic", notes = "测试环境,发送Topic测试,是否转发到链家相应Topic")
@RequestMapping(value = "/testSend",method = RequestMethod.GET)
@ResponseBody
public String testSend() {
kafkaTestTemplate.send(topicProducer, "kafka testSend");
return "success";
}
@ApiOperation(value = "链接发送返回结果Topic测试,是否转发到测试相应Topic", notes = "链接发送返回结果Topic测试,是否转发到测试相应Topic")
@RequestMapping(value = "/lianJianResultTest",method = RequestMethod.GET)
@ResponseBody
public String lianJianResultTest() {
kafkaLianjiaTemplate.send(topicResult, "kafka lianJianResultTest");
return "success";
}
}
package com.inodes.inodeskafka.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* @author jinyongxian
* @date 2022/4/25 10:24
*/
@Slf4j
@Service
public class SinanKzProcurementTaskResultTestKafkaService {
@Value("${kafkaSinan.topicResult}")
private String topicResult = "";
@Autowired
private KafkaTemplate kafkaTestTemplate;//测试环境kafka
@KafkaListener(topics ="${kafkaSinan.topicResult}",containerFactory = "kafkaLianjiaContainerFactory")
public void topicResult(ConsumerRecord<String, Object> consumerRecord){
log.info("返回审核结果消息:{}", consumerRecord.value());
kafkaTestTemplate.send(topicResult,consumerRecord.toString());
}
}
package com.inodes.inodeskafka.service;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* @author jinyongxian
* @date 2022/4/25 10:08
*/
@Slf4j
@Service
public class SinanKzProcurementTaskTestKafkaService {
@Value("${kafkaSinan.topicProducer}")
private String topicProducer = "";
/*
* 模以对接,正式环境需要注释代码
* */
@Autowired
private KafkaTemplate kafkaLianjiaTemplate;//链家Kafka
/*
* 监听测试环境的名称为sinan-kzProcurement-task-test的Topic,转发给链家的名称为sinan-kzProcurement-task-test的Topic
* */
@KafkaListener(topics ="${kafkaSinan.topicProducer}",containerFactory = "kafkaTestContainerFactory")
public void topicProducerSim(ConsumerRecord<String, Object> consumerRecord){
log.info("任务生成消息接收:{}", consumerRecord.value());
kafkaLianjiaTemplate.send(topicProducer,consumerRecord.toString());
}
}
server:
port: 9555
spring:
kafka:
test:
bootstrap-servers: 127.0.0.1:9092 #测试环境
producer:
retries: 1
acks: 1
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: kangzhuo
enable-auto-commit: true
auto-commit-interval: 100
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
lianjia:
bootstrap-servers: 127.0.0.1:9093 #链家kafka
producer:
retries: 1
acks: 1
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: kangzhuo
enable-auto-commit: true
auto-commit-interval: 100
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
kafkaSinan:
topicResult: sinan-kzProcurement-task-result-test
topicProducer: sinan-kzProcurement-task-test
\ No newline at end of file
server:
tomcat:
accesslog:
buffered: true
directory: logs
enabled: false
file-date-format: .yyyy-MM-dd
pattern: common
prefix: access_log
rename-on-rotate: false
request-attributes-enabled: false
rotate: true
suffix: .log
spring:
profiles:
active: demo
mvc:
static-path-pattern=/template/**
resources:
static-locations: classpath:/META-INF/resources/,classpath:/resources/,classpath:/static/,classpath:/public/
http:
encoding:
charset: utf-8
enabled: true
multipart:
max-request-size: 500MB
max-file-size: 500MB
servlet:
multipart:
max-request-size: 500MB
max-file-size: 500MB
jackson:
#date-format: yyyy-MM-dd HH:mm:ss
#time-zone: GMT+8
serialization:
write-dates-as-timestamps: true
\ No newline at end of file
Markdown 格式
0% or
您添加了 0 到此讨论。请谨慎行事。
先完成此消息的编辑!
想要评论请 注册