第20章
⚡ 并发编程实战
综合运用所学知识,构建高性能并发系统,解决实际业务场景中的并发问题
学习目标
- 综合运用所学的并发编程知识
- 解决实际的高并发问题
- 掌握限流器的实现原理
- 学会并发编程面试题解析
- 构建可扩展的并发系统
实战项目概览
本章将通过多个实战项目,帮助你将前面学到的并发编程知识应用到实际场景中。我们将构建高性能的并发系统,解决真实业务中的并发问题。
实战特色
每个项目都基于真实的业务场景,涵盖从设计到实现的完整过程,并提供性能测试和优化建议。
高并发秒杀系统
模拟电商秒杀场景,处理高并发请求,保证数据一致性和系统稳定性。
实时日志分析系统
实时处理和分析大量日志数据,提供高性能的数据处理能力。
分布式缓存系统
构建高性能的分布式缓存,支持LRU策略和过期机制。
高并发计数器实现
在高并发场景下,简单的计数器往往成为性能瓶颈。我们将实现一个高性能的分布式计数器,支持原子操作和一致性保证。
设计思路
原子操作
使用AtomicLong和LongAdder实现无锁的原子计数操作。
分段计数
采用分段策略减少竞争,提高并发性能。
一致性保证
确保在高并发环境下计数的准确性和一致性。
核心实现
HighPerformanceCounter.java
public class HighPerformanceCounter {
private final LongAdder counter = new LongAdder();
private final AtomicLong atomicCounter = new AtomicLong(0);
// 使用LongAdder实现高性能计数
public void increment() {
counter.increment();
}
public void add(long delta) {
counter.add(delta);
}
public long sum() {
return counter.sum();
}
// 原子操作版本
public long incrementAndGet() {
return atomicCounter.incrementAndGet();
}
// 性能测试方法
public static void performanceTest() {
HighPerformanceCounter counter = new HighPerformanceCounter();
int threadCount = 100;
int operationsPerThread = 100000;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
CountDownLatch latch = new CountDownLatch(threadCount);
long startTime = System.currentTimeMillis();
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
try {
for (int j = 0; j < operationsPerThread; j++) {
counter.increment();
}
} finally {
latch.countDown();
}
});
}
try {
latch.await();
long endTime = System.currentTimeMillis();
System.out.println("总计数: " + counter.sum());
System.out.println("耗时: " + (endTime - startTime) + "ms");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
executor.shutdown();
}
}
}
生产者消费者系统
构建一个高性能的生产者消费者系统,支持多生产者多消费者模式,具备背压控制和监控功能。
系统架构
- 多生产者支持:支持多个生产者并发生产数据
- 多消费者支持:支持多个消费者并发消费数据
- 背压控制:当队列满时自动进行流量控制
- 监控统计:实时监控生产和消费速率
- 优雅关闭:支持系统的优雅停机
ProducerConsumerSystem.java
public class ProducerConsumerSystem {
private final BlockingQueue queue;
private final ExecutorService producerPool;
private final ExecutorService consumerPool;
private final AtomicLong producedCount = new AtomicLong(0);
private final AtomicLong consumedCount = new AtomicLong(0);
private volatile boolean running = true;
public ProducerConsumerSystem(int queueCapacity, int producerThreads, int consumerThreads) {
this.queue = new ArrayBlockingQueue<>(queueCapacity);
this.producerPool = Executors.newFixedThreadPool(producerThreads);
this.consumerPool = Executors.newFixedThreadPool(consumerThreads);
}
// 生产者接口
public interface Producer {
T produce() throws InterruptedException;
}
// 消费者接口
public interface Consumer {
void consume(T item) throws InterruptedException;
}
// 启动生产者
public void startProducer(Producer producer) {
producerPool.submit(() -> {
while (running) {
try {
T item = producer.produce();
if (item != null) {
queue.put(item); // 阻塞式放入
producedCount.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
System.err.println("生产者异常: " + e.getMessage());
}
}
});
}
// 启动消费者
public void startConsumer(Consumer consumer) {
consumerPool.submit(() -> {
while (running || !queue.isEmpty()) {
try {
T item = queue.poll(1, TimeUnit.SECONDS);
if (item != null) {
consumer.consume(item);
consumedCount.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
System.err.println("消费者异常: " + e.getMessage());
}
}
});
}
// 获取统计信息
public void printStats() {
System.out.println("队列大小: " + queue.size());
System.out.println("已生产: " + producedCount.get());
System.out.println("已消费: " + consumedCount.get());
System.out.println("待处理: " + (producedCount.get() - consumedCount.get()));
}
// 优雅关闭
public void shutdown() {
running = false;
producerPool.shutdown();
consumerPool.shutdown();
try {
if (!producerPool.awaitTermination(5, TimeUnit.SECONDS)) {
producerPool.shutdownNow();
}
if (!consumerPool.awaitTermination(5, TimeUnit.SECONDS)) {
consumerPool.shutdownNow();
}
} catch (InterruptedException e) {
producerPool.shutdownNow();
consumerPool.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
并发LRU缓存系统
实现一个线程安全的LRU缓存,支持高并发访问,具备过期策略和内存管理功能。
核心特性
线程安全
使用读写锁保证并发安全,读操作不互斥。
过期策略
支持TTL过期和定期清理过期数据。
LRU算法
基于双向链表和HashMap实现高效LRU。
ConcurrentLRUCache.java
public class ConcurrentLRUCache {
private final int capacity;
private final long defaultTTL;
private final ConcurrentHashMap> cache;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private CacheNode head;
private CacheNode tail;
private static class CacheNode {
K key;
V value;
long expireTime;
CacheNode prev;
CacheNode next;
CacheNode(K key, V value, long expireTime) {
this.key = key;
this.value = value;
this.expireTime = expireTime;
}
}
public ConcurrentLRUCache(int capacity, long defaultTTL) {
this.capacity = capacity;
this.defaultTTL = defaultTTL;
this.cache = new ConcurrentHashMap<>();
// 初始化双向链表
this.head = new CacheNode<>(null, null, 0);
this.tail = new CacheNode<>(null, null, 0);
head.next = tail;
tail.prev = head;
// 启动清理线程
startCleanupTask();
}
public V get(K key) {
CacheNode node = cache.get(key);
if (node == null) {
return null;
}
// 检查是否过期
if (System.currentTimeMillis() > node.expireTime) {
remove(key);
return null;
}
// 移动到头部(最近使用)
lock.writeLock().lock();
try {
moveToHead(node);
} finally {
lock.writeLock().unlock();
}
return node.value;
}
public void put(K key, V value) {
put(key, value, defaultTTL);
}
public void put(K key, V value, long ttl) {
long expireTime = System.currentTimeMillis() + ttl;
CacheNode existingNode = cache.get(key);
if (existingNode != null) {
// 更新现有节点
lock.writeLock().lock();
try {
existingNode.value = value;
existingNode.expireTime = expireTime;
moveToHead(existingNode);
} finally {
lock.writeLock().unlock();
}
} else {
// 添加新节点
CacheNode newNode = new CacheNode<>(key, value, expireTime);
lock.writeLock().lock();
try {
if (cache.size() >= capacity) {
// 移除最少使用的节点
CacheNode lastNode = tail.prev;
removeNode(lastNode);
cache.remove(lastNode.key);
}
addToHead(newNode);
cache.put(key, newNode);
} finally {
lock.writeLock().unlock();
}
}
}
public V remove(K key) {
CacheNode node = cache.remove(key);
if (node != null) {
lock.writeLock().lock();
try {
removeNode(node);
} finally {
lock.writeLock().unlock();
}
return node.value;
}
return null;
}
private void moveToHead(CacheNode node) {
removeNode(node);
addToHead(node);
}
private void addToHead(CacheNode node) {
node.prev = head;
node.next = head.next;
head.next.prev = node;
head.next = node;
}
private void removeNode(CacheNode node) {
node.prev.next = node.next;
node.next.prev = node.prev;
}
private void startCleanupTask() {
ScheduledExecutorService cleanupExecutor = Executors.newSingleThreadScheduledExecutor();
cleanupExecutor.scheduleAtFixedRate(this::cleanupExpiredEntries, 60, 60, TimeUnit.SECONDS);
}
private void cleanupExpiredEntries() {
long currentTime = System.currentTimeMillis();
List expiredKeys = new ArrayList<>();
// 收集过期的key
for (Map.Entry> entry : cache.entrySet()) {
if (currentTime > entry.getValue().expireTime) {
expiredKeys.add(entry.getKey());
}
}
// 移除过期的条目
for (K key : expiredKeys) {
remove(key);
}
}
public int size() {
return cache.size();
}
public void clear() {
lock.writeLock().lock();
try {
cache.clear();
head.next = tail;
tail.prev = head;
} finally {
lock.writeLock().unlock();
}
}
}
限流器实现
实现多种限流算法,包括令牌桶、漏桶和滑动窗口,适用于不同的限流场景。
令牌桶算法
算法原理
令牌桶以固定速率产生令牌,请求需要获取令牌才能通过,支持突发流量。
TokenBucketRateLimiter.java
public class TokenBucketRateLimiter {
private final long capacity; // 桶容量
private final long refillRate; // 令牌生成速率(每秒)
private final AtomicLong tokens; // 当前令牌数
private final AtomicLong lastRefillTime; // 上次填充时间
public TokenBucketRateLimiter(long capacity, long refillRate) {
this.capacity = capacity;
this.refillRate = refillRate;
this.tokens = new AtomicLong(capacity);
this.lastRefillTime = new AtomicLong(System.currentTimeMillis());
}
public boolean tryAcquire() {
return tryAcquire(1);
}
public boolean tryAcquire(long permits) {
refillTokens();
long currentTokens = tokens.get();
if (currentTokens >= permits) {
if (tokens.compareAndSet(currentTokens, currentTokens - permits)) {
return true;
}
// CAS失败,重试
return tryAcquire(permits);
}
return false;
}
private void refillTokens() {
long now = System.currentTimeMillis();
long lastRefill = lastRefillTime.get();
if (now > lastRefill) {
long timePassed = now - lastRefill;
long tokensToAdd = (timePassed * refillRate) / 1000;
if (tokensToAdd > 0) {
long currentTokens = tokens.get();
long newTokens = Math.min(capacity, currentTokens + tokensToAdd);
if (tokens.compareAndSet(currentTokens, newTokens)) {
lastRefillTime.set(now);
}
}
}
}
public long getAvailableTokens() {
refillTokens();
return tokens.get();
}
}
滑动窗口算法
SlidingWindowRateLimiter.java
public class SlidingWindowRateLimiter {
private final int windowSize; // 窗口大小(秒)
private final int maxRequests; // 最大请求数
private final ConcurrentLinkedQueue requestTimes;
public SlidingWindowRateLimiter(int windowSize, int maxRequests) {
this.windowSize = windowSize;
this.maxRequests = maxRequests;
this.requestTimes = new ConcurrentLinkedQueue<>();
}
public boolean tryAcquire() {
long now = System.currentTimeMillis();
long windowStart = now - (windowSize * 1000L);
// 清理过期的请求记录
while (!requestTimes.isEmpty() && requestTimes.peek() < windowStart) {
requestTimes.poll();
}
// 检查是否超过限制
if (requestTimes.size() < maxRequests) {
requestTimes.offer(now);
return true;
}
return false;
}
public int getCurrentRequests() {
long now = System.currentTimeMillis();
long windowStart = now - (windowSize * 1000L);
// 清理过期的请求记录
while (!requestTimes.isEmpty() && requestTimes.peek() < windowStart) {
requestTimes.poll();
}
return requestTimes.size();
}
}
并发编程面试题精讲
总结常见的并发编程面试题,提供详细的解答思路和最佳答案。
经典面试题
synchronized vs ReentrantLock
区别:
• synchronized是关键字,ReentrantLock是类
• ReentrantLock提供更多功能(公平锁、中断等)
• synchronized自动释放,ReentrantLock需手动释放
• 性能上ReentrantLock在高竞争时更优
• synchronized是关键字,ReentrantLock是类
• ReentrantLock提供更多功能(公平锁、中断等)
• synchronized自动释放,ReentrantLock需手动释放
• 性能上ReentrantLock在高竞争时更优
ConcurrentHashMap原理
JDK8实现:
• 使用CAS + synchronized
• 数组 + 链表 + 红黑树结构
• 分段锁思想,锁粒度更细
• 扩容时支持并发扩容
• 使用CAS + synchronized
• 数组 + 链表 + 红黑树结构
• 分段锁思想,锁粒度更细
• 扩容时支持并发扩容
ThreadLocal内存泄漏
原因:
• ThreadLocalMap的key是弱引用
• value是强引用,可能导致内存泄漏
• 线程池中线程复用加剧问题
解决:及时调用remove()方法
• ThreadLocalMap的key是弱引用
• value是强引用,可能导致内存泄漏
• 线程池中线程复用加剧问题
解决:及时调用remove()方法
AQS工作原理
核心机制:
• 状态管理(state字段)
• FIFO等待队列
• 模板方法模式
• CAS操作保证原子性
• 状态管理(state字段)
• FIFO等待队列
• 模板方法模式
• CAS操作保证原子性
面试技巧
回答要点
- 理论结合实践:不仅要说原理,还要举实际应用例子
- 对比分析:说明不同方案的优缺点和适用场景
- 性能考虑:讨论性能影响和优化方法
- 实际经验:分享项目中遇到的并发问题和解决方案
- 深入细节:准备深入的技术细节,展示扎实功底
性能测试与优化
并发系统的性能测试和优化是确保系统稳定运行的关键环节。
性能测试工具
- JMH:Java微基准测试框架,精确测量代码性能
- JProfiler:Java性能分析工具,分析CPU和内存使用
- VisualVM:免费的性能监控工具
- Arthas:阿里开源的Java诊断工具
- 压测工具:JMeter、Gatling等负载测试工具
优化策略
减少锁竞争
使用无锁算法、减小锁粒度、避免锁嵌套。
内存优化
对象池化、减少GC压力、合理设置堆大小。
I/O优化
使用NIO、异步I/O、连接池等技术。