您的位置  > 互联网

基于rpush+lpop+blpopblpop的几种方式

List与java中的类似。 如果插入头部或尾部,时间复杂度为O(1),插入其他地方则需要O(n)。 查询也是如此。 因此,列表一般用作队列。 存储结构+

使用

while (true){
          Jedis jedis=JedisUtil.getJedis();
          String message=jedis.lpop("myQueue");
          if(StringUtils.isEmpty(message)){
                 try{
                 	//减少redis服务端压力。根据业务设定轮询时间
                     Thread.sleep(1000);
                  }catch (Exception e){
                   }
                    continue;
                }
                /**
                 * dealMessage
                 */
 }

优势

缺点

2 基于rpush+blpop

while (true){
            Jedis jedis=JedisUtil.getJedis();
            List returnList=null;
            try {
                //意思是说,在使用 blpop的时候,如果中途因为网络波动或者某些其他原因导致连接池失效,那么就永远接收不到信息了
                //为了解决这个问题,就需要 blpop的超时功能。让 blpop每几分钟就断开,检查一下网络,再重新连上。
                //阻塞过程中与redis保持连接
                 returnList = jedis.blpop(100, "myqueue");
            }catch (Exception e){
            }
            if(returnList!=null&&returnList.size()!=0) {
                String message=returnList.get(1);
                /**
                 * dealMessage()
                 */
            }
            JedisUtil.returnBrokenResource(jedis);
        }

BLPOP原理

redis处理blpop命令时,会首先查找key对应的列表。 如果存在,就会弹出数据响应给客户端。 否则,将相应的键压入数据结构中,并阻塞相应的值。 当下一个推送命令发出时,服务器检查相应的密钥是否存在。 如果存在,则将key添加到链表中,将value插入到链表中并响应客户端。 与监控类似。

相比之前的rpush+lpop。这样可以即时获取消息

以上两种方式一般用于简单的向程序上下游传递消息。 为了提高程序效率,可以批量lpop。 减少网络io

可以批量使用lpop

  public static ListlPopByPipeLineBatch(String key,int length){
         List result=new ArrayList<>(length);
         Jedis jedis=JedisUtil.getJedis();
         if(jedis==null){
             return result;
         }
         Pipeline pipeline=null;
         try {
             pipeline=jedis.pipelined();
             for(int i=0;i objectList=pipeline.syncAndReturnAll();
             for(Object returnObject:objectList){
                 if(returnObject==null){
                     continue;
                 }
                 if(returnObject instanceof String){
                     result.add((String) returnObject);
                 }
                 return result;
             }
         }catch (Exception e){
             
         }finally {
             JedisUtil.returnBrokenResource(jedis);
         }
         
     }

或者使用(需要是原子的并且可以在事务或Lua中执行)

0 n-1(读取前n个数据)

ltrim n -1(删除前n条数据)

应用场景

1 应用解耦,现在上下游服务之间所有通信都是通过redis队列

2 削峰。

3 发布/订阅机制

Redis还支持消息的发布订阅模式。 订阅者(Sub)通过向redis服务发出命令来订阅频道()。 当发布者通过命令向redis服务发出命令时,所有订阅该频道的客户端都会收到这条消息。

消费者:

订阅新闻:[...]

取消订阅:

生产者生产消息:

原则

发布-订阅模型。 对于每一个,下面都有一个注册列表。 这将被添加到注册列表中。 遍历其下的列表,将对应的信息推送到

优势

缺点

不适合消息存储和消息积压业务。 相反,他擅长处理广播、即时通讯、即时反馈业务。

应用场景

用户在线监控; 主动用户监控;

4 设置消息队列的优先级

有序集(Set)是不允许重复的类型元素的集合,每个元素都与一个类型分数相关联。 有序集的成员是唯一的,但分数可以重复。 集合中的最大成员数为 2^32 - 1,这意味着每个集合可以存储超过 40 亿个成员。 上述基于Set的功能在实际开发中有很多应用,例如游戏实时性能排名、博客文章点赞排名等各种排名和优先级队列的实现。

增删改查的时间复杂度为log(n)

使用set可以作为优先级消息队列,但消息内容不重复。

添加zadd 
zadd key score1 value1… scoren valuen
获取前n个数
zrange key 0 ,n-1 [withscores]
zrevrange 0 ,n-1 [withscores]
删除元素(和上一步要在一个事务中执行)
zrem key member [member …]
返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。
zrevrangebyscore key max min [withscores] [limit offset count]
zrangebyscore key max min [withscores]  [limit offset count]
场景: zadd中存放 订单id ,sorce为时间戳。优先消费最早的时间戳,删除过期订单。

应用场景

查看最新的新闻发布。 例如,有一个新闻发布列表,只显示当前时间之前半小时内的最新新闻。 可以使用set来存储。 过期新闻可以根据时间范围定期删除。

5个redis

.0 中引入了新的数据结构。 解决之前的消息没有存储、没有确认机制的问题。 数据结构为radix tree(基数树,适合存储带有整数键的数据)。 查询效率log(n)

1 追加新消息、xadd、产生消息

XADD 密钥 ID 字段 [字段...]

消息ID说明:由两部分组成:时间戳-序列号。 时间戳以毫秒为单位,是生成消息时的Redis服务器时间。 它是一个 64 位整数 (int64)。 序列号是该毫秒时间点内的消息序列号,也是一个64位整数。

2 查看消息

键 开始 结束 [COUNT 次]

键 - +

3 创建消费者组

密钥ID(ID=0-0表示从头开始,$表示从尾部开始)

4 创建新消费者

group [Count count] [block] ID (>表示从0开始)

5 xack确认消费消息

xack 密钥 ID

6

为了解决组内消息读取但处理时消费者崩溃导致消息丢失的问题,设计了一个列表来记录已读取但未处理的消息。 该命令用于从消费者组或消费者内的消费者获取未处理的消息。

键组[开始结束计数][]

7 消息传输

关键组最短空闲时间ID

8 坏消息

xdel 密钥 ID

应用场景

1 调度系统、消息通信。

列表用于记录已读但未处理的消息。 命令`用于从消费者组或消费者内的消费者获取未处理的消息。

键组[开始结束计数][]

7 消息传输

关键组最短空闲时间ID

8 坏消息

xdel 密钥 ID

应用场景

1 调度系统、消息通信。