新聞中心

創(chuàng)新互聯(lián)是一家專注于成都網(wǎng)站設(shè)計(jì)、網(wǎng)站建設(shè)與策劃設(shè)計(jì),古冶網(wǎng)站建設(shè)哪家好?創(chuàng)新互聯(lián)做網(wǎng)站,專注于網(wǎng)站建設(shè)10余年,網(wǎng)設(shè)計(jì)領(lǐng)域的專業(yè)建站公司;建站業(yè)務(wù)涵蓋:古冶等地區(qū)。古冶做網(wǎng)站價(jià)格咨詢:13518219792
什么是Stream?
Stream 實(shí)際上是一個(gè)具有消息發(fā)布/訂閱功能的組件,也就常說的消息隊(duì)列。其實(shí)這種類似于 broker/consumer(生產(chǎn)者/消費(fèi)者)的數(shù)據(jù)結(jié)構(gòu)很常見,比如 RabbitMQ 消息中間件、Celery 消息中間件,以及 Kafka 分布式消息系統(tǒng)等,而 Redis Stream 正是借鑒了 Kafaka 系統(tǒng)。
1) 優(yōu)點(diǎn)
Strean 除了擁有很高的性能和內(nèi)存利用率外, 它最大的特點(diǎn)就是提供了消息的持久化存儲(chǔ),以及主從復(fù)制功能,從而解決了網(wǎng)絡(luò)斷開、Redis 宕機(jī)情況下,消息丟失的問題,即便是重啟 Redis,存儲(chǔ)的內(nèi)容也會(huì)存在。
2) 流程
Stream 消息隊(duì)列主要由四部分組成,分別是:消息本身、生產(chǎn)者、消費(fèi)者和消費(fèi)組,對(duì)于前述三者很好理解,下面了解什么是消費(fèi)組。
一個(gè) Stream 隊(duì)列可以擁有多個(gè)消費(fèi)組,每個(gè)消費(fèi)組中又包含了多個(gè)消費(fèi)者,組內(nèi)消費(fèi)者之間存在競爭關(guān)系。當(dāng)某個(gè)消費(fèi)者消費(fèi)了一條消息時(shí),同組消費(fèi)者,都不會(huì)再次消費(fèi)這條消息。被消費(fèi)的消息 ID 會(huì)被放入等待處理的 Pending_ids 中。每消費(fèi)完一條信息,消費(fèi)組的游標(biāo)就會(huì)向前移動(dòng)一位,組內(nèi)消費(fèi)者就繼續(xù)去爭搶下消息。
Redis Stream 消息隊(duì)列結(jié)構(gòu)程如下圖所示:
圖1:Redis Stream流程處理圖
下面對(duì)上圖涉及的專有名詞做簡單解釋:
- Stream direction:表示數(shù)據(jù)流,它是一個(gè)消息鏈,將所有的消息都串起來,每個(gè)消息都有一個(gè)唯一標(biāo)識(shí) ID 和對(duì)應(yīng)的消息內(nèi)容(Message content)。
- Consumer Group :表示消費(fèi)組,擁有唯一的組名,使用 XGROUP CREATE 命令創(chuàng)建。一個(gè) Stream 消息鏈上可以有多個(gè)消費(fèi)組,一個(gè)消費(fèi)組內(nèi)擁有多個(gè)消費(fèi)者,每一個(gè)消費(fèi)者也有一個(gè)唯一的 ID 標(biāo)識(shí)。
- last_delivered_id :表示消費(fèi)組游標(biāo),每個(gè)消費(fèi)組都會(huì)有一個(gè)游標(biāo) last_delivered_id,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。
- pending_ids :Redis 官方稱為 PEL,表示消費(fèi)者的狀態(tài)變量,它記錄了當(dāng)前已經(jīng)被客戶端讀取的消息 ID,但是這些消息沒有被 ACK(確認(rèn)字符)。如果客戶端沒有 ACK,那么這個(gè)變量中的消息 ID 會(huì)越來越多,一旦被某個(gè)消息被 ACK,它就開始減少。
3) ACK
ACK(Acknowledge character)即確認(rèn)字符,在數(shù)據(jù)通信中,接收方傳遞給發(fā)送方的一種傳輸類控制字符。表示發(fā)來的數(shù)據(jù)已確認(rèn)接收無誤。在 TCP/IP 協(xié)議中,如果接收方成功的接收到數(shù)據(jù),那么會(huì)回復(fù)一個(gè) ACK 數(shù)據(jù)。通常 ACK 信號(hào)有自己固定的格式,長度大小,由接收方回復(fù)給發(fā)送方。
常用命令匯總
| 命令 | 說明 |
|---|---|
| XADD | 添加消息到末尾。 |
| XTRIM | 對(duì) Stream 流進(jìn)行修剪,限制長度。 |
| XDEL | 刪除指定的消息。 |
| XLEN | 獲取流包含的元素?cái)?shù)量,即消息長度。 |
| XRANGE | 獲取消息列表,會(huì)自動(dòng)過濾已經(jīng)刪除的消息。 |
| XREVRANGE | 反向獲取消息列表,ID 從大到小。 |
| XREAD | 以阻塞或非阻塞方式獲取消息列表。 |
| XGROUP CREATE | 創(chuàng)建消費(fèi)者組。 |
| XREADGROUP GROUP | 讀取消費(fèi)者組中的消息。 |
| XACK | 將消息標(biāo)記為"已處理"。 |
| XGROUP SETID | 為消費(fèi)者組設(shè)置新的最后遞送消息ID。 |
| XGROUP DELCONSUMER | 刪除消費(fèi)者。 |
| XGROUP DESTROY | 刪除消費(fèi)者組。 |
| XPENDING | 顯示待處理消息的相關(guān)信息。 |
| XCLAIM | 轉(zhuǎn)移消息的歸屬權(quán)。 |
| XINFO | 查看 Stream 流、消費(fèi)者和消費(fèi)者組的相關(guān)信息。 |
| XINFO GROUPS | 查看消費(fèi)者組的信息。 |
| XINFO STREAM | 查看 Stream 流信息。 |
| XINFO CONSUMERS key group | 查看組內(nèi)消費(fèi)者流信息。 |
基本命令應(yīng)用
下面通過一組示例演示有關(guān) Stream 命令的使用:
#添加一個(gè)消息, * 表示以時(shí)間戳自動(dòng)創(chuàng)建id
127.0.0.1:6379> XADD mystream * username www.biancheng.net age 10 c.biancheng.net age 9
"1610619132674-1"
#自定義id等于001,注意id只增不減
127.0.0.1:6379> XADD mystream1 001 name zhangsan addr hebei
"1-0"
127.0.0.1:6379> XADD mystream1 002 name lisi addr hunan
"2-0"
#如果插入重復(fù)的id號(hào)會(huì)報(bào)錯(cuò)
127.0.0.1:6379> XADD mystream1 001 name wangwu addr fujian
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item
127.0.0.1:6379> XADD mystream1 003 name wangwu addr fujian
"3-0"
#刪除id=001的數(shù)據(jù)
127.0.0.1:6379> XDEL mystream1 001
(integer) 1
#查看stream隊(duì)列包含的消息數(shù)量,也就消息長度
127.0.0.1:6379> XLEN mystream1
(integer) 2
#獲取消息列表,-表示最小,+表示最大
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1610619132674-0"
2) 1) "username"
2) "www.biancheng.net"
3) "age"
4) "10"
2) 1) "1610619178028-0"
2) 1) "username"
2) "c.biancheng.net"
3) "age"
4) "9"
#獲取消息列表
127.0.0.1:6379> XRANGE mystream1 - 003
1) 1) "2-0"
2) 1) "name"
2) "lisi"
3) "addr"
4) "hunan"
2) 1) "3-0"
2) 1) "name"
2) "wangwu"
3) "addr"
4) "fujian"
#使用count指定返回?cái)?shù)據(jù)的數(shù)量
127.0.0.1:6379> XRANGE mystream1 - 003 count 1
1) 1) "2-0"
2) 1) "name"
2) "lisi"
3) "addr"
4) "hunan"
#刪除整個(gè)Stream
127.0.0.1:6379> DEL mystream
#使用xread讀取消息
127.0.0.1:6379> XREAD count 2 STREAMS mystream1 2-0
1) 1) "mystream1"
2) 1) 1) "3-0"
2) 1) "name"
2) "wangwu"
3) "addr"
4) "fujian"
創(chuàng)建消息ID
上述示例中,當(dāng)我們創(chuàng)建一個(gè) Srteam 時(shí), 需要?jiǎng)?chuàng)建消息 ID,該 ID 是唯一、不可重復(fù)的,并且只增不減。消息 ID 有兩種創(chuàng)建方式,一是系統(tǒng)自動(dòng)生成,二是自定義創(chuàng)建。
1) 系統(tǒng)自動(dòng)創(chuàng)建
語法格式如下:
XADD key ID field value [field value ...]
參數(shù)說明如下:
- key :指定隊(duì)列名稱,如果不存就創(chuàng)建;
- ID :消息 id,我們使用
*表示由 redis 生成,可以自定義,但是要自己保證遞增性; - field value :消息記錄。
返回值是毫秒時(shí)間戳格式的字符串。比如 1610619132674-2,它表示在該毫秒內(nèi)產(chǎn)生的第 2 條消息。使用示例:
XADD mystream * username www.biancheng.net age 10
2) 自定義ID
自定義 ID 比較簡單,但是需要注意的是 ID 的形式必須是 “整數(shù)”,并且后面加入消息的 ID 必須大于前面消息的 ID,也就是自定義 ID 也必須遵守遞增的規(guī)則。示例如下:
XADD mystream1 001 name zhangsan addr hebei
創(chuàng)建消費(fèi)組
Redis Stream通過
XGROUP CREATE指令創(chuàng)建消費(fèi)組(Consumer Group),在創(chuàng)建時(shí),需要傳遞起始消息的 ID 用來初始化 last_delivered_id 變量。語法格式如下:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
參數(shù)說明如下:
- key :指定 Stream 隊(duì)列名稱,若不存在則自動(dòng)創(chuàng)建。
- groupname :自定義消費(fèi)組的名稱,不可重復(fù)。
- $ :表示從尾部開始消費(fèi),只接受新消息,而當(dāng)前 Stream 的消息則被忽略。
示例如下:
#創(chuàng)建消費(fèi)組,并傳遞消息起始id 0-0
127.0.0.1:6379> XGROUP CREATE mystream1 ms1 0-0
OK
#從尾部開始消費(fèi)信息,只接受新消息
127.0.0.1:6379> XGROUP CREATE mystream1 ms3 $
OK
#xinfo查看隊(duì)列信息
127.0.0.1:6379> XINFO stream mystream1
#隊(duì)列中消息的長度
1) "length"
2) (integer) 2
3) "radix-tree-keys"
4) (integer) 1
5) "radix-tree-nodes"
6) (integer) 2
#有幾個(gè)消費(fèi)組
7) "groups"
8) (integer) 2
9) "last-generated-id"
10) "3-0"
11) "first-entry" #第一個(gè)消息
12) 1) "2-0"
2) 1) "name"
2) "lisi"
3) "addr"
4) "hunan"
13) "last-entry" #最后一個(gè)消息
14) 1) "3-0"
2) 1) "name"
2) "wangwu"
3) "addr"
4) "fujian"
#查看消費(fèi)組信息
127.0.0.1:6379> XINFO GROUPS mystream1
1) 1) "name"
2) "ms1"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "0-0"
2) 1) "name"
2) "ms3"
3) "consumers"
4) (integer) 0
5) "pending"
6) (integer) 0
7) "last-delivered-id"
8) "3-0"
消費(fèi)消息
Redis Stream 通過
XREADGROUP命令使消費(fèi)組消費(fèi)信息,它和
XREAD命令一樣,都可以阻塞等待新消息。讀到新消息后,對(duì)應(yīng)的消息 ID 就會(huì)進(jìn)入消費(fèi)者的 PLE(正在處理的消息)結(jié)構(gòu)里,客戶端處理完畢后使用 XACK 命令通知 Redis 服務(wù)器,本條消息已經(jīng)處理完畢,該消息的 ID 就會(huì)從 PEL 中移除。示意圖如下:
圖2:Redis Stream 流程示意圖
XREADGROUP命令的語法格式如下所示:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
參數(shù)說明如下:
- group :消費(fèi)組名稱。
- consumer :消費(fèi)者名稱。
- count : 要讀取的數(shù)量。
- milliseconds : 阻塞時(shí)間,以毫秒為單位。
- key : 鍵指定的隊(duì)列名稱。
- ID : 表示消息 ID。
使用示例如下:
#消費(fèi)組中消費(fèi)者讀取消息,> 表示每當(dāng)消費(fèi)一個(gè)信息,消費(fèi)組游標(biāo)就前移一位
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 >
1) 1) "mystream1"
2) 1) 1) "2-0"
2) 1) "name"
2) "lisi"
3) "addr"
4) "hunan"
#再使用ms1-c1讀取一條消息
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 >
1) 1) "mystream1"
2) 1) 1) "3-0"
2) 1) "name"
2) "wangwu"
3) "addr"
4) "fujian"
#BLOCK 1000表示等待1秒,如果沒有任何消息到來,則返回nill,此時(shí)移動(dòng)到了末尾
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 BLOCK 1000 STREAMS mystream1 >
(nil)
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 1
1) 1) "mystream1"
2) 1) 1) "2-0"
2) 1) "name"
2) "lisi"
3) "addr"
4) "hunan"
#超出了消息id的范圍
127.0.0.1:6379> XREADGROUP GROUP ms1 c1 COUNT 1 STREAMS mystream1 3
1) 1) "mystream1"
2) (empty list or set)
#添加新的消息 ID為004
127.0.0.1:6379> XADD mystream1 004 name zhangwu age 24
#使用另外一個(gè)消費(fèi)組讀取消息
127.0.0.1:6379> XREADGROUP GROUP ms3 c2 COUNT 2 STREAMS mystream1 >
1) 1) "mystream1"
2) 1) 1) "4-0"
2) 1) "name"
2) "zhangwu"
3) "age"
4) "21"
#xack將id=002消息標(biāo)記為已經(jīng)處理
127.0.0.1:6379> XACK mystream1 ms1 002
(integer) 1
注意:
>表示每當(dāng)消費(fèi)者讀取一條消息時(shí),last_delivered_id 變量就會(huì)前移一位。
文章名稱:RedisStream消息隊(duì)列
網(wǎng)頁URL:http://www.fisionsoft.com.cn/article/djjeooe.html


咨詢
建站咨詢
