Redis 脚本概述
Redis 从 2.6 版本开始支持 Lua 脚本,允许用户在 Redis 服务器端执行复杂的逻辑操作。Lua 脚本在 Redis 中具有原子性,整个脚本作为一个整体执行,不会被其他命令打断。这使得 Lua 脚本成为实现复杂业务逻辑、保证数据一致性的强大工具。
Redis Lua 脚本特性
⚛️
原子性执行
整个脚本作为一个原子操作执行,不会被其他命令中断
🚀
高性能
在服务器端执行,减少网络往返,提高执行效率
🔒
数据一致性
保证复杂操作的数据一致性,避免竞态条件
💾
脚本缓存
脚本会被缓存,重复执行时只需传递 SHA1 值
🔧
灵活性
支持复杂的条件判断和循环逻辑
🌐
跨平台
Lua 语言简单易学,跨平台兼容性好
1. 编写脚本
使用 Lua 语言编写业务逻辑
→
2. 执行脚本
使用 EVAL 命令执行脚本
→
3. 脚本缓存
Redis 自动缓存脚本
→
4. 重复执行
使用 EVALSHA 快速执行
🛠️ 脚本基本命令
Redis 脚本的核心命令及其使用方法:
命令 | 语法 | 描述 | 返回值 |
---|---|---|---|
EVAL |
EVAL script numkeys key [key ...] arg [arg ...] |
执行 Lua 脚本 | 脚本返回值 |
EVALSHA |
EVALSHA sha1 numkeys key [key ...] arg [arg ...] |
通过 SHA1 值执行缓存的脚本 | 脚本返回值 |
SCRIPT LOAD |
SCRIPT LOAD script |
加载脚本到缓存,返回 SHA1 | 脚本的 SHA1 值 |
SCRIPT EXISTS |
SCRIPT EXISTS sha1 [sha1 ...] |
检查脚本是否存在于缓存中 | 存在状态数组 |
SCRIPT FLUSH |
SCRIPT FLUSH |
清空脚本缓存 | OK |
SCRIPT KILL |
SCRIPT KILL |
终止正在执行的脚本 | OK |
基本脚本示例:
# 简单的 Hello World 脚本 EVAL "return 'Hello, Redis Lua!'" 0 # 返回:"Hello, Redis Lua!" # 获取键值的脚本 EVAL "return redis.call('GET', KEYS[1])" 1 mykey # 返回:mykey 的值 # 设置键值的脚本 EVAL "return redis.call('SET', KEYS[1], ARGV[1])" 1 mykey "Hello World" # 返回:OK # 条件设置脚本 local script = [[ local key = KEYS[1] local value = ARGV[1] local current = redis.call('GET', key) if current == false then redis.call('SET', key, value) return 1 else return 0 end ]] EVAL script 1 "newkey" "newvalue" # 返回:1 (设置成功) 或 0 (键已存在)
脚本缓存示例:
# 加载脚本到缓存 SCRIPT LOAD "return redis.call('GET', KEYS[1])" # 返回:"4e6d8fc8bb01276962cce5371fa795a7763657ae" # 检查脚本是否存在 SCRIPT EXISTS "4e6d8fc8bb01276962cce5371fa795a7763657ae" # 返回:1) (integer) 1 # 使用 SHA1 执行脚本 EVALSHA "4e6d8fc8bb01276962cce5371fa795a7763657ae" 1 mykey # 返回:mykey 的值 # 清空脚本缓存 SCRIPT FLUSH # 返回:OK # 再次检查脚本 SCRIPT EXISTS "4e6d8fc8bb01276962cce5371fa795a7763657ae" # 返回:1) (integer) 0
📚 Lua 语法基础
在 Redis 中使用的 Lua 语法要点:
变量和数据类型:
-- 变量声明
local name = "Redis"
local count = 100
local flag = true
local arr = {1, 2, 3, 4, 5}
local obj = {name="Redis", version="7.0"}
-- 字符串操作
local str1 = "Hello"
local str2 = "World"
local result = str1 .. " " .. str2 -- 字符串连接
-- 数值操作
local a = 10
local b = 20
local sum = a + b
local product = a * b
条件判断和循环:
-- 条件判断
if count > 50 then
return "High"
elseif count > 20 then
return "Medium"
else
return "Low"
end
-- for 循环
for i = 1, 10 do
redis.call('INCR', 'counter')
end
-- while 循环
local i = 1
while i <= 5 do
redis.call('LPUSH', 'list', i)
i = i + 1
end
-- 遍历数组
for index, value in ipairs(arr) do
redis.call('SET', 'key' .. index, value)
end
Redis 命令调用:
-- redis.call() - 严格模式,出错时停止执行
local value = redis.call('GET', 'mykey')
if value == false then
redis.call('SET', 'mykey', 'default')
end
-- redis.pcall() - 保护模式,出错时返回错误信息
local result = redis.pcall('INCR', 'string_key')
if type(result) == 'table' and result.err then
return "Error: " .. result.err
end
-- 批量操作
redis.call('MULTI')
redis.call('SET', 'key1', 'value1')
redis.call('SET', 'key2', 'value2')
redis.call('EXEC')
KEYS 和 ARGV 参数:
-- KEYS[1], KEYS[2], ... 访问键参数
-- ARGV[1], ARGV[2], ... 访问值参数
local key1 = KEYS[1] -- 第一个键
local key2 = KEYS[2] -- 第二个键
local value1 = ARGV[1] -- 第一个参数
local value2 = ARGV[2] -- 第二个参数
-- 获取参数数量
local key_count = #KEYS
local arg_count = #ARGV
-- 遍历所有键
for i = 1, #KEYS do
redis.call('DEL', KEYS[i])
end
🎯 实际应用案例
Redis Lua 脚本在实际项目中的典型应用:
🔒 分布式锁
-- 获取锁脚本 local lock_script = [[ local key = KEYS[1] local identifier = ARGV[1] local expire = ARGV[2] local result = redis.call('SET', key, identifier, 'EX', expire, 'NX') if result then return 1 else return 0 end ]] -- 释放锁脚本 local unlock_script = [[ local key = KEYS[1] local identifier = ARGV[1] local current = redis.call('GET', key) if current == identifier then return redis.call('DEL', key) else return 0 end ]] -- 使用示例 EVAL lock_script 1 "lock:resource" "client123" 30 EVAL unlock_script 1 "lock:resource" "client123"
优势:原子性操作,避免锁的误删除
📊 限流器
-- 滑动窗口限流脚本 local rate_limit_script = [[ local key = KEYS[1] local window = tonumber(ARGV[1]) -- 时间窗口(秒) local limit = tonumber(ARGV[2]) -- 限制次数 local current_time = tonumber(ARGV[3]) -- 清理过期记录 redis.call('ZREMRANGEBYSCORE', key, 0, current_time - window) -- 获取当前窗口内的请求数 local current_requests = redis.call('ZCARD', key) if current_requests < limit then -- 添加当前请求 redis.call('ZADD', key, current_time, current_time) redis.call('EXPIRE', key, window) return {1, limit - current_requests - 1} else return {0, 0} end ]] -- 使用示例 local current_time = os.time() EVAL rate_limit_script 1 "rate_limit:user123" 60 10 current_time
优势:精确的滑动窗口限流,高性能
💰 库存扣减
-- 原子性库存扣减脚本 local stock_deduct_script = [[ local stock_key = KEYS[1] local order_key = KEYS[2] local quantity = tonumber(ARGV[1]) local order_id = ARGV[2] -- 获取当前库存 local current_stock = tonumber(redis.call('GET', stock_key) or 0) -- 检查库存是否足够 if current_stock >= quantity then -- 扣减库存 redis.call('DECRBY', stock_key, quantity) -- 记录订单 redis.call('SADD', order_key, order_id) -- 返回成功和剩余库存 return {1, current_stock - quantity} else -- 返回失败和当前库存 return {0, current_stock} end ]] -- 使用示例 EVAL stock_deduct_script 2 "stock:product123" "orders:product123" 5 "order456"
优势:防止超卖,保证库存准确性
🎲 抽奖系统
-- 加权随机抽奖脚本 local lottery_script = [[ local user_key = KEYS[1] local prizes_key = KEYS[2] local user_id = ARGV[1] local max_draws = tonumber(ARGV[2]) -- 检查用户抽奖次数 local user_draws = tonumber(redis.call('GET', user_key) or 0) if user_draws >= max_draws then return {0, "已达到最大抽奖次数"} end -- 获取奖品配置 local prizes = redis.call('HGETALL', prizes_key) local total_weight = 0 local prize_list = {} -- 计算总权重 for i = 1, #prizes, 2 do local prize_name = prizes[i] local weight = tonumber(prizes[i + 1]) total_weight = total_weight + weight table.insert(prize_list, {prize_name, weight}) end -- 生成随机数 math.randomseed(os.time() + user_draws) local random_num = math.random(1, total_weight) -- 确定中奖奖品 local current_weight = 0 for _, prize in ipairs(prize_list) do current_weight = current_weight + prize[2] if random_num <= current_weight then -- 增加用户抽奖次数 redis.call('INCR', user_key) redis.call('EXPIRE', user_key, 86400) -- 24小时过期 return {1, prize[1]} end end return {0, "抽奖失败"} ]] -- 使用示例 EVAL lottery_script 2 "draws:user123" "prizes:config" "user123" 3
优势:公平抽奖,防止作弊,支持加权随机
📈 排行榜更新
-- 排行榜更新脚本 local leaderboard_script = [[ local board_key = KEYS[1] local user_key = KEYS[2] local user_id = ARGV[1] local score = tonumber(ARGV[2]) local max_size = tonumber(ARGV[3]) -- 获取用户当前分数 local current_score = redis.call('ZSCORE', board_key, user_id) -- 只有分数更高时才更新 if not current_score or score > tonumber(current_score) then -- 更新排行榜 redis.call('ZADD', board_key, score, user_id) -- 限制排行榜大小 local board_size = redis.call('ZCARD', board_key) if board_size > max_size then redis.call('ZREMRANGEBYRANK', board_key, 0, board_size - max_size - 1) end -- 更新用户最高分 redis.call('SET', user_key, score) -- 获取用户排名 local rank = redis.call('ZREVRANK', board_key, user_id) return {1, rank + 1, score} else local rank = redis.call('ZREVRANK', board_key, user_id) return {0, rank and (rank + 1) or -1, current_score} end ]] -- 使用示例 EVAL leaderboard_script 2 "leaderboard:game" "user:123:best" "user123" 9500 100
优势:高效的排行榜维护,支持分数比较
🔄 缓存更新
-- 缓存更新脚本 local cache_update_script = [[ local cache_key = KEYS[1] local lock_key = KEYS[2] local data = ARGV[1] local ttl = tonumber(ARGV[2]) local lock_ttl = tonumber(ARGV[3]) -- 尝试获取更新锁 local lock_result = redis.call('SET', lock_key, '1', 'EX', lock_ttl, 'NX') if lock_result then -- 获取锁成功,更新缓存 redis.call('SET', cache_key, data, 'EX', ttl) redis.call('DEL', lock_key) return {1, "缓存更新成功"} else -- 获取锁失败,检查缓存是否存在 local cache_exists = redis.call('EXISTS', cache_key) if cache_exists == 1 then return {0, "缓存已存在,跳过更新"} else return {-1, "等待其他进程更新缓存"} end end ]] -- 使用示例 EVAL cache_update_script 2 "cache:user:123" "lock:cache:user:123" "user_data" 3600 10
优势:防止缓存击穿,避免重复更新
脚本性能和最佳实践
方面 | 优点 | 缺点 | 最佳实践 |
---|---|---|---|
原子性 | 完全原子性执行 | 长脚本会阻塞服务器 | 控制脚本执行时间 |
性能 | 减少网络往返 | 复杂逻辑影响性能 | 优化算法复杂度 |
缓存 | 脚本自动缓存 | 内存占用增加 | 定期清理无用脚本 |
调试 | 支持错误返回 | 调试相对困难 | 充分测试和日志记录 |
⚠️ 注意事项:
- 执行时间:避免编写执行时间过长的脚本
- 内存使用:注意脚本中的内存分配,避免内存泄漏
- 随机性:脚本中的随机数生成需要注意种子设置
- 错误处理:使用 pcall 处理可能出错的操作
- 键的访问:只能访问通过 KEYS 参数传递的键
✅ 最佳实践建议:
- 脚本设计:保持脚本简洁,单一职责
- 参数验证:在脚本开始时验证输入参数
- 错误处理:合理使用 redis.call 和 redis.pcall
- 性能优化:避免不必要的循环和复杂计算
- 版本管理:为脚本建立版本管理机制
- 测试覆盖:编写充分的测试用例
- 监控告警:监控脚本执行时间和错误率
💡 使用建议:
- 适用场景:复杂的原子性操作、条件判断、批量处理
- 不适用场景:简单的单命令操作、长时间运行的任务
- 替代方案:简单操作使用事务,复杂业务逻辑考虑应用层处理
- 调试技巧:使用 redis.log 输出调试信息