Redis Stream
Redis Stream 技术文档
1、Redis Stream 简介
Redis Stream 是 Redis 5.0 引入的一种新的数据类型,它用于可持久化的实时数据流处理,提供了一套简单而强大的消息分发模型。类似于Kafka的消息队列,Redis Stream 也支持多者进行数据消费,可以适用于诸如日志处理、即时消息、事件源、实时分析等数据采集场景。
redis> XADD mystream * field1 value1 field2 value2
>> "36116035032231361-0"
redis> XADD mystream * field1 value3
>> "36116036694022528-0"
redis> XADD mystream * field2 value4
>> "36116037260014209-0"
redis> XRANGE mystream - +
1) 1) "36116035032231361-0"
2) 1) "field2"
2) "value2"
3) "field1"
4) "value1"
2) 1) "36116036694022528-0"
2) 1) "field1"
2) "value3"
3) 1) "36116037260014209-0"
2) 1) "field2"
2) "value4"
2、Redis Stream 结构
每个 Redis Stream 均由一个唯一的名称表示。Stream 是一个有序列表,它由一个或多个包含键值对的消息组成,每个消息从左到右在列表中进行排列。当 Stream 长度超过限制时,旧数据会根据策略自动删除。
消息可以使用任何可打印的字符串作为标识符,也可以自己使用 XADD 命令来设置ID,Redis 会通过时间戳和随机标识符自动创建唯一ID标识。每个消息都由零个或多个键值对组成,每个键和值都是一个字符串,一个消息可以创建多个字段,但所有字段应具有相同的数量,不同长度的消息会对消费端造成偏移量不一致的问题。
3、Redis Stream 命令
命令 | 描述 |
---|---|
XADD |
添加一条消息到流 |
XRANGE |
返回给定区间内的消息,可以限制消息数量以及逆序输出 |
XREVRANGE |
返回逆序排列的区间内的消息,基本与XRANGE相同 |
XLEN |
获取 Stream 长度 |
XREAD |
读取来自一个或多个 Stream 的新消息。 |
XACK |
消费者确认一条或多条消息 |
XDEL |
删除一条或多条数据 |
XGROUP |
创建和管理 Stream 的消费者组,由多个消费者共同消费消息。 |
4、使用 Redis Stream 构建简单消息系统
通过 Redis Stream ,我们可以快速构建一个消息系统,以便管理日志、分发任务等。下面是一个简单的Python demo,它将使用Redis Stream来发送和接收消息。
首先,我们需要安装Python的redis库(pip install redis)。接着,可以直接使用 Redis 的 STREAM 命令来构建消息系统,代码如下:
import redis
conn = redis.Redis()
def client_post_message(n):
for i in range(n):
message = input("Message:")
conn.execute_command("XADD", "mystream", "*", "message", message)
def client_receive_message():
while True:
last_id = "0-0" if last_id is None else last_id
response = conn.execute_command("XREAD", "COUNT", 1, "BLOCK", 0, "STREAMS", "mystream", last_id)
streams = response[0][1]
for stream in streams:
msg = stream[1][1].decode("utf-8")
print(f"Received message: {msg}")
last_id = stream[0]
if __name__ == '__main__':
import sys
last_id = None
if len(sys.argv) > 1:
client_post_message(int(sys.argv[1]))
else:
client_receive_message()
上面的 Python 代码中,将 Redis 连接到本地 Redis 实例来发送和接收数据。客户端按需选择post 消息或者recv 消息。接到的消息会被输出到控制台。
5、总结
Redis Stream 支持多消费者的实时消息分发,其内部实现为基于链表的消息队列,并且应在存储中尽可能长时间保留数据,从而支持最终一致性模型。Redis Stream 不仅适用于大规模数据处理,也可以用于小规模的消息通讯。即使在网络延迟的情况下,Redis Stream 仍能够保证消息的可靠传递,是一个性能优异、易于扩展的优秀实时数据处理工具。