Kafka has no built-in retry mechanism and no dead-letter queue, so when using Kafka as a message queue you have to implement message retry yourself.
Implementation
Create a new Kafka topic to serve as the retry queue:
- Create a topic as the retry topic, used to receive messages waiting to be retried (a topic-creation sketch follows this list).
- The consumer of the normal topic determines the next retry topic for a message that needs to be retried.
- Messages fetched from the retry topic are stored in a Redis zset, sorted by their next consumption time.
- A scheduled task fetches the messages whose consumption time has arrived from Redis and sends them to the corresponding topic.
- A message that has already been retried too many times is not retried again.
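Both topics must exist before the application starts. Below is a minimal sketch that creates them with Kafka's AdminClient; the bootstrap server address, partition count, and replication factor are assumptions for a single-broker demo setup matching the configuration further down.

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Arrays;
import java.util.Properties;

public class CreateRetryTopics {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumption: same broker address as in application.properties below
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            // Business topic and retry topic; 1 partition and replication
            // factor 1 are assumptions suitable for a local single-broker demo
            NewTopic bizTopic = new NewTopic("tp_demo_retry_01", 1, (short) 1);
            NewTopic retryTopic = new NewTopic("tp_demo_retry_02", 1, (short) 1);
            admin.createTopics(Arrays.asList(bizTopic, retryTopic)).all().get();
        }
    }
}
```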
Code implementation
Dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.73</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>
```
Add application.properties:
```properties
# bootstrap.servers
spring.kafka.bootstrap-servers=node1:9092
# key serializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# value serializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# consumer group id: group.id
spring.kafka.consumer.group-id=retryGroup
# key deserializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# value deserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Redis database index
spring.redis.database=0
# Redis host
spring.redis.host=node1
# Redis port
spring.redis.port=6379
# Redis password (empty by default)
spring.redis.password=
# Maximum number of connections in the pool (negative means no limit)
spring.redis.jedis.pool.max-active=20
# Maximum blocking wait time of the pool (negative means no limit)
spring.redis.jedis.pool.max-wait=-1
# Maximum number of idle connections in the pool
spring.redis.jedis.pool.max-idle=10
# Minimum number of idle connections in the pool
spring.redis.jedis.pool.min-idle=0
# Connection timeout (milliseconds)
spring.redis.timeout=1000
# Business topic name
spring.kafka.topics.test=tp_demo_retry_01
# Retry topic name
spring.kafka.topics.retry=tp_demo_retry_02
```
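Note that the two spring.kafka.topics.* entries are not standard Spring Boot properties; they are custom keys that the classes below read via @Value.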
AppConfig.java
```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

@Configuration
public class AppConfig {

    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate<String, String> template = new RedisTemplate<>();
        // Configure the connection factory
        template.setConnectionFactory(factory);
        return template;
    }
}
```
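One design note: RedisTemplate uses JDK serialization by default, so the keys and values it stores are not human-readable in redis-cli. Since everything stored here is a plain string (UUID keys and JSON message bodies), configuring StringRedisSerializer on the template, or injecting Spring's StringRedisTemplate instead, is a common alternative.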
RetryController.java
```java
import com.lagou.kafka.demo.service.KafkaService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class RetryController {

    @Autowired
    private KafkaService kafkaService;

    // Business topic that messages are sent to first
    @Value("${spring.kafka.topics.test}")
    private String topic;

    @RequestMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
        // Send the message to the business topic
        return kafkaService.sendMessage(record);
    }
}
```
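Once the application is running, a message can be pushed into the flow from the command line, for example `curl http://localhost:8080/send/hello` (host and port are assumptions for a default local setup).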
KafkaService.java
```java
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.ExecutionException;

@Service
public class KafkaService {

    private Logger log = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {
        // Block until the broker acknowledges the send
        SendResult<String, String> result = this.kafkaTemplate.send(record).get();
        RecordMetadata metadata = result.getRecordMetadata();
        String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();
        log.info("Message sent successfully: " + returnResult);
        return returnResult;
    }
}
```
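Calling get() on the future returned by KafkaTemplate.send() makes the send synchronous, which is what lets the controller return the record's topic, partition, and offset; for higher throughput the result could instead be handled asynchronously with a callback.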
ConsumerListener.java
```java
import com.lagou.kafka.demo.service.RetryService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @Autowired
    private RetryService kafkaRetryService;

    private static int index = 0;

    @KafkaListener(topics = "${spring.kafka.topics.test}", groupId = "${spring.kafka.consumer.group-id}")
    public void consume(ConsumerRecord<String, String> record) {
        try {
            // Business processing; fail every other message to demonstrate the retry path
            log.info("Consumed message: " + record);
            index++;
            if (index % 2 == 0) {
                throw new Exception("time to resend");
            }
        } catch (Exception e) {
            log.error(e.getMessage());
            // Retry the message: hand it to the retry service, which first puts it in Redis
            kafkaRetryService.consumerLater(record);
        }
    }
}
```
RetryService.java
```java
import com.alibaba.fastjson.JSON;
import com.lzh.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.nio.ByteBuffer;
import java.util.Calendar;
import java.util.Date;

@Service
public class RetryService {

    private static final Logger log = LoggerFactory.getLogger(RetryService.class);

    /**
     * Delay (in seconds) before the next consumption attempt after a failure:
     * the first retry is delayed 10 seconds, the second 30 seconds, the third 1 minute, and so on.
     */
    private static final int[] RETRY_INTERVAL_SECONDS = {10, 30, 1 * 60, 2 * 60,
            5 * 60, 10 * 60, 30 * 60, 1 * 60 * 60, 2 * 60 * 60};

    /**
     * Retry topic
     */
    @Value("${spring.kafka.topics.retry}")
    private String retryTopic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void consumerLater(ConsumerRecord<String, String> record) {
        // Number of times this message has already been retried
        int retryTimes = getRetryTimes(record);
        Date nextConsumerTime = getNextConsumerTime(retryTimes);
        // Stop retrying once the retry limit is reached
        if (nextConsumerTime == null) {
            return;
        }

        // Build the retry envelope
        RetryRecord retryRecord = new RetryRecord();
        retryRecord.setNextTime(nextConsumerTime.getTime());
        retryRecord.setTopic(record.topic());
        retryRecord.setRetryTimes(retryTimes);
        retryRecord.setKey(record.key());
        retryRecord.setValue(record.value());

        // Serialize to JSON
        String value = JSON.toJSONString(retryRecord);

        // Publish to the retry topic
        kafkaTemplate.send(retryTopic, null, value);
    }

    /**
     * Read the message's retry count from its headers.
     */
    private int getRetryTimes(ConsumerRecord<String, String> record) {
        int retryTimes = -1;
        for (Header header : record.headers()) {
            if (RetryRecord.KEY_RETRY_TIMES.equals(header.key())) {
                ByteBuffer buffer = ByteBuffer.wrap(header.value());
                retryTimes = buffer.getInt();
            }
        }
        retryTimes++;
        return retryTimes;
    }

    /**
     * Compute the next consumption time for a message awaiting retry.
     */
    private Date getNextConsumerTime(int retryTimes) {
        // Retry limit reached: no more retries
        if (retryTimes >= RETRY_INTERVAL_SECONDS.length) {
            return null;
        }

        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
        return calendar.getTime();
    }
}
```
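To trace the schedule: a message failing for the first time carries no retryTimes header, so getRetryTimes returns 0 and the first redelivery comes roughly 10 seconds later; subsequent failures are delayed 30 s, 1 min, 2 min, and so on. Once the incremented count reaches 9 (the length of RETRY_INTERVAL_SECONDS), getNextConsumerTime returns null and the message is silently dropped, so a message is redelivered at most nine times.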
RetryListener.java
```java
import com.alibaba.fastjson.JSON;
import com.lzh.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.Set;
import java.util.UUID;

@Component
@EnableScheduling
public class RetryListener {

    private Logger log = LoggerFactory.getLogger(RetryListener.class);

    private static final String RETRY_KEY_ZSET = "_retry_key";
    private static final String RETRY_VALUE_MAP = "_retry_value";

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.topics.test}")
    private String bizTopic;

    @KafkaListener(topics = "${spring.kafka.topics.retry}")
    // public void consume(List<ConsumerRecord<String, String>> list) {
    //     for (ConsumerRecord<String, String> record : list) {
    public void consume(ConsumerRecord<String, String> record) {
        System.out.println("Message to retry: " + record);
        RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);

        /**
         * To keep a flood of pending retries from exhausting Redis, messages could be split
         * across storage media by next retry time: e.g. store messages whose next retry is
         * more than half an hour away in MySQL, and periodically move the ones about to be
         * retried from MySQL into Redis.
         */
        // Use a Redis zset to order messages by time
        String key = UUID.randomUUID().toString();
        redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());
        redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());
    }
    // }

    /**
     * Scheduled task: read the messages whose retry time has arrived from Redis
     * and send them to the corresponding topic.
     */
    // @Scheduled(cron = "2 * * * * *")
    @Scheduled(fixedDelay = 2000)
    public void retryFromRedis() {
        log.warn("retryFromRedis----begin");
        long currentTime = System.currentTimeMillis();
        // Fetch due entries in reverse score (time) order
        Set<ZSetOperations.TypedTuple<String>> typedTuples =
                redisTemplate.opsForZSet().reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);
        // Remove the fetched entries
        redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);
        for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
            String key = tuple.getValue();
            // Look up the stored message body by its key, then drop it from the hash
            String value = (String) redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key);
            redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);

            RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class);
            // Rebuild the ProducerRecord (the retry count goes back into a header) and resend
            ProducerRecord<String, String> record = retryRecord.parse();
            kafkaTemplate.send(record);
        }
    }
}
```
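One caveat on this design: the fetch (reverseRangeByScoreWithScores) and the removal (removeRangeByScore) are two separate Redis calls, so an entry added between them could be deleted without ever being read, and with multiple application instances the same entry could be fetched twice. Wrapping the two calls in a Lua script, or using a SessionCallback-based MULTI/EXEC transaction, would make the fetch-and-remove atomic.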
RetryRecord.java
```java
package com.lzh.kafka.demo.entity;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class RetryRecord {

    public static final String KEY_RETRY_TIMES = "retryTimes";

    private String key;
    private String value;
    private Integer retryTimes;
    private String topic;
    private Long nextTime;

    public RetryRecord() {
    }

    public String getKey() { return key; }
    public void setKey(String key) { this.key = key; }

    public String getValue() { return value; }
    public void setValue(String value) { this.value = value; }

    public Integer getRetryTimes() { return retryTimes; }
    public void setRetryTimes(Integer retryTimes) { this.retryTimes = retryTimes; }

    public String getTopic() { return topic; }
    public void setTopic(String topic) { this.topic = topic; }

    public Long getNextTime() { return nextTime; }
    public void setNextTime(Long nextTime) { this.nextTime = nextTime; }

    public ProducerRecord<String, String> parse() {
        Integer partition = null;
        Long timestamp = System.currentTimeMillis();

        // Encode the retry count into a 4-byte header so it travels with the record
        List<Header> headers = new ArrayList<>();
        ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
        retryTimesBuffer.putInt(retryTimes);
        retryTimesBuffer.flip();
        headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer));

        return new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
    }
}
```
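Storing the retry count in a record header rather than in the message body is what keeps the business consumer oblivious to the retry machinery: it receives the original key and value unchanged, while RetryService.getRetryTimes() reads the count back out of the header on the next failure.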