引用:如何在redis中实现消息队列 实现消息队列的博客 - CSDN博客
Redis实现最简单版本的消息队列
我们可以先回顾一下redis是用来做什么的。 Redis 是一个非关系型数据库。 它以key:value的形式存储在数据库中。 redis数据库的常见数据类型有字符串、列表、哈希、集合和有序集合,当我们简单了解了redis的列表类型后,我们就可以实现一个简单版的消息队列
Redis中向列表插入值的方法有很多种,但我们选择其中两种
rpush key 表示从右边插入,每次插入都是在前一个值的右边
lpush key vlaue 表示从左边插入,每次插入都是在前一个值的左边
雷斯值法
lpop 键从列表左侧弹出一个值
rpop 键从列表右侧弹出一个值
那么当我们看到列表的插入和取值的时候,是不是可以想到我们的消息队列呢? 其实消息队列可以简单的理解为我们的生产者在某个位置放了一个值,我们的消费者也在同一个位置放了一个值。 获取值,结合redis插入的值,比如我们的lpush,他每次插入的值都会到前一个值的左边,所以最右边的值是先插入的,然后再左边反过来,那么此时如果我们使用rpop弹出列表的最后一个值,那么我们就会得到第一个放入消息队列的值,然后弹出第二个,也就是第二个放入的值,这样我们就可以实现一个简单的消息队列
List利用自身的特性来保证有序性,但是存在一个问题。 出队时,消费者需要轮询消息队列。 不管消息队列是否有值,都需要轮询。 显然这是对资源的不合理消耗,所以Redis可以使用阻塞读命令BRPOP。 当消息队列为空时,就会阻塞等待。 当消息队列为空时,会重新排队。
消息重复消费
生产者可以创建一个唯一的 ID 来标识消息。 消费者需要存储已消费消息ID的列表。 每次处理消息时,都需要比较该消息ID是否存在于消费列表中。 如果该ID已经存在,则不会再次处理该消息。 ,这也是幂等性的保证
消息的可靠性
消息的可靠性保证了消费者将消息出队后,该消息可以保存在未完成队列中。 只有当消费者完成处理后,才会将其从未完成队列中删除。 核心思想是使用备份
List可以简单的满足消息队列的需求,但是需要注意的是List处理消息队列一定不能被多个消费者同时消费,否则无法保证消息的有序性。 如果生产者速度太快而消费者速度太慢,就会造成消息堆积,给Redis内存带来巨大压力。 因此,在这种场景下,需要考虑多个消费者分担压力的问题。 这就需要依赖5.0版本提供的解决方案。
它是.0中添加的新数据结构。 它提供消息持久化和主备复制功能,允许任何客户端随时访问数据。 它有一个消息链表,可以将所有添加的消息串在一起。 每条消息都有一个唯一的ID,并且这个ID是递增的。 它是redis专门为消息队列定义的一种数据结构。
当然,我们首先需要看看如何定义这个数据结构。 与其他数据结构一样,我们不需要显式创建它。 它是在执行第一次数据添加时自动创建的。 添加数据的命令为XADD,语法为 格式为XADD key ID field value [field value...],参数说明如下:
密钥:redis密钥
ID:消息的唯一标识符,可以指定或设置为*。 设置为*时,会自动生成ID,并且ID会递增。
字段值:消息的字段和值
多条消息的产生(创建)如下:
redis> XADD * 名字 Sara
“27-0”
redis> XADD *
“27-1”
redis>XLEN
()2
雷迪斯> - +
1) 1) “27-0”
2) 1) “姓名”
2)“萨拉”
3)“”
4)“”
2) 1) “27-1”
21) ””
2)“”
3)“”
4)“”
5)“”
6)“”
雷迪斯>
其中XLEN用于检查消息数量,用于通过范围查询根据ID递增来获取消息。 - 相当于负无穷大,+相当于正无穷大,即获取所有消息。我们再看看其他一些命令
XDEL:根据ID删除消息
> XADD * 1
44-0
> XADD * b 2
40-0
> XADD * c 3
44-0
> XDEL 40-0
()1
127.0.0.1:6379> - +
1) 1) 44-0
2) 1) “一”
21”
2) 1) 44-0
2) 1) “c”
23”
XLEN:获取消息数量
redis> XADD * 第 1 项
“77-0”
redis> XADD * 第 2 项
“78-0”
redis> XADD * 第 3 项
“78-1”
redis>XLEN
()3
雷迪斯>
:查询指定范围内的消息
key: 队列名称
start:起始值,-代表最小值
end:结束值,+表示最大值
计数:数量
redis> XADD * 名字伍尔夫
“11-0”
redis> XADD * 姓名 Jane
“11-1”
redis> XADD * 名称托尼
“11-2”
redis> XADD * 名称
“12-0”
redis> XADD * 名称 Ngozi
“12-1”
redis>XLEN
()5
redis> - + 计数 2
1) 1) “11-0”
2) 1) “姓名”
2)“”
3)“”
4)“伍尔夫”
2) 1) “11-1”
2) 1) “姓名”
2)“简”
3)“”
4)“”
雷迪斯>
:从后向前获取消息
语法格式 key end start [COUNT count]
key: 队列名称
end:结束值,+表示最大值
start:起始值,-代表最小值
计数:数量
redis> XADD * 名字伍尔夫
“58-0”
redis> XADD * 姓名 Jane
“59-0”
redis> XADD * 名称托尼
“59-1”
redis> XADD * 名称
“59-2”
redis> XADD * 名称 Ngozi
“59-3”
redis>XLEN
()5
redis> + - 计数 1
1) 1) “59-3”
2) 1) “姓名”
2)“恩戈齐”
3)“”
4)“”
XREAD
以阻塞或非阻塞的方式获取消息,即消费消息的命令。 语法格式为XREAD [COUNT count] [BLOCK] key [key ...] id [id ...],解释如下:
计数:数量
:可选,阻塞毫秒。 如果不设置,则为非阻塞模式。
key: 队列名称
id:消息ID
# 从头部读取两条消息
> XREAD 计数 2 0-0 0-0
1) 1) “”
2) 1) 1) 36-0
21) ””
2)“1532”
3)“事件ID”
4)“5”
5)“用户ID”
6)“”
2) 1) 06-0
21) ””
2)“812”
3)“事件ID”
4)“9”
5)“用户ID”
6)“”
21) ””
2) 1) 1) 25-0
2) 1) “姓名”
2)“”
3)“”
4)“伍尔夫”
2) 1) 98-0
2) 1) “姓名”
2)“简”
3)“”
4)“”
创建消费者组,使用消费者并发消费消息,解决消费者消费能力不足的问题。 语法格式为 [ key id-or-$] [SETID key id-or-$] [ key ] [ key ],解释如下:
key:队列名称,不存在则创建
:团队名字。
$:表示从尾开始消费,只接受新消息,当前所有消息都会被忽略。
从头开始的消费如下:
-组名称 0-0
从尾部开始消费如下:
-组名称 $
实际场景中,我们可以通过设置多个消费组的不同起始消费位置来达到并发消费的效果。 在这种情况下,可能会如下图所示:
该图的主要元素解释如下:
每个都有一个唯一的名称,即 Redis 键,当我们第一次使用 xadd 命令附加消息时自动创建。
Group:消费者组,通过命令创建。 一个消费者组有多个消费者()。
: 光标。 每个消费者组都会有一个游标。 任何阅读该消息的消费者都会将光标向前移动。
:()的state变量用于维护消费者的未确认id。 记录客户端已经读过的消息,但是没有ack(:确认字符)。
团体
读取消费者组中的消息,语法格式如下:
GROUP 组 [COUNT 计数] [BLOCK] [NOACK] key [key ...] ID [ID ...]
解释如下:
group:消费者组名
:消费者姓名。
count:读取次数。
:阻塞毫秒。
key:队列名称。
ID:消息ID。
测试如下:
组-组名称-名称 COUNT 1 >
总结
真正能应用到生产环境的Redis消息队列的实现是List,两者的区别如下