php循环取redis队列,详解Redis和队列

php循环取redis队列,详解Redis和队列
下⾯由Redis教程栏⽬给⼤家详解Redis和队列,希望对需要的朋友有所帮助!
概要
Redis不仅可作为缓存服务器,还可⽤作消息队列。它的列表类型天⽣⽀持⽤作消息队列。如下图所⽰:
由于Redis的列表是使⽤双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是⾮常快的。
普通队列实现
所以可以直接使⽤Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。简单⽰例如下:
存放消息端(消息⽣产者):package sssage.queue;
import st.MyJedisFactory;import redis.clients.jedis.Jedis;
import urrent.TimeUnit;
/**
* 消息⽣产者
* @author yamikaze */public class Producer extends Thread {
public static final String MESSAGE_KEY = "message:queue"; private Jedis jedis; private String prod
ucerName; private volatile int count;
public Producer(String name) { this.producerName = name;
init();
}
private void init() {
jedis = LocalJedis();
public void putMessage(String message) {
Long size = jedis.lpush(MESSAGE_KEY, message);
System.out.println(producerName + ": 当前未被处理消息条数为:" + size);
count++;
}
public int getCount() { return count;
}
@Override public void run() { try { while (true) {
ate32Str());
TimeUnit.SECONDS.sleep(1);
}
} catch (InterruptedException e) {
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException{
Producer producer = new Producer("myProducer");
producer.start();
法兰锻造for(; ;) {
System.out.println("main : 已存储消息条数:" + Count()); TimeUnit.SECONDS.sleep(10);
}
}
}
消息处理端(消息消费者):package sssage.queue;
import st.MyJedisFactory;import redis.clients.jedis.Jedis; /**
* 消息消费者
* @author yamikaze */public class Customer extends Thread{
private String customerName; private volatile int count; private Jedis jedis; public Customer(String name) { this.customerName = name;
init();
private void init() {
jedis = LocalJedis();
}
public void processMessage() {
String message = jedis.rpop(Producer.MESSAGE_KEY); if(message != null) {
count++;
handle(message);
}
}
public void handle(String message) {
System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
}
@Override public void run() { while (true) {
processMessage();
}
}
public static void main(String[] args) {
Customer customer = new Customer("yamikaze");
customer.start();
}
}
貌似还不错,但上述例⼦中消息消费者有⼀个问题存在,即需要不停的调⽤rpop⽅法查看List中是否有待处理消息。每调⽤⼀次都会发起⼀次连接,这会造成不必要的浪费。也许你会使⽤Thread.sleep()等⽅法让消费者线程隔⼀段时间再消费,但这样做有两个问题:
门槛记1)、如果⽣产者速度⼤于消费者消费速度,消息队列长度会⼀直增⼤,时间久了会占⽤⼤量内存空间。
2)、如果睡眠时间过长,这样不能处理⼀些时效性的消息,睡眠时间过短,也会在连接上造成⽐较⼤的开销。
所以可以使⽤brpop指令,这个指令只有在有元素时才返回,没有则会阻塞直到超时返回null,于是消费端可以将processMessage可以改为这样:public void processMessage() { /**
* brpop⽀持多个列表(队列)
* brpop指令是⽀持队列优先级的,⽐如这个例⼦中MESSAGE_KEY的优先级⼤于testKey(顺序决定)。
* 如果两个列表中都有元素,会优先返回优先级⾼的列表中的元素,所以这⼉优先返回MESSAGE_KEY
* 0表⽰不限制等待,会⼀直阻塞在这⼉ */
List messages = jedis.brpop(0, Producer.MESSAGE_KEY, "testKey"); if(messages.size() != 0) { //由于该指令可以监听多个Key,所以返回的是⼀个列表 //列表由2项组成,1) 列表名,2)数据
String keyName = (0); //如果返回的是MESSAGE_KEY的消息
if(Producer.MESSAGE_KEY.equals(keyName)) {
String message = (1);
handle(message);
}
}
System.out.println("=======================");
}
然后可以运⾏Customer,清空控制台,可以看到程序没有任何输出,阻塞在了brpop这⼉。然后在打开Redis的客户端,输⼊指令client list,可以查看当前有两个连接。
⼀次⽣产多次消费的队列
Redis除了对消息队列提供⽀持外,还提供了⼀组命令⽤于⽀持发布/订阅模式。利⽤Redis的pub/sub模式可以实现⼀次⽣产多次消费的队列。
1)发布
蒸汽消毒锅
PUBLISH指令可⽤于发布⼀条消息,格式 PUBLISH channel message
融合调度指挥通信系统
返回值表⽰订阅了该消息的数量。
2)订阅
SUBSCRIBE指令⽤于接收⼀条消息,格式 SUBSCRIBE channel
可以看到使⽤SUBSCRIBE指令后进⼊了订阅模式,但没有接收到publish发送的消息,这是因为只有
在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。回复分为三种类型:
1、如果为subscribe,第⼆个值表⽰订阅的频道,第三个值表⽰是第⼏个订阅的频道?(理解成序号?)
2、如果为message(消息),第⼆个值为产⽣该消息的频道,第三个值为消息
3、如果为unsubscribe,第⼆个值表⽰取消订阅的频道,第三个值表⽰当前客户端的订阅数量。
可以使⽤指令UNSUBSCRIBE退订,如果不加参数,则会退订所有由SUBSCRIBE指令订阅的频道。
Redis还⽀持基于通配符的消息订阅,使⽤指令PSUBSCRIBE (pattern subscribe),例如:
再试试推送消息会得到以下结果:
可以看到publish指令返回的是2,⽽订阅端这边接收了两次消息。这是因为PSUBSCRIBE指令可以重复订阅频道。⽽使⽤PSUBSCRIBE 指令订阅的频道也要使⽤指令PUNSUBSCRIBE指令退订,该指令⽆法退订SUBSCRIBE订阅的频道,同理UNSUBSCRIBE也不能退订PSUBSCRIBE指令订阅的频道。同时PUNSUBSCRIBE指令通配符不会展开。
例如:PUNSUBSCRIBE * 不会匹配到 channel.*, 所以要取消订阅channel.*就要这样写PUBSUBSCRIBE channel.*。
代码⽰范如下:package sssage.subscribe;
import sssage.queue.StringUtils;import st.MyJedisFactory;import
redis.clients.jedis.Jedis;
/**
* 消息发布⽅
* @author yamikaze */public class Publisher {
public static final String CHANNEL_KEY = "channel:message"; private Jedis jedis;
public Publisher() {
jedis = LocalJedis();
}
public void publishMessage(String message) { if(StringUtils.isBlank(message)) { return;
}
jedis.publish(CHANNEL_KEY, message);
}
public static void main(String[] args) {
Publisher publisher = new Publisher();
publisher.publishMessage("Hello Redis!");
}
}
简单的发送⼀个消息。
消息订阅⽅:package sssage.subscribe;
成都华信电子设备厂
import st.MyJedisFactory;import redis.clients.jedis.Jedis;import redis.clients.jedis.JedisPubSub; import urrent.TimeUnit;
/**
* 消息订阅⽅客户端
* @author yamikaze */public class SubscribeClient {
private Jedis jedis; private static final String EXIT_COMMAND = "exit";
public SubscribeClient() {
jedis = LocalJedis();
}
public void subscribe(String ...channel) { if(channel == null || channel.length <= 0) { return;
} //消息处理,接收到消息时如何处理
JedisPubSub jps = new JedisPubSub() { /**
* JedisPubSub类是⼀个没有抽象⽅法的抽象类,⾥⾯⽅法都是⼀些空实现
高强钢* 所以可以选择需要的⽅法覆盖,这⼉使⽤的是SUBSCRIBE指令,所以覆盖了onMessage
* 如果使⽤PSUBSCRIBE指令,则覆盖onPMessage⽅法
* 当然也可以选择BinaryJedisPubSub,同样是抽象类,但⽅法参数为byte[] */
@Override public void onMessage(String channel, String message) { if(Publisher.CHANNEL_KEY.equals(channel)) { System.out.println("接收到消息: channel : " + message); //接收到exit消息后退出
if(EXIT_COMMAND.equals(message)) {

本文发布于:2024-09-21 19:33:40,感谢您对本站的认可!

本文链接:https://www.17tex.com/tex/4/183097.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:消息   指令   订阅   队列   频道   处理
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2024 Comsenz Inc.Powered by © 易纺专利技术学习网 豫ICP备2022007602号 豫公网安备41160202000603 站长QQ:729038198 关于我们 投诉建议