发布订阅概述
Redis 发布订阅(Pub/Sub)是一种消息通信模式,发送者(Publisher)发送消息,订阅者(Subscriber)接收消息。发送者和订阅者之间通过频道(Channel)进行解耦,发送者不需要知道有哪些订阅者,订阅者也不需要知道有哪些发送者。这种模式特别适合实时消息传递、事件通知和分布式系统间的通信。
发布订阅架构
📤 Publisher 1
消息发布者
📤 Publisher 2
消息发布者
→
📺 Channel
消息频道
→
📥 Subscriber 1
消息订阅者
📥 Subscriber 2
消息订阅者
特点:解耦通信、实时传递、多对多关系、无消息持久化
🛠️ 基本命令
Redis 发布订阅的核心操作命令:
命令 | 语法 | 描述 | 返回值 |
---|---|---|---|
SUBSCRIBE |
SUBSCRIBE channel [channel ...] |
订阅一个或多个频道 | 订阅确认信息 |
UNSUBSCRIBE |
UNSUBSCRIBE [channel [channel ...]] |
取消订阅频道 | 取消订阅确认 |
PUBLISH |
PUBLISH channel message |
向频道发布消息 | 接收消息的订阅者数量 |
PSUBSCRIBE |
PSUBSCRIBE pattern [pattern ...] |
订阅匹配模式的频道 | 订阅确认信息 |
PUNSUBSCRIBE |
PUNSUBSCRIBE [pattern [pattern ...]] |
取消模式订阅 | 取消订阅确认 |
PUBSUB |
PUBSUB subcommand [argument [argument ...]] |
查看发布订阅状态 | 状态信息 |
基本操作示例:
# 终端1:订阅者 # 订阅单个频道 SUBSCRIBE news # 返回: # 1) "subscribe" # 2) "news" # 3) (integer) 1 # 订阅多个频道 SUBSCRIBE news sports tech # 返回订阅确认信息 # 终端2:发布者 # 发布消息到 news 频道 PUBLISH news "Breaking: Redis 7.0 released!" # 返回:(integer) 1 # 表示有1个订阅者收到消息 # 发布消息到 sports 频道 PUBLISH sports "Football match tonight" # 返回:(integer) 1 # 终端1会收到消息: # 1) "message" # 2) "news" # 3) "Breaking: Redis 7.0 released!" # 取消订阅 UNSUBSCRIBE news # 返回: # 1) "unsubscribe" # 2) "news" # 3) (integer) 2 # 剩余订阅频道数
模式订阅示例:
# 终端1:模式订阅 # 订阅所有以 "news:" 开头的频道 PSUBSCRIBE news:* # 返回: # 1) "psubscribe" # 2) "news:*" # 3) (integer) 1 # 订阅多个模式 PSUBSCRIBE news:* sports:* tech:* # 终端2:发布消息 # 发布到具体频道 PUBLISH news:china "Local news update" PUBLISH news:world "International news" PUBLISH sports:football "Match result" PUBLISH tech:ai "AI breakthrough" # 终端1会收到匹配的消息: # 1) "pmessage" # 2) "news:*" # 匹配的模式 # 3) "news:china" # 实际频道 # 4) "Local news update" # 消息内容 # 取消模式订阅 PUNSUBSCRIBE news:* # 返回: # 1) "punsubscribe" # 2) "news:*" # 3) (integer) 2 # 剩余模式订阅数
🔍 状态查询命令
查看发布订阅系统的状态信息:
命令 | 描述 | 示例 |
---|---|---|
PUBSUB CHANNELS |
列出当前活跃的频道 | PUBSUB CHANNELS |
PUBSUB CHANNELS pattern |
列出匹配模式的活跃频道 | PUBSUB CHANNELS news:* |
PUBSUB NUMSUB |
查看频道的订阅者数量 | PUBSUB NUMSUB news sports |
PUBSUB NUMPAT |
查看模式订阅的总数 | PUBSUB NUMPAT |
状态查询示例:
# 查看所有活跃频道 PUBSUB CHANNELS # 返回: # 1) "news" # 2) "sports" # 3) "tech" # 查看匹配模式的频道 PUBSUB CHANNELS news:* # 返回: # 1) "news:china" # 2) "news:world" # 查看特定频道的订阅者数量 PUBSUB NUMSUB news sports # 返回: # 1) "news" # 2) (integer) 3 # news 频道有3个订阅者 # 3) "sports" # 4) (integer) 1 # sports 频道有1个订阅者 # 查看模式订阅总数 PUBSUB NUMPAT # 返回:(integer) 2 # 总共有2个模式订阅
🎯 应用场景
Redis 发布订阅在实际项目中的典型应用:
💬 实时聊天系统
# 用户加入聊天室 SUBSCRIBE chatroom:general SUBSCRIBE chatroom:tech # 用户发送消息 PUBLISH chatroom:general "Hello everyone!" PUBLISH chatroom:tech "Anyone using Redis?" # 私聊频道 SUBSCRIBE user:private:123 PUBLISH user:private:123 "Private message" # 系统通知 PUBLISH system:notifications "Server maintenance in 10 minutes"
优势:实时性强,支持多房间,易于扩展
📊 实时数据推送
# 股票价格推送 SUBSCRIBE stock:AAPL SUBSCRIBE stock:GOOGL PUBLISH stock:AAPL "150.25" # 系统监控数据 SUBSCRIBE monitor:cpu SUBSCRIBE monitor:memory PUBLISH monitor:cpu "85%" PUBLISH monitor:memory "70%" # 实时日志 SUBSCRIBE logs:error SUBSCRIBE logs:warning PUBLISH logs:error "Database connection failed"
优势:低延迟,高并发,实时监控
🔔 事件通知系统
# 用户行为事件 SUBSCRIBE events:user:login SUBSCRIBE events:user:purchase PUBLISH events:user:login "user123 logged in" PUBLISH events:user:purchase "user456 bought item789" # 系统事件 SUBSCRIBE events:system:* PUBLISH events:system:backup "Backup completed" PUBLISH events:system:deploy "New version deployed" # 业务事件 SUBSCRIBE events:order:* PUBLISH events:order:created "Order #12345 created" PUBLISH events:order:shipped "Order #12345 shipped"
优势:事件驱动,松耦合,易于集成
🎮 游戏实时通信
# 游戏房间通信 SUBSCRIBE game:room:123 PUBLISH game:room:123 "Player joined" PUBLISH game:room:123 "Game started" # 玩家状态更新 SUBSCRIBE player:status:* PUBLISH player:status:user123 "level up" PUBLISH player:status:user456 "achievement unlocked" # 全服公告 SUBSCRIBE server:announcements PUBLISH server:announcements "Server maintenance tonight" # 排行榜更新 SUBSCRIBE leaderboard:updates PUBLISH leaderboard:updates "New #1 player: user789"
优势:实时互动,多人同步,状态广播
🏭 微服务通信
# 服务间事件通信 SUBSCRIBE service:user:events SUBSCRIBE service:order:events PUBLISH service:user:events "user_registered" PUBLISH service:order:events "order_completed" # 配置更新通知 SUBSCRIBE config:updates PUBLISH config:updates "database_config_changed" # 健康检查 SUBSCRIBE health:checks PUBLISH health:checks "service_a:healthy" PUBLISH health:checks "service_b:unhealthy" # 缓存失效通知 SUBSCRIBE cache:invalidation PUBLISH cache:invalidation "user:123:profile"
优势:服务解耦,事件驱动,分布式通信
📱 移动应用推送
# 用户个性化推送 SUBSCRIBE push:user:123 PUBLISH push:user:123 "You have a new message" # 群组推送 SUBSCRIBE push:group:developers PUBLISH push:group:developers "New API documentation available" # 地理位置推送 SUBSCRIBE push:location:beijing PUBLISH push:location:beijing "Weather alert: Heavy rain" # 应用更新通知 SUBSCRIBE app:updates PUBLISH app:updates "Version 2.0 available for download"
优势:个性化推送,分组管理,实时到达
💻 客户端实现
不同编程语言的发布订阅实现示例:
Java 实现(Jedis)
import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPubSub; // 订阅者实现 class MySubscriber extends JedisPubSub { @Override public void onMessage(String channel, String message) { System.out.println("收到消息: " + channel + " -> " + message); } @Override public void onSubscribe(String channel, int subscribedChannels) { System.out.println("订阅频道: " + channel); } @Override public void onUnsubscribe(String channel, int subscribedChannels) { System.out.println("取消订阅: " + channel); } } // 使用示例 public class PubSubExample { public static void main(String[] args) { Jedis jedis = new Jedis("localhost", 6379); // 订阅者线程 new Thread(() -> { Jedis subscriber = new Jedis("localhost", 6379); MySubscriber mySubscriber = new MySubscriber(); subscriber.subscribe(mySubscriber, "news", "sports"); }).start(); // 发布者 Thread.sleep(1000); // 等待订阅建立 jedis.publish("news", "Breaking news!"); jedis.publish("sports", "Match result"); jedis.close(); } }
Python 实现(redis-py)
import redis import threading import time # 订阅者函数 def subscriber(): r = redis.Redis(host='localhost', port=6379, decode_responses=True) pubsub = r.pubsub() # 订阅频道 pubsub.subscribe('news', 'sports') # 监听消息 for message in pubsub.listen(): if message['type'] == 'message': print(f"收到消息: {message['channel']} -> {message['data']}") elif message['type'] == 'subscribe': print(f"订阅频道: {message['channel']}") # 发布者函数 def publisher(): r = redis.Redis(host='localhost', port=6379, decode_responses=True) time.sleep(1) # 等待订阅建立 # 发布消息 r.publish('news', 'Breaking news!') r.publish('sports', 'Match result') print("消息已发布") # 使用示例 if __name__ == '__main__': # 启动订阅者线程 sub_thread = threading.Thread(target=subscriber) sub_thread.daemon = True sub_thread.start() # 启动发布者 pub_thread = threading.Thread(target=publisher) pub_thread.start() # 等待 pub_thread.join() time.sleep(2)
Node.js 实现(ioredis)
const Redis = require('ioredis'); // 创建 Redis 连接 const publisher = new Redis({ host: 'localhost', port: 6379 }); const subscriber = new Redis({ host: 'localhost', port: 6379 }); // 订阅者 subscriber.subscribe('news', 'sports'); subscriber.on('subscribe', (channel, count) => { console.log(`订阅频道: ${channel}, 总订阅数: ${count}`); }); subscriber.on('message', (channel, message) => { console.log(`收到消息: ${channel} -> ${message}`); }); // 发布者 setTimeout(() => { publisher.publish('news', 'Breaking news!'); publisher.publish('sports', 'Match result'); console.log('消息已发布'); }, 1000); // 模式订阅示例 const patternSubscriber = new Redis(); patternSubscriber.psubscribe('news:*'); patternSubscriber.on('pmessage', (pattern, channel, message) => { console.log(`模式匹配: ${pattern} -> ${channel} -> ${message}`); }); // 发布到模式匹配的频道 setTimeout(() => { publisher.publish('news:china', 'Local news'); publisher.publish('news:world', 'International news'); }, 2000);
Go 实现(go-redis)
package main import ( "context" "fmt" "time" "github.com/go-redis/redis/v8" ) func main() { ctx := context.Background() // 创建 Redis 客户端 rdb := redis.NewClient(&redis.Options{ Addr: "localhost:6379", }) // 订阅者 goroutine go func() { pubsub := rdb.Subscribe(ctx, "news", "sports") defer pubsub.Close() // 监听消息 ch := pubsub.Channel() for msg := range ch { fmt.Printf("收到消息: %s -> %s\n", msg.Channel, msg.Payload) } }() // 等待订阅建立 time.Sleep(time.Second) // 发布消息 err := rdb.Publish(ctx, "news", "Breaking news!").Err() if err != nil { panic(err) } err = rdb.Publish(ctx, "sports", "Match result").Err() if err != nil { panic(err) } fmt.Println("消息已发布") // 模式订阅示例 go func() { pubsub := rdb.PSubscribe(ctx, "news:*") defer pubsub.Close() ch := pubsub.Channel() for msg := range ch { fmt.Printf("模式匹配: %s -> %s\n", msg.Channel, msg.Payload) } }() time.Sleep(time.Second) // 发布到模式匹配的频道 rdb.Publish(ctx, "news:china", "Local news") rdb.Publish(ctx, "news:world", "International news") // 等待消息处理 time.Sleep(2 * time.Second) }
发布订阅优缺点
方面 | 优点 | 缺点 |
---|---|---|
实时性 | 消息即时推送,延迟极低 | 网络问题可能导致消息丢失 |
解耦性 | 发布者和订阅者完全解耦 | 难以追踪消息流向 |
扩展性 | 支持多对多通信模式 | 大量订阅者时性能下降 |
持久化 | 内存占用小,性能高 | 消息不持久化,重启丢失 |
可靠性 | 简单易用,配置简单 | 无消息确认机制 |
⚠️ 注意事项和限制
使用 Redis 发布订阅时需要注意的问题:
🚨 消息丢失风险:
- 无持久化:消息不会被持久化存储
- 订阅者离线:离线期间的消息会丢失
- 网络问题:网络中断可能导致消息丢失
- 缓冲区满:客户端缓冲区满时会断开连接
⚠️ 性能考虑:
- 内存使用:大量订阅者会增加内存消耗
- CPU 开销:消息分发需要 CPU 资源
- 网络带宽:大量消息会占用网络带宽
- 连接数限制:每个订阅者需要一个连接
💡 最佳实践:
- 合理设计频道:避免频道过多或过少
- 控制消息大小:避免发送过大的消息
- 监控订阅者:定期检查订阅者状态
- 错误处理:实现重连和错误恢复机制
- 消息格式:使用结构化的消息格式(JSON等)
- 频道命名:使用有意义的频道命名规范
生产环境配置建议:
# Redis 配置优化 # 客户端输出缓冲区限制 client-output-buffer-limit pubsub 32mb 8mb 60 # 最大客户端连接数 maxclients 10000 # 超时设置 timeout 300 # 监控脚本示例 #!/bin/bash # 监控发布订阅状态 echo "=== Redis 发布订阅监控 ===" # 检查活跃频道 echo "活跃频道数量:" redis-cli PUBSUB CHANNELS | wc -l # 检查模式订阅数量 echo "模式订阅数量:" redis-cli PUBSUB NUMPAT # 检查特定频道订阅者数量 echo "重要频道订阅者数量:" redis-cli PUBSUB NUMSUB news sports tech # 检查客户端连接数 echo "客户端连接数:" redis-cli INFO clients | grep connected_clients # 检查内存使用 echo "内存使用情况:" redis-cli INFO memory | grep used_memory_human
✅ 适用场景总结:
- 实时通信:聊天系统、实时评论、在线游戏
- 事件通知:系统监控、业务事件、状态变更
- 数据推送:股票价格、传感器数据、日志流
- 微服务通信:服务间事件、配置更新、健康检查
- 缓存失效:分布式缓存同步、数据一致性