📡 Redis 发布订阅

实现实时消息传递的通信机制

发布订阅概述

Redis 发布订阅(Pub/Sub)是一种消息通信模式,发送者(Publisher)发送消息,订阅者(Subscriber)接收消息。发送者和订阅者之间通过频道(Channel)进行解耦,发送者不需要知道有哪些订阅者,订阅者也不需要知道有哪些发送者。这种模式特别适合实时消息传递、事件通知和分布式系统间的通信。

发布订阅架构

📤 Publisher 1

消息发布者

📤 Publisher 2

消息发布者

📺 Channel

消息频道

📥 Subscriber 1

消息订阅者

📥 Subscriber 2

消息订阅者

特点:解耦通信、实时传递、多对多关系、无消息持久化

1. 订阅频道

客户端订阅感兴趣的频道

2. 发布消息

发布者向频道发送消息

3. 消息分发

Redis将消息推送给订阅者

4. 接收处理

订阅者接收并处理消息

🛠️ 基本命令

Redis 发布订阅的核心操作命令:

命令 语法 描述 返回值
SUBSCRIBE SUBSCRIBE channel [channel ...] 订阅一个或多个频道 订阅确认信息
UNSUBSCRIBE UNSUBSCRIBE [channel [channel ...]] 取消订阅频道 取消订阅确认
PUBLISH PUBLISH channel message 向频道发布消息 接收消息的订阅者数量
PSUBSCRIBE PSUBSCRIBE pattern [pattern ...] 订阅匹配模式的频道 订阅确认信息
PUNSUBSCRIBE PUNSUBSCRIBE [pattern [pattern ...]] 取消模式订阅 取消订阅确认
PUBSUB PUBSUB subcommand [argument [argument ...]] 查看发布订阅状态 状态信息

基本操作示例:

# 终端1:订阅者
# 订阅单个频道
SUBSCRIBE news
# 返回:
# 1) "subscribe"
# 2) "news"
# 3) (integer) 1

# 订阅多个频道
SUBSCRIBE news sports tech
# 返回订阅确认信息

# 终端2:发布者
# 发布消息到 news 频道
PUBLISH news "Breaking: Redis 7.0 released!"
# 返回:(integer) 1  # 表示有1个订阅者收到消息

# 发布消息到 sports 频道
PUBLISH sports "Football match tonight"
# 返回:(integer) 1

# 终端1会收到消息:
# 1) "message"
# 2) "news"
# 3) "Breaking: Redis 7.0 released!"

# 取消订阅
UNSUBSCRIBE news
# 返回:
# 1) "unsubscribe"
# 2) "news"
# 3) (integer) 2  # 剩余订阅频道数

模式订阅示例:

# 终端1:模式订阅
# 订阅所有以 "news:" 开头的频道
PSUBSCRIBE news:*
# 返回:
# 1) "psubscribe"
# 2) "news:*"
# 3) (integer) 1

# 订阅多个模式
PSUBSCRIBE news:* sports:* tech:*

# 终端2:发布消息
# 发布到具体频道
PUBLISH news:china "Local news update"
PUBLISH news:world "International news"
PUBLISH sports:football "Match result"
PUBLISH tech:ai "AI breakthrough"

# 终端1会收到匹配的消息:
# 1) "pmessage"
# 2) "news:*"        # 匹配的模式
# 3) "news:china"    # 实际频道
# 4) "Local news update"  # 消息内容

# 取消模式订阅
PUNSUBSCRIBE news:*
# 返回:
# 1) "punsubscribe"
# 2) "news:*"
# 3) (integer) 2  # 剩余模式订阅数

🔍 状态查询命令

查看发布订阅系统的状态信息:

命令 描述 示例
PUBSUB CHANNELS 列出当前活跃的频道 PUBSUB CHANNELS
PUBSUB CHANNELS pattern 列出匹配模式的活跃频道 PUBSUB CHANNELS news:*
PUBSUB NUMSUB 查看频道的订阅者数量 PUBSUB NUMSUB news sports
PUBSUB NUMPAT 查看模式订阅的总数 PUBSUB NUMPAT

状态查询示例:

# 查看所有活跃频道
PUBSUB CHANNELS
# 返回:
# 1) "news"
# 2) "sports"
# 3) "tech"

# 查看匹配模式的频道
PUBSUB CHANNELS news:*
# 返回:
# 1) "news:china"
# 2) "news:world"

# 查看特定频道的订阅者数量
PUBSUB NUMSUB news sports
# 返回:
# 1) "news"
# 2) (integer) 3    # news 频道有3个订阅者
# 3) "sports"
# 4) (integer) 1    # sports 频道有1个订阅者

# 查看模式订阅总数
PUBSUB NUMPAT
# 返回:(integer) 2  # 总共有2个模式订阅

🎯 应用场景

Redis 发布订阅在实际项目中的典型应用:

💬 实时聊天系统

# 用户加入聊天室
SUBSCRIBE chatroom:general
SUBSCRIBE chatroom:tech

# 用户发送消息
PUBLISH chatroom:general "Hello everyone!"
PUBLISH chatroom:tech "Anyone using Redis?"

# 私聊频道
SUBSCRIBE user:private:123
PUBLISH user:private:123 "Private message"

# 系统通知
PUBLISH system:notifications "Server maintenance in 10 minutes"

优势:实时性强,支持多房间,易于扩展

📊 实时数据推送

# 股票价格推送
SUBSCRIBE stock:AAPL
SUBSCRIBE stock:GOOGL
PUBLISH stock:AAPL "150.25"

# 系统监控数据
SUBSCRIBE monitor:cpu
SUBSCRIBE monitor:memory
PUBLISH monitor:cpu "85%"
PUBLISH monitor:memory "70%"

# 实时日志
SUBSCRIBE logs:error
SUBSCRIBE logs:warning
PUBLISH logs:error "Database connection failed"

优势:低延迟,高并发,实时监控

🔔 事件通知系统

# 用户行为事件
SUBSCRIBE events:user:login
SUBSCRIBE events:user:purchase
PUBLISH events:user:login "user123 logged in"
PUBLISH events:user:purchase "user456 bought item789"

# 系统事件
SUBSCRIBE events:system:*
PUBLISH events:system:backup "Backup completed"
PUBLISH events:system:deploy "New version deployed"

# 业务事件
SUBSCRIBE events:order:*
PUBLISH events:order:created "Order #12345 created"
PUBLISH events:order:shipped "Order #12345 shipped"

优势:事件驱动,松耦合,易于集成

🎮 游戏实时通信

# 游戏房间通信
SUBSCRIBE game:room:123
PUBLISH game:room:123 "Player joined"
PUBLISH game:room:123 "Game started"

# 玩家状态更新
SUBSCRIBE player:status:*
PUBLISH player:status:user123 "level up"
PUBLISH player:status:user456 "achievement unlocked"

# 全服公告
SUBSCRIBE server:announcements
PUBLISH server:announcements "Server maintenance tonight"

# 排行榜更新
SUBSCRIBE leaderboard:updates
PUBLISH leaderboard:updates "New #1 player: user789"

优势:实时互动,多人同步,状态广播

🏭 微服务通信

# 服务间事件通信
SUBSCRIBE service:user:events
SUBSCRIBE service:order:events
PUBLISH service:user:events "user_registered"
PUBLISH service:order:events "order_completed"

# 配置更新通知
SUBSCRIBE config:updates
PUBLISH config:updates "database_config_changed"

# 健康检查
SUBSCRIBE health:checks
PUBLISH health:checks "service_a:healthy"
PUBLISH health:checks "service_b:unhealthy"

# 缓存失效通知
SUBSCRIBE cache:invalidation
PUBLISH cache:invalidation "user:123:profile"

优势:服务解耦,事件驱动,分布式通信

📱 移动应用推送

# 用户个性化推送
SUBSCRIBE push:user:123
PUBLISH push:user:123 "You have a new message"

# 群组推送
SUBSCRIBE push:group:developers
PUBLISH push:group:developers "New API documentation available"

# 地理位置推送
SUBSCRIBE push:location:beijing
PUBLISH push:location:beijing "Weather alert: Heavy rain"

# 应用更新通知
SUBSCRIBE app:updates
PUBLISH app:updates "Version 2.0 available for download"

优势:个性化推送,分组管理,实时到达

💻 客户端实现

不同编程语言的发布订阅实现示例:

Java 实现(Jedis)

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

// 订阅者实现
class MySubscriber extends JedisPubSub {
    @Override
    public void onMessage(String channel, String message) {
        System.out.println("收到消息: " + channel + " -> " + message);
    }
    
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("订阅频道: " + channel);
    }
    
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println("取消订阅: " + channel);
    }
}

// 使用示例
public class PubSubExample {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("localhost", 6379);
        
        // 订阅者线程
        new Thread(() -> {
            Jedis subscriber = new Jedis("localhost", 6379);
            MySubscriber mySubscriber = new MySubscriber();
            subscriber.subscribe(mySubscriber, "news", "sports");
        }).start();
        
        // 发布者
        Thread.sleep(1000); // 等待订阅建立
        jedis.publish("news", "Breaking news!");
        jedis.publish("sports", "Match result");
        
        jedis.close();
    }
}

Python 实现(redis-py)

import redis
import threading
import time

# 订阅者函数
def subscriber():
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    pubsub = r.pubsub()
    
    # 订阅频道
    pubsub.subscribe('news', 'sports')
    
    # 监听消息
    for message in pubsub.listen():
        if message['type'] == 'message':
            print(f"收到消息: {message['channel']} -> {message['data']}")
        elif message['type'] == 'subscribe':
            print(f"订阅频道: {message['channel']}")

# 发布者函数
def publisher():
    r = redis.Redis(host='localhost', port=6379, decode_responses=True)
    
    time.sleep(1)  # 等待订阅建立
    
    # 发布消息
    r.publish('news', 'Breaking news!')
    r.publish('sports', 'Match result')
    
    print("消息已发布")

# 使用示例
if __name__ == '__main__':
    # 启动订阅者线程
    sub_thread = threading.Thread(target=subscriber)
    sub_thread.daemon = True
    sub_thread.start()
    
    # 启动发布者
    pub_thread = threading.Thread(target=publisher)
    pub_thread.start()
    
    # 等待
    pub_thread.join()
    time.sleep(2)

Node.js 实现(ioredis)

const Redis = require('ioredis');

// 创建 Redis 连接
const publisher = new Redis({
    host: 'localhost',
    port: 6379
});

const subscriber = new Redis({
    host: 'localhost',
    port: 6379
});

// 订阅者
subscriber.subscribe('news', 'sports');

subscriber.on('subscribe', (channel, count) => {
    console.log(`订阅频道: ${channel}, 总订阅数: ${count}`);
});

subscriber.on('message', (channel, message) => {
    console.log(`收到消息: ${channel} -> ${message}`);
});

// 发布者
setTimeout(() => {
    publisher.publish('news', 'Breaking news!');
    publisher.publish('sports', 'Match result');
    console.log('消息已发布');
}, 1000);

// 模式订阅示例
const patternSubscriber = new Redis();
patternSubscriber.psubscribe('news:*');

patternSubscriber.on('pmessage', (pattern, channel, message) => {
    console.log(`模式匹配: ${pattern} -> ${channel} -> ${message}`);
});

// 发布到模式匹配的频道
setTimeout(() => {
    publisher.publish('news:china', 'Local news');
    publisher.publish('news:world', 'International news');
}, 2000);

Go 实现(go-redis)

package main

import (
    "context"
    "fmt"
    "time"
    
    "github.com/go-redis/redis/v8"
)

func main() {
    ctx := context.Background()
    
    // 创建 Redis 客户端
    rdb := redis.NewClient(&redis.Options{
        Addr: "localhost:6379",
    })
    
    // 订阅者 goroutine
    go func() {
        pubsub := rdb.Subscribe(ctx, "news", "sports")
        defer pubsub.Close()
        
        // 监听消息
        ch := pubsub.Channel()
        for msg := range ch {
            fmt.Printf("收到消息: %s -> %s\n", msg.Channel, msg.Payload)
        }
    }()
    
    // 等待订阅建立
    time.Sleep(time.Second)
    
    // 发布消息
    err := rdb.Publish(ctx, "news", "Breaking news!").Err()
    if err != nil {
        panic(err)
    }
    
    err = rdb.Publish(ctx, "sports", "Match result").Err()
    if err != nil {
        panic(err)
    }
    
    fmt.Println("消息已发布")
    
    // 模式订阅示例
    go func() {
        pubsub := rdb.PSubscribe(ctx, "news:*")
        defer pubsub.Close()
        
        ch := pubsub.Channel()
        for msg := range ch {
            fmt.Printf("模式匹配: %s -> %s\n", msg.Channel, msg.Payload)
        }
    }()
    
    time.Sleep(time.Second)
    
    // 发布到模式匹配的频道
    rdb.Publish(ctx, "news:china", "Local news")
    rdb.Publish(ctx, "news:world", "International news")
    
    // 等待消息处理
    time.Sleep(2 * time.Second)
}

发布订阅优缺点

方面 优点 缺点
实时性 消息即时推送,延迟极低 网络问题可能导致消息丢失
解耦性 发布者和订阅者完全解耦 难以追踪消息流向
扩展性 支持多对多通信模式 大量订阅者时性能下降
持久化 内存占用小,性能高 消息不持久化,重启丢失
可靠性 简单易用,配置简单 无消息确认机制

⚠️ 注意事项和限制

使用 Redis 发布订阅时需要注意的问题:

🚨 消息丢失风险:

  • 无持久化:消息不会被持久化存储
  • 订阅者离线:离线期间的消息会丢失
  • 网络问题:网络中断可能导致消息丢失
  • 缓冲区满:客户端缓冲区满时会断开连接

⚠️ 性能考虑:

  • 内存使用:大量订阅者会增加内存消耗
  • CPU 开销:消息分发需要 CPU 资源
  • 网络带宽:大量消息会占用网络带宽
  • 连接数限制:每个订阅者需要一个连接

💡 最佳实践:

  • 合理设计频道:避免频道过多或过少
  • 控制消息大小:避免发送过大的消息
  • 监控订阅者:定期检查订阅者状态
  • 错误处理:实现重连和错误恢复机制
  • 消息格式:使用结构化的消息格式(JSON等)
  • 频道命名:使用有意义的频道命名规范

生产环境配置建议:

# Redis 配置优化
# 客户端输出缓冲区限制
client-output-buffer-limit pubsub 32mb 8mb 60

# 最大客户端连接数
maxclients 10000

# 超时设置
timeout 300

# 监控脚本示例
#!/bin/bash
# 监控发布订阅状态

echo "=== Redis 发布订阅监控 ==="

# 检查活跃频道
echo "活跃频道数量:"
redis-cli PUBSUB CHANNELS | wc -l

# 检查模式订阅数量
echo "模式订阅数量:"
redis-cli PUBSUB NUMPAT

# 检查特定频道订阅者数量
echo "重要频道订阅者数量:"
redis-cli PUBSUB NUMSUB news sports tech

# 检查客户端连接数
echo "客户端连接数:"
redis-cli INFO clients | grep connected_clients

# 检查内存使用
echo "内存使用情况:"
redis-cli INFO memory | grep used_memory_human

✅ 适用场景总结:

  • 实时通信:聊天系统、实时评论、在线游戏
  • 事件通知:系统监控、业务事件、状态变更
  • 数据推送:股票价格、传感器数据、日志流
  • 微服务通信:服务间事件、配置更新、健康检查
  • 缓存失效:分布式缓存同步、数据一致性