Redisson 简介
Redisson 是一个在 Redis 基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service)等。
🔒 分布式锁
提供可重入锁、公平锁、读写锁等多种锁实现
📊 分布式对象
分布式 Map、Set、List、Queue 等集合对象
🎯 分布式服务
远程服务、执行器服务、调度器服务等
⚡ 高性能
基于 Netty 框架,支持异步操作
Redisson vs Jedis vs Lettuce
特性 | Redisson | Jedis | Lettuce |
---|---|---|---|
连接方式 | 基于 Netty,异步非阻塞 | 基于 Socket,同步阻塞 | 基于 Netty,异步非阻塞 |
线程安全 | 线程安全 | 非线程安全 | 线程安全 |
分布式锁 | 内置多种锁实现 | 需要手动实现 | 需要手动实现 |
分布式对象 | 丰富的分布式对象 | 基础 Redis 命令 | 基础 Redis 命令 |
集群支持 | 完善的集群支持 | 支持集群 | 支持集群 |
学习成本 | 中等 | 低 | 中等 |
适用场景 | 复杂分布式应用 | 简单 Redis 操作 | 高并发应用 |
环境准备
Maven 依赖
基础配置
// 单机模式配置
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("your_password") // 如果有密码
.setDatabase(0)
.setConnectionMinimumIdleSize(10)
.setConnectionPoolSize(64)
.setIdleConnectionTimeout(10000)
.setConnectTimeout(10000)
.setTimeout(3000)
.setRetryAttempts(3)
.setRetryInterval(1500);
RedissonClient redisson = Redisson.create(config);
// 集群模式配置
Config config = new Config();
config.useClusterServers()
.addNodeAddress("redis://127.0.0.1:7000")
.addNodeAddress("redis://127.0.0.1:7001")
.addNodeAddress("redis://127.0.0.1:7002")
.addNodeAddress("redis://127.0.0.1:7003")
.addNodeAddress("redis://127.0.0.1:7004")
.addNodeAddress("redis://127.0.0.1:7005")
.setPassword("your_password")
.setMasterConnectionMinimumIdleSize(10)
.setMasterConnectionPoolSize(64)
.setSlaveConnectionMinimumIdleSize(10)
.setSlaveConnectionPoolSize(64)
.setIdleConnectionTimeout(10000)
.setConnectTimeout(10000)
.setTimeout(3000)
.setRetryAttempts(3)
.setRetryInterval(1500);
RedissonClient redisson = Redisson.create(config);
// 哨兵模式配置
Config config = new Config();
config.useSentinelServers()
.setMasterName("mymaster")
.addSentinelAddress("redis://127.0.0.1:26379")
.addSentinelAddress("redis://127.0.0.1:26380")
.addSentinelAddress("redis://127.0.0.1:26381")
.setPassword("your_password")
.setMasterConnectionMinimumIdleSize(10)
.setMasterConnectionPoolSize(64)
.setSlaveConnectionMinimumIdleSize(10)
.setSlaveConnectionPoolSize(64)
.setIdleConnectionTimeout(10000)
.setConnectTimeout(10000)
.setTimeout(3000)
.setRetryAttempts(3)
.setRetryInterval(1500);
RedissonClient redisson = Redisson.create(config);
分布式锁
可重入锁(RLock)
public class RedissonLockExample {
private RedissonClient redisson;
/**
* 基础锁使用
*/
public void basicLockExample() {
RLock lock = redisson.getLock("myLock");
try {
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean isLocked = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (isLocked) {
// 执行业务逻辑
System.out.println("获取锁成功,执行业务逻辑");
Thread.sleep(5000);
} else {
System.out.println("获取锁失败");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放锁
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
/**
* 自动续期锁(看门狗机制)
*/
public void watchdogLockExample() {
RLock lock = redisson.getLock("watchdogLock");
try {
// 加锁,不设置过期时间,使用看门狗机制自动续期
lock.lock();
// 执行长时间业务逻辑
System.out.println("执行长时间业务逻辑");
Thread.sleep(30000); // 30秒,超过默认锁过期时间
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
/**
* 异步锁操作
*/
public void asyncLockExample() {
RLock lock = redisson.getLock("asyncLock");
// 异步加锁
RFuture lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
lockFuture.whenComplete((isLocked, throwable) -> {
if (throwable != null) {
System.err.println("加锁异常: " + throwable.getMessage());
return;
}
if (isLocked) {
try {
// 执行业务逻辑
System.out.println("异步获取锁成功");
} finally {
// 异步释放锁
lock.unlockAsync();
}
} else {
System.out.println("异步获取锁失败");
}
});
}
}
公平锁(RFairLock)
public class FairLockExample {
private RedissonClient redisson;
/**
* 公平锁示例
*/
public void fairLockExample() {
RFairLock fairLock = redisson.getFairLock("fairLock");
try {
// 公平锁保证先请求的线程先获得锁
boolean isLocked = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
if (isLocked) {
System.out.println("获取公平锁成功: " + Thread.currentThread().getName());
Thread.sleep(2000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (fairLock.isHeldByCurrentThread()) {
fairLock.unlock();
}
}
}
/**
* 多线程公平锁测试
*/
public void testFairLock() {
ExecutorService executor = Executors.newFixedThreadPool(5);
for (int i = 0; i < 5; i++) {
final int threadNum = i;
executor.submit(() -> {
System.out.println("线程 " + threadNum + " 请求锁");
fairLockExample();
});
}
executor.shutdown();
}
}
读写锁(RReadWriteLock)
public class ReadWriteLockExample {
private RedissonClient redisson;
private RReadWriteLock readWriteLock;
public ReadWriteLockExample(RedissonClient redisson) {
this.redisson = redisson;
this.readWriteLock = redisson.getReadWriteLock("readWriteLock");
}
/**
* 读操作
*/
public String readData(String key) {
RLock readLock = readWriteLock.readLock();
try {
readLock.lock(10, TimeUnit.SECONDS);
// 模拟读取数据
System.out.println("读取数据: " + Thread.currentThread().getName());
Thread.sleep(1000);
return "data_" + key;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
if (readLock.isHeldByCurrentThread()) {
readLock.unlock();
}
}
}
/**
* 写操作
*/
public void writeData(String key, String value) {
RLock writeLock = readWriteLock.writeLock();
try {
writeLock.lock(10, TimeUnit.SECONDS);
// 模拟写入数据
System.out.println("写入数据: " + Thread.currentThread().getName());
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (writeLock.isHeldByCurrentThread()) {
writeLock.unlock();
}
}
}
/**
* 测试读写锁
*/
public void testReadWriteLock() {
ExecutorService executor = Executors.newFixedThreadPool(10);
// 启动多个读线程
for (int i = 0; i < 5; i++) {
executor.submit(() -> readData("test"));
}
// 启动写线程
executor.submit(() -> writeData("test", "value"));
// 再启动读线程
for (int i = 0; i < 3; i++) {
executor.submit(() -> readData("test"));
}
executor.shutdown();
}
}
分布式对象
分布式 Map(RMap)
public class RedissonMapExample {
private RedissonClient redisson;
/**
* 基础 Map 操作
*/
public void basicMapOperations() {
RMap map = redisson.getMap("myMap");
// 基础操作
map.put("key1", "value1");
map.put("key2", "value2");
String value = map.get("key1");
System.out.println("获取值: " + value);
// 批量操作
Map batch = new HashMap<>();
batch.put("key3", "value3");
batch.put("key4", "value4");
map.putAll(batch);
// 条件操作
String oldValue = map.putIfAbsent("key5", "value5");
boolean replaced = map.replace("key1", "value1", "newValue1");
// 获取所有键值
Set keys = map.keySet();
Collection values = map.values();
Set> entries = map.entrySet();
System.out.println("Map 大小: " + map.size());
}
/**
* 带过期时间的 Map
*/
public void mapWithTTL() {
RMapCache mapCache = redisson.getMapCache("cacheMap");
// 设置键值对,10秒后过期
mapCache.put("tempKey", "tempValue", 10, TimeUnit.SECONDS);
// 设置键值对,5秒后过期,最大空闲时间3秒
mapCache.put("idleKey", "idleValue", 5, TimeUnit.SECONDS, 3, TimeUnit.SECONDS);
// 批量设置过期时间
Map batch = new HashMap<>();
batch.put("batchKey1", "batchValue1");
batch.put("batchKey2", "batchValue2");
mapCache.putAll(batch, 30, TimeUnit.SECONDS);
}
/**
* 异步 Map 操作
*/
public void asyncMapOperations() {
RMap map = redisson.getMap("asyncMap");
// 异步放入
RFuture putFuture = map.putAsync("asyncKey", "asyncValue");
putFuture.whenComplete((oldValue, throwable) -> {
if (throwable == null) {
System.out.println("异步放入成功,旧值: " + oldValue);
} else {
System.err.println("异步放入失败: " + throwable.getMessage());
}
});
// 异步获取
RFuture getFuture = map.getAsync("asyncKey");
getFuture.whenComplete((value, throwable) -> {
if (throwable == null) {
System.out.println("异步获取成功,值: " + value);
} else {
System.err.println("异步获取失败: " + throwable.getMessage());
}
});
}
}
分布式 Set(RSet)
public class RedissonSetExample {
private RedissonClient redisson;
/**
* 基础 Set 操作
*/
public void basicSetOperations() {
RSet<String> set = redisson.getSet("mySet");
// 添加元素
set.add("element1");
set.add("element2");
set.add("element3");
// 批量添加
Set<String> batch = Arrays.asList("element4", "element5").stream()
.collect(Collectors.toSet());
set.addAll(batch);
// 检查元素
boolean contains = set.contains("element1");
System.out.println("包含 element1: " + contains);
// 移除元素
boolean removed = set.remove("element1");
System.out.println("移除 element1: " + removed);
// 获取所有元素
Set<String> allElements = set.readAll();
System.out.println("所有元素: " + allElements);
System.out.println("Set 大小: " + set.size());
}
/**
* Set 集合运算
*/
public void setOperations() {
RSet<String> set1 = redisson.getSet("set1");
RSet<String> set2 = redisson.getSet("set2");
// 初始化数据
set1.addAll(Arrays.asList("a", "b", "c", "d"));
set2.addAll(Arrays.asList("c", "d", "e", "f"));
// 交集
Set<String> intersection = set1.readIntersection("set2");
System.out.println("交集: " + intersection);
// 并集
Set<String> union = set1.readUnion("set2");
System.out.println("并集: " + union);
// 差集
Set<String> diff = set1.readDiff("set2");
System.out.println("差集: " + diff);
// 将交集结果存储到新的 Set
int intersectionSize = set1.intersection("set2", "intersectionResult");
System.out.println("交集大小: " + intersectionSize);
}
}
分布式 List(RList)
public class RedissonListExample {
private RedissonClient redisson;
/**
* 基础 List 操作
*/
public void basicListOperations() {
RList<String> list = redisson.getList("myList");
// 添加元素
list.add("first");
list.add("second");
list.add(1, "inserted"); // 在索引1处插入
// 批量添加
list.addAll(Arrays.asList("third", "fourth"));
// 获取元素
String first = list.get(0);
String last = list.get(list.size() - 1);
System.out.println("第一个元素: " + first);
System.out.println("最后一个元素: " + last);
// 查找元素
int index = list.indexOf("second");
System.out.println("second 的索引: " + index);
// 移除元素
String removed = list.remove(0); // 移除第一个元素
boolean removedByValue = list.remove("third"); // 移除指定值
// 子列表
List<String> subList = list.subList(0, 2);
System.out.println("子列表: " + subList);
// 排序
list.sort(String::compareTo);
System.out.println("List 内容: " + list.readAll());
}
/**
* 阻塞队列操作
*/
public void blockingQueueOperations() {
RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("blockingQueue");
// 生产者线程
new Thread(() -> {
try {
for (int i = 0; i < 5; i++) {
blockingQueue.put("item" + i);
System.out.println("生产: item" + i);
Thread.sleep(1000);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
// 消费者线程
new Thread(() -> {
try {
while (true) {
String item = blockingQueue.take(); // 阻塞等待
System.out.println("消费: " + item);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
分布式服务
分布式信号量(RSemaphore)
public class RedissonSemaphoreExample {
private RedissonClient redisson;
/**
* 信号量限流示例
*/
public void semaphoreExample() {
RSemaphore semaphore = redisson.getSemaphore("mySemaphore");
// 设置许可数量
semaphore.trySetPermits(3);
// 模拟多个线程竞争资源
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
final int threadNum = i;
executor.submit(() -> {
try {
// 获取许可
semaphore.acquire();
System.out.println("线程 " + threadNum + " 获取到许可,开始执行");
// 模拟业务处理
Thread.sleep(2000);
System.out.println("线程 " + threadNum + " 执行完成,释放许可");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
// 释放许可
semaphore.release();
}
});
}
executor.shutdown();
}
/**
* 尝试获取许可(非阻塞)
*/
public void tryAcquireExample() {
RSemaphore semaphore = redisson.getSemaphore("tryAcquireSemaphore");
semaphore.trySetPermits(2);
try {
// 尝试获取许可,最多等待5秒
boolean acquired = semaphore.tryAcquire(5, TimeUnit.SECONDS);
if (acquired) {
System.out.println("获取许可成功");
// 执行业务逻辑
Thread.sleep(3000);
} else {
System.out.println("获取许可失败");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (semaphore.availablePermits() < 2) {
semaphore.release();
}
}
}
}
分布式计数器(RAtomicLong)
public class RedissonAtomicExample {
private RedissonClient redisson;
/**
* 原子计数器示例
*/
public void atomicLongExample() {
RAtomicLong atomicLong = redisson.getAtomicLong("myCounter");
// 设置初始值
atomicLong.set(0);
// 原子操作
long value = atomicLong.incrementAndGet(); // 自增并返回
System.out.println("自增后的值: " + value);
long oldValue = atomicLong.getAndAdd(10); // 加10并返回旧值
System.out.println("加10前的值: " + oldValue);
System.out.println("当前值: " + atomicLong.get());
// 比较并设置
boolean updated = atomicLong.compareAndSet(11, 100);
System.out.println("CAS 更新结果: " + updated);
System.out.println("最终值: " + atomicLong.get());
}
/**
* 多线程计数器测试
*/
public void concurrentCounterTest() {
RAtomicLong counter = redisson.getAtomicLong("concurrentCounter");
counter.set(0);
ExecutorService executor = Executors.newFixedThreadPool(10);
CountDownLatch latch = new CountDownLatch(100);
// 启动100个线程,每个线程增加10次
for (int i = 0; i < 100; i++) {
executor.submit(() -> {
try {
for (int j = 0; j < 10; j++) {
counter.incrementAndGet();
}
} finally {
latch.countDown();
}
});
}
try {
latch.await();
System.out.println("最终计数值: " + counter.get()); // 应该是1000
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
executor.shutdown();
}
}
分布式 CountDownLatch
public class RedissonCountDownLatchExample {
private RedissonClient redisson;
/**
* CountDownLatch 示例
*/
public void countDownLatchExample() {
RCountDownLatch latch = redisson.getCountDownLatch("myLatch");
// 设置计数值
latch.trySetCount(3);
// 启动工作线程
for (int i = 0; i < 3; i++) {
final int taskNum = i;
new Thread(() -> {
try {
System.out.println("任务 " + taskNum + " 开始执行");
Thread.sleep((taskNum + 1) * 1000); // 模拟不同的执行时间
System.out.println("任务 " + taskNum + " 执行完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown(); // 计数减1
}
}).start();
}
// 主线程等待所有任务完成
try {
System.out.println("等待所有任务完成...");
latch.await(); // 阻塞等待计数归零
System.out.println("所有任务已完成!");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* 带超时的 CountDownLatch
*/
public void countDownLatchWithTimeout() {
RCountDownLatch latch = redisson.getCountDownLatch("timeoutLatch");
latch.trySetCount(5);
// 启动部分工作线程(少于计数值)
for (int i = 0; i < 3; i++) {
final int taskNum = i;
new Thread(() -> {
try {
Thread.sleep(2000);
System.out.println("任务 " + taskNum + " 完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
}).start();
}
try {
// 等待10秒,如果超时则继续执行
boolean completed = latch.await(10, TimeUnit.SECONDS);
if (completed) {
System.out.println("所有任务在超时前完成");
} else {
System.out.println("等待超时,当前计数: " + latch.getCount());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Spring Boot 集成
# application.yml
spring:
redis:
redisson:
config: |
singleServerConfig:
address: "redis://127.0.0.1:6379"
password: null
database: 0
connectionMinimumIdleSize: 10
connectionPoolSize: 64
idleConnectionTimeout: 10000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.JsonJacksonCodec> {}
transportMode: "NIO"
@Configuration
public class RedissonConfig {
@Bean(destroyMethod = "shutdown")
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setConnectionMinimumIdleSize(10)
.setConnectionPoolSize(64)
.setIdleConnectionTimeout(10000)
.setConnectTimeout(10000)
.setTimeout(3000)
.setRetryAttempts(3)
.setRetryInterval(1500);
return Redisson.create(config);
}
}
@Service
public class RedissonService {
@Autowired
private RedissonClient redissonClient;
public RLock getLock(String lockKey) {
return redissonClient.getLock(lockKey);
}
public <T> RMap<String, T> getMap(String mapName) {
return redissonClient.getMap(mapName);
}
public <T> RSet<T> getSet(String setName) {
return redissonClient.getSet(setName);
}
public <T> RList<T> getList(String listName) {
return redissonClient.getList(listName);
}
}
💡 最佳实践
- 连接池配置:根据应用负载合理配置连接池大小
- 锁的使用:避免长时间持有锁,合理设置锁的过期时间
- 异常处理:妥善处理网络异常和锁获取失败的情况
- 资源释放:确保在 finally 块中释放锁和其他资源
- 监控告警:监控锁的持有时间、获取成功率等指标
- 序列化选择:根据数据特点选择合适的序列化方式
⚠️ 注意事项
- 锁重入:Redisson 锁支持重入,但要确保加锁和解锁次数匹配
- 网络分区:在网络分区情况下,可能出现脑裂问题
- 时钟偏移:分布式环境下要注意服务器时钟同步
- 内存使用:大量使用分布式对象时要注意 Redis 内存使用
- 版本兼容:确保 Redisson 版本与 Redis 版本兼容
🎯 总结
Redisson 是一个功能强大的 Redis Java 客户端,提供了丰富的分布式对象和服务。它简化了分布式应用的开发,特别是在分布式锁、分布式集合、分布式服务等方面。合理使用 Redisson 可以大大提升分布式应用的开发效率和系统稳定性。