🔧 Redisson 客户端

Redisson 简介

Redisson 是一个在 Redis 基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service)等。

🔒 分布式锁

提供可重入锁、公平锁、读写锁等多种锁实现

📊 分布式对象

分布式 Map、Set、List、Queue 等集合对象

🎯 分布式服务

远程服务、执行器服务、调度器服务等

⚡ 高性能

基于 Netty 框架,支持异步操作

Redisson vs Jedis vs Lettuce

特性 Redisson Jedis Lettuce
连接方式 基于 Netty,异步非阻塞 基于 Socket,同步阻塞 基于 Netty,异步非阻塞
线程安全 线程安全 非线程安全 线程安全
分布式锁 内置多种锁实现 需要手动实现 需要手动实现
分布式对象 丰富的分布式对象 基础 Redis 命令 基础 Redis 命令
集群支持 完善的集群支持 支持集群 支持集群
学习成本 中等 中等
适用场景 复杂分布式应用 简单 Redis 操作 高并发应用

环境准备

Maven 依赖

org.redisson redisson 3.24.3 org.redisson redisson-spring-boot-starter 3.24.3 com.fasterxml.jackson.core jackson-databind 2.15.2

基础配置

// 单机模式配置 Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setPassword("your_password") // 如果有密码 .setDatabase(0) .setConnectionMinimumIdleSize(10) .setConnectionPoolSize(64) .setIdleConnectionTimeout(10000) .setConnectTimeout(10000) .setTimeout(3000) .setRetryAttempts(3) .setRetryInterval(1500); RedissonClient redisson = Redisson.create(config);
// 集群模式配置 Config config = new Config(); config.useClusterServers() .addNodeAddress("redis://127.0.0.1:7000") .addNodeAddress("redis://127.0.0.1:7001") .addNodeAddress("redis://127.0.0.1:7002") .addNodeAddress("redis://127.0.0.1:7003") .addNodeAddress("redis://127.0.0.1:7004") .addNodeAddress("redis://127.0.0.1:7005") .setPassword("your_password") .setMasterConnectionMinimumIdleSize(10) .setMasterConnectionPoolSize(64) .setSlaveConnectionMinimumIdleSize(10) .setSlaveConnectionPoolSize(64) .setIdleConnectionTimeout(10000) .setConnectTimeout(10000) .setTimeout(3000) .setRetryAttempts(3) .setRetryInterval(1500); RedissonClient redisson = Redisson.create(config);
// 哨兵模式配置 Config config = new Config(); config.useSentinelServers() .setMasterName("mymaster") .addSentinelAddress("redis://127.0.0.1:26379") .addSentinelAddress("redis://127.0.0.1:26380") .addSentinelAddress("redis://127.0.0.1:26381") .setPassword("your_password") .setMasterConnectionMinimumIdleSize(10) .setMasterConnectionPoolSize(64) .setSlaveConnectionMinimumIdleSize(10) .setSlaveConnectionPoolSize(64) .setIdleConnectionTimeout(10000) .setConnectTimeout(10000) .setTimeout(3000) .setRetryAttempts(3) .setRetryInterval(1500); RedissonClient redisson = Redisson.create(config);

分布式锁

可重入锁(RLock)

public class RedissonLockExample { private RedissonClient redisson; /** * 基础锁使用 */ public void basicLockExample() { RLock lock = redisson.getLock("myLock"); try { // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁 boolean isLocked = lock.tryLock(100, 10, TimeUnit.SECONDS); if (isLocked) { // 执行业务逻辑 System.out.println("获取锁成功,执行业务逻辑"); Thread.sleep(5000); } else { System.out.println("获取锁失败"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { // 释放锁 if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } /** * 自动续期锁(看门狗机制) */ public void watchdogLockExample() { RLock lock = redisson.getLock("watchdogLock"); try { // 加锁,不设置过期时间,使用看门狗机制自动续期 lock.lock(); // 执行长时间业务逻辑 System.out.println("执行长时间业务逻辑"); Thread.sleep(30000); // 30秒,超过默认锁过期时间 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { if (lock.isHeldByCurrentThread()) { lock.unlock(); } } } /** * 异步锁操作 */ public void asyncLockExample() { RLock lock = redisson.getLock("asyncLock"); // 异步加锁 RFuture lockFuture = lock.tryLockAsync(100, 10, TimeUnit.SECONDS); lockFuture.whenComplete((isLocked, throwable) -> { if (throwable != null) { System.err.println("加锁异常: " + throwable.getMessage()); return; } if (isLocked) { try { // 执行业务逻辑 System.out.println("异步获取锁成功"); } finally { // 异步释放锁 lock.unlockAsync(); } } else { System.out.println("异步获取锁失败"); } }); } }

公平锁(RFairLock)

public class FairLockExample { private RedissonClient redisson; /** * 公平锁示例 */ public void fairLockExample() { RFairLock fairLock = redisson.getFairLock("fairLock"); try { // 公平锁保证先请求的线程先获得锁 boolean isLocked = fairLock.tryLock(100, 10, TimeUnit.SECONDS); if (isLocked) { System.out.println("获取公平锁成功: " + Thread.currentThread().getName()); Thread.sleep(2000); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { if (fairLock.isHeldByCurrentThread()) { fairLock.unlock(); } } } /** * 多线程公平锁测试 */ public void testFairLock() { ExecutorService executor = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { final int threadNum = i; executor.submit(() -> { System.out.println("线程 " + threadNum + " 请求锁"); fairLockExample(); }); } executor.shutdown(); } }

读写锁(RReadWriteLock)

public class ReadWriteLockExample { private RedissonClient redisson; private RReadWriteLock readWriteLock; public ReadWriteLockExample(RedissonClient redisson) { this.redisson = redisson; this.readWriteLock = redisson.getReadWriteLock("readWriteLock"); } /** * 读操作 */ public String readData(String key) { RLock readLock = readWriteLock.readLock(); try { readLock.lock(10, TimeUnit.SECONDS); // 模拟读取数据 System.out.println("读取数据: " + Thread.currentThread().getName()); Thread.sleep(1000); return "data_" + key; } catch (InterruptedException e) { Thread.currentThread().interrupt(); return null; } finally { if (readLock.isHeldByCurrentThread()) { readLock.unlock(); } } } /** * 写操作 */ public void writeData(String key, String value) { RLock writeLock = readWriteLock.writeLock(); try { writeLock.lock(10, TimeUnit.SECONDS); // 模拟写入数据 System.out.println("写入数据: " + Thread.currentThread().getName()); Thread.sleep(2000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { if (writeLock.isHeldByCurrentThread()) { writeLock.unlock(); } } } /** * 测试读写锁 */ public void testReadWriteLock() { ExecutorService executor = Executors.newFixedThreadPool(10); // 启动多个读线程 for (int i = 0; i < 5; i++) { executor.submit(() -> readData("test")); } // 启动写线程 executor.submit(() -> writeData("test", "value")); // 再启动读线程 for (int i = 0; i < 3; i++) { executor.submit(() -> readData("test")); } executor.shutdown(); } }

分布式对象

分布式 Map(RMap)

public class RedissonMapExample { private RedissonClient redisson; /** * 基础 Map 操作 */ public void basicMapOperations() { RMap map = redisson.getMap("myMap"); // 基础操作 map.put("key1", "value1"); map.put("key2", "value2"); String value = map.get("key1"); System.out.println("获取值: " + value); // 批量操作 Map batch = new HashMap<>(); batch.put("key3", "value3"); batch.put("key4", "value4"); map.putAll(batch); // 条件操作 String oldValue = map.putIfAbsent("key5", "value5"); boolean replaced = map.replace("key1", "value1", "newValue1"); // 获取所有键值 Set keys = map.keySet(); Collection values = map.values(); Set> entries = map.entrySet(); System.out.println("Map 大小: " + map.size()); } /** * 带过期时间的 Map */ public void mapWithTTL() { RMapCache mapCache = redisson.getMapCache("cacheMap"); // 设置键值对,10秒后过期 mapCache.put("tempKey", "tempValue", 10, TimeUnit.SECONDS); // 设置键值对,5秒后过期,最大空闲时间3秒 mapCache.put("idleKey", "idleValue", 5, TimeUnit.SECONDS, 3, TimeUnit.SECONDS); // 批量设置过期时间 Map batch = new HashMap<>(); batch.put("batchKey1", "batchValue1"); batch.put("batchKey2", "batchValue2"); mapCache.putAll(batch, 30, TimeUnit.SECONDS); } /** * 异步 Map 操作 */ public void asyncMapOperations() { RMap map = redisson.getMap("asyncMap"); // 异步放入 RFuture putFuture = map.putAsync("asyncKey", "asyncValue"); putFuture.whenComplete((oldValue, throwable) -> { if (throwable == null) { System.out.println("异步放入成功,旧值: " + oldValue); } else { System.err.println("异步放入失败: " + throwable.getMessage()); } }); // 异步获取 RFuture getFuture = map.getAsync("asyncKey"); getFuture.whenComplete((value, throwable) -> { if (throwable == null) { System.out.println("异步获取成功,值: " + value); } else { System.err.println("异步获取失败: " + throwable.getMessage()); } }); } }

分布式 Set(RSet)

public class RedissonSetExample { private RedissonClient redisson; /** * 基础 Set 操作 */ public void basicSetOperations() { RSet<String> set = redisson.getSet("mySet"); // 添加元素 set.add("element1"); set.add("element2"); set.add("element3"); // 批量添加 Set<String> batch = Arrays.asList("element4", "element5").stream() .collect(Collectors.toSet()); set.addAll(batch); // 检查元素 boolean contains = set.contains("element1"); System.out.println("包含 element1: " + contains); // 移除元素 boolean removed = set.remove("element1"); System.out.println("移除 element1: " + removed); // 获取所有元素 Set<String> allElements = set.readAll(); System.out.println("所有元素: " + allElements); System.out.println("Set 大小: " + set.size()); } /** * Set 集合运算 */ public void setOperations() { RSet<String> set1 = redisson.getSet("set1"); RSet<String> set2 = redisson.getSet("set2"); // 初始化数据 set1.addAll(Arrays.asList("a", "b", "c", "d")); set2.addAll(Arrays.asList("c", "d", "e", "f")); // 交集 Set<String> intersection = set1.readIntersection("set2"); System.out.println("交集: " + intersection); // 并集 Set<String> union = set1.readUnion("set2"); System.out.println("并集: " + union); // 差集 Set<String> diff = set1.readDiff("set2"); System.out.println("差集: " + diff); // 将交集结果存储到新的 Set int intersectionSize = set1.intersection("set2", "intersectionResult"); System.out.println("交集大小: " + intersectionSize); } }

分布式 List(RList)

public class RedissonListExample { private RedissonClient redisson; /** * 基础 List 操作 */ public void basicListOperations() { RList<String> list = redisson.getList("myList"); // 添加元素 list.add("first"); list.add("second"); list.add(1, "inserted"); // 在索引1处插入 // 批量添加 list.addAll(Arrays.asList("third", "fourth")); // 获取元素 String first = list.get(0); String last = list.get(list.size() - 1); System.out.println("第一个元素: " + first); System.out.println("最后一个元素: " + last); // 查找元素 int index = list.indexOf("second"); System.out.println("second 的索引: " + index); // 移除元素 String removed = list.remove(0); // 移除第一个元素 boolean removedByValue = list.remove("third"); // 移除指定值 // 子列表 List<String> subList = list.subList(0, 2); System.out.println("子列表: " + subList); // 排序 list.sort(String::compareTo); System.out.println("List 内容: " + list.readAll()); } /** * 阻塞队列操作 */ public void blockingQueueOperations() { RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("blockingQueue"); // 生产者线程 new Thread(() -> { try { for (int i = 0; i < 5; i++) { blockingQueue.put("item" + i); System.out.println("生产: item" + i); Thread.sleep(1000); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start(); // 消费者线程 new Thread(() -> { try { while (true) { String item = blockingQueue.take(); // 阻塞等待 System.out.println("消费: " + item); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }).start(); } }

分布式服务

分布式信号量(RSemaphore)

public class RedissonSemaphoreExample { private RedissonClient redisson; /** * 信号量限流示例 */ public void semaphoreExample() { RSemaphore semaphore = redisson.getSemaphore("mySemaphore"); // 设置许可数量 semaphore.trySetPermits(3); // 模拟多个线程竞争资源 ExecutorService executor = Executors.newFixedThreadPool(10); for (int i = 0; i < 10; i++) { final int threadNum = i; executor.submit(() -> { try { // 获取许可 semaphore.acquire(); System.out.println("线程 " + threadNum + " 获取到许可,开始执行"); // 模拟业务处理 Thread.sleep(2000); System.out.println("线程 " + threadNum + " 执行完成,释放许可"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { // 释放许可 semaphore.release(); } }); } executor.shutdown(); } /** * 尝试获取许可(非阻塞) */ public void tryAcquireExample() { RSemaphore semaphore = redisson.getSemaphore("tryAcquireSemaphore"); semaphore.trySetPermits(2); try { // 尝试获取许可,最多等待5秒 boolean acquired = semaphore.tryAcquire(5, TimeUnit.SECONDS); if (acquired) { System.out.println("获取许可成功"); // 执行业务逻辑 Thread.sleep(3000); } else { System.out.println("获取许可失败"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { if (semaphore.availablePermits() < 2) { semaphore.release(); } } } }

分布式计数器(RAtomicLong)

public class RedissonAtomicExample { private RedissonClient redisson; /** * 原子计数器示例 */ public void atomicLongExample() { RAtomicLong atomicLong = redisson.getAtomicLong("myCounter"); // 设置初始值 atomicLong.set(0); // 原子操作 long value = atomicLong.incrementAndGet(); // 自增并返回 System.out.println("自增后的值: " + value); long oldValue = atomicLong.getAndAdd(10); // 加10并返回旧值 System.out.println("加10前的值: " + oldValue); System.out.println("当前值: " + atomicLong.get()); // 比较并设置 boolean updated = atomicLong.compareAndSet(11, 100); System.out.println("CAS 更新结果: " + updated); System.out.println("最终值: " + atomicLong.get()); } /** * 多线程计数器测试 */ public void concurrentCounterTest() { RAtomicLong counter = redisson.getAtomicLong("concurrentCounter"); counter.set(0); ExecutorService executor = Executors.newFixedThreadPool(10); CountDownLatch latch = new CountDownLatch(100); // 启动100个线程,每个线程增加10次 for (int i = 0; i < 100; i++) { executor.submit(() -> { try { for (int j = 0; j < 10; j++) { counter.incrementAndGet(); } } finally { latch.countDown(); } }); } try { latch.await(); System.out.println("最终计数值: " + counter.get()); // 应该是1000 } catch (InterruptedException e) { Thread.currentThread().interrupt(); } executor.shutdown(); } }

分布式 CountDownLatch

public class RedissonCountDownLatchExample { private RedissonClient redisson; /** * CountDownLatch 示例 */ public void countDownLatchExample() { RCountDownLatch latch = redisson.getCountDownLatch("myLatch"); // 设置计数值 latch.trySetCount(3); // 启动工作线程 for (int i = 0; i < 3; i++) { final int taskNum = i; new Thread(() -> { try { System.out.println("任务 " + taskNum + " 开始执行"); Thread.sleep((taskNum + 1) * 1000); // 模拟不同的执行时间 System.out.println("任务 " + taskNum + " 执行完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); // 计数减1 } }).start(); } // 主线程等待所有任务完成 try { System.out.println("等待所有任务完成..."); latch.await(); // 阻塞等待计数归零 System.out.println("所有任务已完成!"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } /** * 带超时的 CountDownLatch */ public void countDownLatchWithTimeout() { RCountDownLatch latch = redisson.getCountDownLatch("timeoutLatch"); latch.trySetCount(5); // 启动部分工作线程(少于计数值) for (int i = 0; i < 3; i++) { final int taskNum = i; new Thread(() -> { try { Thread.sleep(2000); System.out.println("任务 " + taskNum + " 完成"); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { latch.countDown(); } }).start(); } try { // 等待10秒,如果超时则继续执行 boolean completed = latch.await(10, TimeUnit.SECONDS); if (completed) { System.out.println("所有任务在超时前完成"); } else { System.out.println("等待超时,当前计数: " + latch.getCount()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }

Spring Boot 集成

# application.yml spring: redis: redisson: config: | singleServerConfig: address: "redis://127.0.0.1:6379" password: null database: 0 connectionMinimumIdleSize: 10 connectionPoolSize: 64 idleConnectionTimeout: 10000 connectTimeout: 10000 timeout: 3000 retryAttempts: 3 retryInterval: 1500 threads: 16 nettyThreads: 32 codec: !<org.redisson.codec.JsonJacksonCodec> {} transportMode: "NIO"
@Configuration public class RedissonConfig { @Bean(destroyMethod = "shutdown") public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setConnectionMinimumIdleSize(10) .setConnectionPoolSize(64) .setIdleConnectionTimeout(10000) .setConnectTimeout(10000) .setTimeout(3000) .setRetryAttempts(3) .setRetryInterval(1500); return Redisson.create(config); } } @Service public class RedissonService { @Autowired private RedissonClient redissonClient; public RLock getLock(String lockKey) { return redissonClient.getLock(lockKey); } public <T> RMap<String, T> getMap(String mapName) { return redissonClient.getMap(mapName); } public <T> RSet<T> getSet(String setName) { return redissonClient.getSet(setName); } public <T> RList<T> getList(String listName) { return redissonClient.getList(listName); } }

💡 最佳实践

  • 连接池配置:根据应用负载合理配置连接池大小
  • 锁的使用:避免长时间持有锁,合理设置锁的过期时间
  • 异常处理:妥善处理网络异常和锁获取失败的情况
  • 资源释放:确保在 finally 块中释放锁和其他资源
  • 监控告警:监控锁的持有时间、获取成功率等指标
  • 序列化选择:根据数据特点选择合适的序列化方式

⚠️ 注意事项

  • 锁重入:Redisson 锁支持重入,但要确保加锁和解锁次数匹配
  • 网络分区:在网络分区情况下,可能出现脑裂问题
  • 时钟偏移:分布式环境下要注意服务器时钟同步
  • 内存使用:大量使用分布式对象时要注意 Redis 内存使用
  • 版本兼容:确保 Redisson 版本与 Redis 版本兼容

🎯 总结

Redisson 是一个功能强大的 Redis Java 客户端,提供了丰富的分布式对象和服务。它简化了分布式应用的开发,特别是在分布式锁、分布式集合、分布式服务等方面。合理使用 Redisson 可以大大提升分布式应用的开发效率和系统稳定性。