Java使用Redis实现延时队列

Java使⽤Redis实现延时队列
A:需求说明:
1. 如果系统中需要⽤到定时执⾏计划的,⼜不想⽤到中间件,如果轮询数据库的话,会导致⼤量资源消耗,这样我们就可以使⽤Redis来
实现类似功(需要使⽤rabbitMQ的请看这⾥:)
2. 业务类型,如订单⼀些评论,如果48h⽤户未对商家评论,系统会⾃动产⽣⼀条默认评论,还有排队到时提醒等
B:实现思路:
1. 将整个Redis当做消息池,以kv形式存储消息,key为id,value为具体的消息body
2. 使⽤ZSET做优先队列,按照score维持优先级(⽤当前时间+需要延时的时间作为score)
3. 轮询ZSET,拿出score⽐当前时间戳⼤的数据(已过期的)
4. 根据id拿到消息池的具体消息进⾏消费
5. 消费成功,删除改队列和消息
6. 消费失败,让该消息重新回到队列
C:代码实现
. Message消息封装类
@Data
public class Message {
/**
* 消息id
*/
private String id;
/**
* 消息延迟/毫秒
*/
private long delay;
/**
* 消息存活时间
*/
private int ttl;
/**
* 消息体,对应业务内容
*/
private String body;
/
**
* 创建时间,如果只有优先级没有延迟,可以设置创建时间为0
* ⽤来消除时间的影响
*/
private long createTime;
}
2.基于redis的消息队列
@Component
public class RedisMQ {
/**
* 消息池前缀,以此前缀加上传递的消息id作为key,以消息{@link Message}
ap劫
* 的消息体body作为值存储
*/
public static final String MSG_POOL = "Message:Pool:";
/**
* zset队列名称 queue
*/
public static final String QUEUE_NAME = "Message:Queue:";
实验室废液桶
private static final int SEMIH = 30*60;
@Autowired
private RedisService redisService;
/**
* 存⼊消息池
* @param message
* @return
*/
public boolean addMsgPool(Message message) {
if (null != message) {
return redisService.setExp(MSG_POOL + Id(), Body(), Long.Ttl() + SEMIH));        }
return false;
}
/**
* 从消息池中删除消息
* @param id
* @return
*/
public void deMsgPool(String id) {
}
/**
* 向队列中添加消息
* @param key
* @param score 优先级
* @param val
* @return 返回消息id
*/
public void enMessage(String key, long score, String val) {
redisService.zsset(key,val,score);
}
/**
* 从队列删除消息
* @param id
* @return
*/
public boolean deMessage(String key, String id) {
return redisService.zdel(key, id);
}化纤抽丝
}
4.编写消息发送(⽣产者)
@Component
public class MessageProvider {
static Logger logger = Logger(MessageProvider.class);
private static int delay = 30;//30秒,可⾃⼰动态传⼊
@Resource
private RedisMQ redisMQ;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/
/改造成redis
public void sendMessage(String messageContent) {
try {
if (messageContent != null){
String seqId = UUID.randomUUID().toString();
// 将有效信息放⼊消息队列和消息池中
Message message = new Message();
// 可以添加延迟配置
message.setDelay(delay*1000);
message.setCreateTime(System.currentTimeMillis());
message.setBody(messageContent);
message.setId(seqId);
// 设置消息池ttl,防⽌长期占⽤
message.setTtl(delay + 360);
redisMQ.addMsgPool(message);
//当前时间加上延时的时间,作为score
Long delayTime = CreateTime() + Delay();
String d = sdf.CreateTime());
System.out.println("当前时间:" + d+",消费的时间:" + sdf.format(delayTime));                Message(RedisMQ.QUEUE_NAME,delayTime, Id());            }else {
家用智能控制系统logger.warn("消息内容为空");
}
}catch (Exception e){
e.printStackTrace();
}
}
}
5.消息消费者
@Component
public class RedisMQConsumer {
@Resource
private RedisMQ redisMQ;
@Autowired
private RedisService redisService;
@Autowired
lzr种子private MessageProvider provider;
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 消息队列<br>
*
*/
@Scheduled(cron = "*/1 * * * * *")
public void monitor() {
Set<String> set = redisService.rangeByScore(RedisMQ.QUEUE_NAME, 0, System.currentTimeMillis());        if (null != set) {
long current = System.currentTimeMillis();
手机防盗系统
for (String id : set) {
long  score = Score(RedisMQ.QUEUE_NAME, id).longValue();
if (current >= score) {
// 已超时的消息拿出来消费
String str = "";
try {
str = (RedisMQ.MSG_POOL + id);
System.out.println("消费了:" + str+ ",消费的时间:" + sdf.format(System.currentTimeMillis()));                    } catch (Exception e) {
e.printStackTrace();
//如果出了异常,则重新放回队列
System.out.println("消费异常,重新回到队列");
provider.sendMessage(str);
} finally {
redisMQ.deMessage(RedisMQ.QUEUE_NAME, id);
redisMQ.deMsgPool(id);
}
}
}
}
}
}
6.配置信息
<!--1依赖引⼊-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2yml配置
spring:
redis:
database: 1
host: 127.0.0.1
port: 6379
以上代码已经实现了延迟消费功能,现在来测试⼀下,调⽤MessageProvider的sendMessage⽅法,我设定了30秒可以看到结果
因为我们是⽤定时器去轮询的,会出现误差

本文发布于:2024-09-22 19:20:13,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/1/183680.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   时间   队列   消费   实现   延迟   系统
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议