📜 Redis 脚本

使用 Lua 脚本实现复杂的原子性操作

Redis 脚本概述

Redis 从 2.6 版本开始支持 Lua 脚本,允许用户在 Redis 服务器端执行复杂的逻辑操作。Lua 脚本在 Redis 中具有原子性,整个脚本作为一个整体执行,不会被其他命令打断。这使得 Lua 脚本成为实现复杂业务逻辑、保证数据一致性的强大工具。

Redis Lua 脚本特性

⚛️

原子性执行

整个脚本作为一个原子操作执行,不会被其他命令中断

🚀

高性能

在服务器端执行,减少网络往返,提高执行效率

🔒

数据一致性

保证复杂操作的数据一致性,避免竞态条件

💾

脚本缓存

脚本会被缓存,重复执行时只需传递 SHA1 值

🔧

灵活性

支持复杂的条件判断和循环逻辑

🌐

跨平台

Lua 语言简单易学,跨平台兼容性好

1. 编写脚本

使用 Lua 语言编写业务逻辑

2. 执行脚本

使用 EVAL 命令执行脚本

3. 脚本缓存

Redis 自动缓存脚本

4. 重复执行

使用 EVALSHA 快速执行

🛠️ 脚本基本命令

Redis 脚本的核心命令及其使用方法:

命令 语法 描述 返回值
EVAL EVAL script numkeys key [key ...] arg [arg ...] 执行 Lua 脚本 脚本返回值
EVALSHA EVALSHA sha1 numkeys key [key ...] arg [arg ...] 通过 SHA1 值执行缓存的脚本 脚本返回值
SCRIPT LOAD SCRIPT LOAD script 加载脚本到缓存,返回 SHA1 脚本的 SHA1 值
SCRIPT EXISTS SCRIPT EXISTS sha1 [sha1 ...] 检查脚本是否存在于缓存中 存在状态数组
SCRIPT FLUSH SCRIPT FLUSH 清空脚本缓存 OK
SCRIPT KILL SCRIPT KILL 终止正在执行的脚本 OK

基本脚本示例:

# 简单的 Hello World 脚本
EVAL "return 'Hello, Redis Lua!'" 0
# 返回:"Hello, Redis Lua!"

# 获取键值的脚本
EVAL "return redis.call('GET', KEYS[1])" 1 mykey
# 返回:mykey 的值

# 设置键值的脚本
EVAL "return redis.call('SET', KEYS[1], ARGV[1])" 1 mykey "Hello World"
# 返回:OK

# 条件设置脚本
local script = [[
    local key = KEYS[1]
    local value = ARGV[1]
    local current = redis.call('GET', key)
    if current == false then
        redis.call('SET', key, value)
        return 1
    else
        return 0
    end
]]

EVAL script 1 "newkey" "newvalue"
# 返回:1 (设置成功) 或 0 (键已存在)

脚本缓存示例:

# 加载脚本到缓存
SCRIPT LOAD "return redis.call('GET', KEYS[1])"
# 返回:"4e6d8fc8bb01276962cce5371fa795a7763657ae"

# 检查脚本是否存在
SCRIPT EXISTS "4e6d8fc8bb01276962cce5371fa795a7763657ae"
# 返回:1) (integer) 1

# 使用 SHA1 执行脚本
EVALSHA "4e6d8fc8bb01276962cce5371fa795a7763657ae" 1 mykey
# 返回:mykey 的值

# 清空脚本缓存
SCRIPT FLUSH
# 返回:OK

# 再次检查脚本
SCRIPT EXISTS "4e6d8fc8bb01276962cce5371fa795a7763657ae"
# 返回:1) (integer) 0

📚 Lua 语法基础

在 Redis 中使用的 Lua 语法要点:

变量和数据类型:

-- 变量声明 local name = "Redis" local count = 100 local flag = true local arr = {1, 2, 3, 4, 5} local obj = {name="Redis", version="7.0"} -- 字符串操作 local str1 = "Hello" local str2 = "World" local result = str1 .. " " .. str2 -- 字符串连接 -- 数值操作 local a = 10 local b = 20 local sum = a + b local product = a * b

条件判断和循环:

-- 条件判断 if count > 50 then return "High" elseif count > 20 then return "Medium" else return "Low" end -- for 循环 for i = 1, 10 do redis.call('INCR', 'counter') end -- while 循环 local i = 1 while i <= 5 do redis.call('LPUSH', 'list', i) i = i + 1 end -- 遍历数组 for index, value in ipairs(arr) do redis.call('SET', 'key' .. index, value) end

Redis 命令调用:

-- redis.call() - 严格模式,出错时停止执行 local value = redis.call('GET', 'mykey') if value == false then redis.call('SET', 'mykey', 'default') end -- redis.pcall() - 保护模式,出错时返回错误信息 local result = redis.pcall('INCR', 'string_key') if type(result) == 'table' and result.err then return "Error: " .. result.err end -- 批量操作 redis.call('MULTI') redis.call('SET', 'key1', 'value1') redis.call('SET', 'key2', 'value2') redis.call('EXEC')

KEYS 和 ARGV 参数:

-- KEYS[1], KEYS[2], ... 访问键参数 -- ARGV[1], ARGV[2], ... 访问值参数 local key1 = KEYS[1] -- 第一个键 local key2 = KEYS[2] -- 第二个键 local value1 = ARGV[1] -- 第一个参数 local value2 = ARGV[2] -- 第二个参数 -- 获取参数数量 local key_count = #KEYS local arg_count = #ARGV -- 遍历所有键 for i = 1, #KEYS do redis.call('DEL', KEYS[i]) end

🎯 实际应用案例

Redis Lua 脚本在实际项目中的典型应用:

🔒 分布式锁

-- 获取锁脚本
local lock_script = [[
    local key = KEYS[1]
    local identifier = ARGV[1]
    local expire = ARGV[2]
    
    local result = redis.call('SET', key, identifier, 'EX', expire, 'NX')
    if result then
        return 1
    else
        return 0
    end
]]

-- 释放锁脚本
local unlock_script = [[
    local key = KEYS[1]
    local identifier = ARGV[1]
    
    local current = redis.call('GET', key)
    if current == identifier then
        return redis.call('DEL', key)
    else
        return 0
    end
]]

-- 使用示例
EVAL lock_script 1 "lock:resource" "client123" 30
EVAL unlock_script 1 "lock:resource" "client123"

优势:原子性操作,避免锁的误删除

📊 限流器

-- 滑动窗口限流脚本
local rate_limit_script = [[
    local key = KEYS[1]
    local window = tonumber(ARGV[1])  -- 时间窗口(秒)
    local limit = tonumber(ARGV[2])   -- 限制次数
    local current_time = tonumber(ARGV[3])
    
    -- 清理过期记录
    redis.call('ZREMRANGEBYSCORE', key, 0, current_time - window)
    
    -- 获取当前窗口内的请求数
    local current_requests = redis.call('ZCARD', key)
    
    if current_requests < limit then
        -- 添加当前请求
        redis.call('ZADD', key, current_time, current_time)
        redis.call('EXPIRE', key, window)
        return {1, limit - current_requests - 1}
    else
        return {0, 0}
    end
]]

-- 使用示例
local current_time = os.time()
EVAL rate_limit_script 1 "rate_limit:user123" 60 10 current_time

优势:精确的滑动窗口限流,高性能

💰 库存扣减

-- 原子性库存扣减脚本
local stock_deduct_script = [[
    local stock_key = KEYS[1]
    local order_key = KEYS[2]
    local quantity = tonumber(ARGV[1])
    local order_id = ARGV[2]
    
    -- 获取当前库存
    local current_stock = tonumber(redis.call('GET', stock_key) or 0)
    
    -- 检查库存是否足够
    if current_stock >= quantity then
        -- 扣减库存
        redis.call('DECRBY', stock_key, quantity)
        -- 记录订单
        redis.call('SADD', order_key, order_id)
        -- 返回成功和剩余库存
        return {1, current_stock - quantity}
    else
        -- 返回失败和当前库存
        return {0, current_stock}
    end
]]

-- 使用示例
EVAL stock_deduct_script 2 "stock:product123" "orders:product123" 5 "order456"

优势:防止超卖,保证库存准确性

🎲 抽奖系统

-- 加权随机抽奖脚本
local lottery_script = [[
    local user_key = KEYS[1]
    local prizes_key = KEYS[2]
    local user_id = ARGV[1]
    local max_draws = tonumber(ARGV[2])
    
    -- 检查用户抽奖次数
    local user_draws = tonumber(redis.call('GET', user_key) or 0)
    if user_draws >= max_draws then
        return {0, "已达到最大抽奖次数"}
    end
    
    -- 获取奖品配置
    local prizes = redis.call('HGETALL', prizes_key)
    local total_weight = 0
    local prize_list = {}
    
    -- 计算总权重
    for i = 1, #prizes, 2 do
        local prize_name = prizes[i]
        local weight = tonumber(prizes[i + 1])
        total_weight = total_weight + weight
        table.insert(prize_list, {prize_name, weight})
    end
    
    -- 生成随机数
    math.randomseed(os.time() + user_draws)
    local random_num = math.random(1, total_weight)
    
    -- 确定中奖奖品
    local current_weight = 0
    for _, prize in ipairs(prize_list) do
        current_weight = current_weight + prize[2]
        if random_num <= current_weight then
            -- 增加用户抽奖次数
            redis.call('INCR', user_key)
            redis.call('EXPIRE', user_key, 86400)  -- 24小时过期
            return {1, prize[1]}
        end
    end
    
    return {0, "抽奖失败"}
]]

-- 使用示例
EVAL lottery_script 2 "draws:user123" "prizes:config" "user123" 3

优势:公平抽奖,防止作弊,支持加权随机

📈 排行榜更新

-- 排行榜更新脚本
local leaderboard_script = [[
    local board_key = KEYS[1]
    local user_key = KEYS[2]
    local user_id = ARGV[1]
    local score = tonumber(ARGV[2])
    local max_size = tonumber(ARGV[3])
    
    -- 获取用户当前分数
    local current_score = redis.call('ZSCORE', board_key, user_id)
    
    -- 只有分数更高时才更新
    if not current_score or score > tonumber(current_score) then
        -- 更新排行榜
        redis.call('ZADD', board_key, score, user_id)
        
        -- 限制排行榜大小
        local board_size = redis.call('ZCARD', board_key)
        if board_size > max_size then
            redis.call('ZREMRANGEBYRANK', board_key, 0, board_size - max_size - 1)
        end
        
        -- 更新用户最高分
        redis.call('SET', user_key, score)
        
        -- 获取用户排名
        local rank = redis.call('ZREVRANK', board_key, user_id)
        return {1, rank + 1, score}
    else
        local rank = redis.call('ZREVRANK', board_key, user_id)
        return {0, rank and (rank + 1) or -1, current_score}
    end
]]

-- 使用示例
EVAL leaderboard_script 2 "leaderboard:game" "user:123:best" "user123" 9500 100

优势:高效的排行榜维护,支持分数比较

🔄 缓存更新

-- 缓存更新脚本
local cache_update_script = [[
    local cache_key = KEYS[1]
    local lock_key = KEYS[2]
    local data = ARGV[1]
    local ttl = tonumber(ARGV[2])
    local lock_ttl = tonumber(ARGV[3])
    
    -- 尝试获取更新锁
    local lock_result = redis.call('SET', lock_key, '1', 'EX', lock_ttl, 'NX')
    
    if lock_result then
        -- 获取锁成功,更新缓存
        redis.call('SET', cache_key, data, 'EX', ttl)
        redis.call('DEL', lock_key)
        return {1, "缓存更新成功"}
    else
        -- 获取锁失败,检查缓存是否存在
        local cache_exists = redis.call('EXISTS', cache_key)
        if cache_exists == 1 then
            return {0, "缓存已存在,跳过更新"}
        else
            return {-1, "等待其他进程更新缓存"}
        end
    end
]]

-- 使用示例
EVAL cache_update_script 2 "cache:user:123" "lock:cache:user:123" "user_data" 3600 10

优势:防止缓存击穿,避免重复更新

脚本性能和最佳实践

方面 优点 缺点 最佳实践
原子性 完全原子性执行 长脚本会阻塞服务器 控制脚本执行时间
性能 减少网络往返 复杂逻辑影响性能 优化算法复杂度
缓存 脚本自动缓存 内存占用增加 定期清理无用脚本
调试 支持错误返回 调试相对困难 充分测试和日志记录

⚠️ 注意事项:

  • 执行时间:避免编写执行时间过长的脚本
  • 内存使用:注意脚本中的内存分配,避免内存泄漏
  • 随机性:脚本中的随机数生成需要注意种子设置
  • 错误处理:使用 pcall 处理可能出错的操作
  • 键的访问:只能访问通过 KEYS 参数传递的键

✅ 最佳实践建议:

  • 脚本设计:保持脚本简洁,单一职责
  • 参数验证:在脚本开始时验证输入参数
  • 错误处理:合理使用 redis.call 和 redis.pcall
  • 性能优化:避免不必要的循环和复杂计算
  • 版本管理:为脚本建立版本管理机制
  • 测试覆盖:编写充分的测试用例
  • 监控告警:监控脚本执行时间和错误率

💡 使用建议:

  • 适用场景:复杂的原子性操作、条件判断、批量处理
  • 不适用场景:简单的单命令操作、长时间运行的任务
  • 替代方案:简单操作使用事务,复杂业务逻辑考虑应用层处理
  • 调试技巧:使用 redis.log 输出调试信息