🌊 Redis Stream

探索 Redis 流数据结构的强大功能

Redis Stream 概述

Redis Stream 是 Redis 5.0 引入的新数据结构,专门用于处理流数据。它结合了消息队列和时间序列数据的特点,提供了强大的流处理能力,支持消费者组、消息确认、持久化等高级功能。

📝 添加消息

使用 XADD 命令向流中添加消息:

# 添加消息(自动生成ID) XADD mystream * name "张三" age 25 city "北京" # 指定消息ID XADD mystream 1609459200000-0 name "李四" age 30 city "上海" # 添加多个字段 XADD mystream * user_id 1001 action "login" timestamp 1609459200 ip "192.168.1.100" # 限制流的最大长度 XADD mystream MAXLEN 1000 * event "purchase" amount 99.99

消息ID格式:

  • 时间戳-序列号:如 1609459200000-0
  • *:自动生成ID
  • 毫秒时间戳:确保消息的时间顺序
  • 序列号:同一毫秒内的消息序号

📖 读取消息

使用 XREAD 命令读取流中的消息:

# 从头开始读取所有消息 XREAD STREAMS mystream 0 # 读取指定ID之后的消息 XREAD STREAMS mystream 1609459200000-0 # 阻塞读取新消息 XREAD BLOCK 0 STREAMS mystream $ # 设置阻塞超时时间(毫秒) XREAD BLOCK 5000 STREAMS mystream $ # 限制读取数量 XREAD COUNT 10 STREAMS mystream 0 # 同时读取多个流 XREAD STREAMS stream1 stream2 0 0

特殊ID说明:

  • 0:从流的开始读取
  • $:只读取新添加的消息
  • >:在消费者组中表示未被消费的消息

👥 消费者组

Redis Stream 支持消费者组模式,实现负载均衡和消息确认:

# 创建消费者组 XGROUP CREATE mystream mygroup 0 MKSTREAM # 从最新消息开始创建组 XGROUP CREATE mystream mygroup $ # 消费者读取消息 XREADGROUP GROUP mygroup consumer1 COUNT 1 STREAMS mystream > # 确认消息处理完成 XACK mystream mygroup 1609459200000-0 # 查看消费者组信息 XINFO GROUPS mystream # 查看消费者信息 XINFO CONSUMERS mystream mygroup # 查看待确认消息 XPENDING mystream mygroup

🔍 查询和管理

Stream 提供了丰富的查询和管理命令:

# 查看流信息 XINFO STREAM mystream # 获取流长度 XLEN mystream # 范围查询 XRANGE mystream - + XRANGE mystream 1609459200000 1609459300000 # 反向范围查询 XREVRANGE mystream + - COUNT 10 # 删除消息 XDEL mystream 1609459200000-0 # 修剪流(删除旧消息) XTRIM mystream MAXLEN 1000 XTRIM mystream MAXLEN ~ 1000 # 近似修剪,性能更好

⚠️ 异常处理

处理消费过程中的异常情况:

# 查看待确认消息详情 XPENDING mystream mygroup - + 10 consumer1 # 转移超时消息给其他消费者 XCLAIM mystream mygroup consumer2 60000 1609459200000-0 # 自动转移超时消息 XAUTOCLAIM mystream mygroup consumer2 60000 0 # 删除消费者 XGROUP DELCONSUMER mystream mygroup consumer1 # 删除消费者组 XGROUP DESTROY mystream mygroup
注意: 未确认的消息会一直保留在待确认列表中,需要定期处理超时消息。

🏗️ 实际应用场景

Redis Stream 在实际项目中的应用:

典型应用场景:

  • 事件溯源:记录系统中的所有事件
  • 消息队列:异步任务处理
  • 日志收集:实时日志流处理
  • 监控数据:时间序列数据存储
  • 用户行为追踪:记录用户操作轨迹
# 用户行为追踪示例 # 记录用户行为 XADD user_actions * user_id 1001 action "page_view" page "/home" timestamp 1609459200 XADD user_actions * user_id 1001 action "click" element "buy_button" timestamp 1609459210 XADD user_actions * user_id 1001 action "purchase" product_id 2001 amount 99.99 timestamp 1609459220 # 创建分析消费者组 XGROUP CREATE user_actions analytics_group 0 MKSTREAM # 分析消费者读取数据 XREADGROUP GROUP analytics_group analyzer1 COUNT 10 STREAMS user_actions > # 确认处理完成 XACK user_actions analytics_group 1609459200000-0 1609459210000-0

⚡ 性能优化

Redis Stream 性能优化建议:

优化策略:

  • 合理设置流的最大长度,避免内存过度使用
  • 使用近似修剪(~)提高性能
  • 批量读取消息,减少网络往返
  • 及时确认消息,避免待确认列表过大
  • 监控消费者组状态,处理异常消费者
# 性能优化示例 # 批量添加消息 XADD mystream * field1 value1 XADD mystream * field2 value2 XADD mystream * field3 value3 # 批量读取 XREADGROUP GROUP mygroup consumer1 COUNT 100 STREAMS mystream > # 近似修剪 XTRIM mystream MAXLEN ~ 10000 # 设置合理的阻塞时间 XREAD BLOCK 1000 STREAMS mystream $

🔧 Stream 命令总结

命令 功能 示例
XADD 添加消息 XADD stream * field value
XREAD 读取消息 XREAD STREAMS stream 0
XGROUP CREATE 创建消费者组 XGROUP CREATE stream group 0
XREADGROUP 消费者组读取 XREADGROUP GROUP group consumer STREAMS stream >
XACK 确认消息 XACK stream group id
XLEN 获取流长度 XLEN stream
XRANGE 范围查询 XRANGE stream - +
XTRIM 修剪流 XTRIM stream MAXLEN 1000
XPENDING 查看待确认消息 XPENDING stream group
XINFO 查看流信息 XINFO STREAM stream