Stream消息队列 2023-02-27 21:58 Stream是一种数据类型(Redis 5.0 引入),也就是说和String、List、Set、Hash、ZSet一样,是会被持久化到硬盘上的。因此基于Stream的消息队列的消息安全性是可以保证的。 ### 发送消息 ![](https://minio.riun.xyz/riun1/2023-02-27_2YM0rik1Q9oPqf844i.jpg) 示例: xadd s1 * name hanxu age 18 > "1677503324793-0" //返回消息的唯一Id s1是消息队列的队列名(如果消息队列不存在,则自动创建)(没有最大消息限制),*代表redis自动生成消息唯一id,发送的消息(key,value形式)是: - name hanxu - age 18 使用xlen s1查看消息队列s1中的消息长度 >127.0.0.1:6379> xlen s1 >(integer) 1 ### 读取消息 ![](https://minio.riun.xyz/riun1/2023-02-27_2YM5epl6ztBzIvCm4x.jpg) 示例: XREAD COUNT 1 STREAMS s1 0 >1) 1) "s1" > 2) 1) 1) "1677503324793-0" > 2) 1) "name" > 2) "hanxu" > 3) "age" > 4) "18" count 1 代表读取一个消息, STREAMS s1 代表从消息队列s1中读取,0 代表消息的起始id,0表示从第一个消息开始读 此时从另一个客户端再次执行同样的xread命令,也能读取到这条消息。说明消息永久存在。 #### 阻塞读取 xread count 1 block 0 streams s1 0 >1) 1) "s1" > 2) 1) 1) "1677503324793-0" > 2) 1) "name" > 2) "hanxu" > 3) "age" > 4) "18" 仍然能够读到,因为是从第一个消息开始读。block 0的意思是若没有能读取的数据,则一直阻塞。 xread count 1 block 0 streams s1 $ > //客户端阻塞停在这 读不到消息,因为 $ 是从最新的消息开始读。从执行这个命令开始,并没有新的消息被存入。(此时我们用其他客户端发送一条消息,这里就能读到了) 要深刻理解 $ 的含义:执行这个命令开始,有没有新的消息,他不看以前的你有没有读过。相当于从这个人开始上岗执勤,有没有新案件,假设一下来了5个案件,他处理最新的那个,处理过程中另外4个案件已经存入消息队列了,等他处理完之后,他不会处理刚刚那4个。只是等待新消息的到来。所以会造成**消息漏读**。 ### 特点 Stream类型消息队列的xread命令特点: - 消息可回溯 - 一个消息能被多个消费者读取 - 可以阻塞读取 - 有消息漏读的风险 ### 消费者组 消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。特点: 1、消息分流:消息会分流给组内的不同消费者,即组内各消费者是竞争关系,消费到一条消息后,组内其他消费者就不能重复消费了。 2、消息标示:消费者组会维护一个标识,记录最后一个被处理的消息。哪怕消费者宕机,重启后还能从标识后读取消息,确保每个消息都能被消费。 3、消息确认:组内的消费者获取消息后,消息会处于pending状态,并存在pending-list中。当处理完成后需要通过xack确认消息,标记消息为已处理,才会从pending-list移除消息。 #### 创建消费者组 xgroup create key groupName ID [MKSTREAM] - xgroup就是操作消费者组的命令; - key 是消息队列的名字,要给哪个队列创建一个消费者组; - groupName 消费者组自己的名字; - ID 表示这个消费者组监听消息时从哪开始监听,和以前一样,0 代表队列中第一个消息,$ 代表队列中最新的消息。 - MKSTREAM 当创建消费者组时,消息队列不存在,则自动创建队列。给了,则自动创建;不给,队列不存在则无法创建消费者组。 #### 其他常用命令 ![](https://minio.riun.xyz/riun1/2023-02-27_2YMzryiViXecEkZBg2.jpg) --END--
发表评论