第20章

⚡ 并发编程实战

综合运用所学知识,构建高性能并发系统,解决实际业务场景中的并发问题

学习目标

实战项目概览

本章将通过多个实战项目,帮助你将前面学到的并发编程知识应用到实际场景中。我们将构建高性能的并发系统,解决真实业务中的并发问题。

实战特色

每个项目都基于真实的业务场景,涵盖从设计到实现的完整过程,并提供性能测试和优化建议。

高并发秒杀系统
模拟电商秒杀场景,处理高并发请求,保证数据一致性和系统稳定性。
线程池 原子类 并发集合 锁机制
实时日志分析系统
实时处理和分析大量日志数据,提供高性能的数据处理能力。
生产者消费者 Fork/Join CompletableFuture
分布式缓存系统
构建高性能的分布式缓存,支持LRU策略和过期机制。
读写锁 并发集合 同步工具类

高并发计数器实现

在高并发场景下,简单的计数器往往成为性能瓶颈。我们将实现一个高性能的分布式计数器,支持原子操作和一致性保证。

设计思路

原子操作
使用AtomicLong和LongAdder实现无锁的原子计数操作。
分段计数
采用分段策略减少竞争,提高并发性能。
一致性保证
确保在高并发环境下计数的准确性和一致性。

核心实现

HighPerformanceCounter.java
public class HighPerformanceCounter {
    private final LongAdder counter = new LongAdder();
    private final AtomicLong atomicCounter = new AtomicLong(0);
    
    // 使用LongAdder实现高性能计数
    public void increment() {
        counter.increment();
    }
    
    public void add(long delta) {
        counter.add(delta);
    }
    
    public long sum() {
        return counter.sum();
    }
    
    // 原子操作版本
    public long incrementAndGet() {
        return atomicCounter.incrementAndGet();
    }
    
    // 性能测试方法
    public static void performanceTest() {
        HighPerformanceCounter counter = new HighPerformanceCounter();
        int threadCount = 100;
        int operationsPerThread = 100000;
        
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        CountDownLatch latch = new CountDownLatch(threadCount);
        
        long startTime = System.currentTimeMillis();
        
        for (int i = 0; i < threadCount; i++) {
            executor.submit(() -> {
                try {
                    for (int j = 0; j < operationsPerThread; j++) {
                        counter.increment();
                    }
                } finally {
                    latch.countDown();
                }
            });
        }
        
        try {
            latch.await();
            long endTime = System.currentTimeMillis();
            System.out.println("总计数: " + counter.sum());
            System.out.println("耗时: " + (endTime - startTime) + "ms");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdown();
        }
    }
}

生产者消费者系统

构建一个高性能的生产者消费者系统,支持多生产者多消费者模式,具备背压控制和监控功能。

系统架构

ProducerConsumerSystem.java
public class ProducerConsumerSystem {
    private final BlockingQueue queue;
    private final ExecutorService producerPool;
    private final ExecutorService consumerPool;
    private final AtomicLong producedCount = new AtomicLong(0);
    private final AtomicLong consumedCount = new AtomicLong(0);
    private volatile boolean running = true;
    
    public ProducerConsumerSystem(int queueCapacity, int producerThreads, int consumerThreads) {
        this.queue = new ArrayBlockingQueue<>(queueCapacity);
        this.producerPool = Executors.newFixedThreadPool(producerThreads);
        this.consumerPool = Executors.newFixedThreadPool(consumerThreads);
    }
    
    // 生产者接口
    public interface Producer {
        T produce() throws InterruptedException;
    }
    
    // 消费者接口
    public interface Consumer {
        void consume(T item) throws InterruptedException;
    }
    
    // 启动生产者
    public void startProducer(Producer producer) {
        producerPool.submit(() -> {
            while (running) {
                try {
                    T item = producer.produce();
                    if (item != null) {
                        queue.put(item); // 阻塞式放入
                        producedCount.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    System.err.println("生产者异常: " + e.getMessage());
                }
            }
        });
    }
    
    // 启动消费者
    public void startConsumer(Consumer consumer) {
        consumerPool.submit(() -> {
            while (running || !queue.isEmpty()) {
                try {
                    T item = queue.poll(1, TimeUnit.SECONDS);
                    if (item != null) {
                        consumer.consume(item);
                        consumedCount.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    System.err.println("消费者异常: " + e.getMessage());
                }
            }
        });
    }
    
    // 获取统计信息
    public void printStats() {
        System.out.println("队列大小: " + queue.size());
        System.out.println("已生产: " + producedCount.get());
        System.out.println("已消费: " + consumedCount.get());
        System.out.println("待处理: " + (producedCount.get() - consumedCount.get()));
    }
    
    // 优雅关闭
    public void shutdown() {
        running = false;
        producerPool.shutdown();
        consumerPool.shutdown();
        
        try {
            if (!producerPool.awaitTermination(5, TimeUnit.SECONDS)) {
                producerPool.shutdownNow();
            }
            if (!consumerPool.awaitTermination(5, TimeUnit.SECONDS)) {
                consumerPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            producerPool.shutdownNow();
            consumerPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

并发LRU缓存系统

实现一个线程安全的LRU缓存,支持高并发访问,具备过期策略和内存管理功能。

核心特性

线程安全
使用读写锁保证并发安全,读操作不互斥。
过期策略
支持TTL过期和定期清理过期数据。
LRU算法
基于双向链表和HashMap实现高效LRU。
ConcurrentLRUCache.java
public class ConcurrentLRUCache {
    private final int capacity;
    private final long defaultTTL;
    private final ConcurrentHashMap> cache;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private CacheNode head;
    private CacheNode tail;
    
    private static class CacheNode {
        K key;
        V value;
        long expireTime;
        CacheNode prev;
        CacheNode next;
        
        CacheNode(K key, V value, long expireTime) {
            this.key = key;
            this.value = value;
            this.expireTime = expireTime;
        }
    }
    
    public ConcurrentLRUCache(int capacity, long defaultTTL) {
        this.capacity = capacity;
        this.defaultTTL = defaultTTL;
        this.cache = new ConcurrentHashMap<>();
        
        // 初始化双向链表
        this.head = new CacheNode<>(null, null, 0);
        this.tail = new CacheNode<>(null, null, 0);
        head.next = tail;
        tail.prev = head;
        
        // 启动清理线程
        startCleanupTask();
    }
    
    public V get(K key) {
        CacheNode node = cache.get(key);
        if (node == null) {
            return null;
        }
        
        // 检查是否过期
        if (System.currentTimeMillis() > node.expireTime) {
            remove(key);
            return null;
        }
        
        // 移动到头部(最近使用)
        lock.writeLock().lock();
        try {
            moveToHead(node);
        } finally {
            lock.writeLock().unlock();
        }
        
        return node.value;
    }
    
    public void put(K key, V value) {
        put(key, value, defaultTTL);
    }
    
    public void put(K key, V value, long ttl) {
        long expireTime = System.currentTimeMillis() + ttl;
        CacheNode existingNode = cache.get(key);
        
        if (existingNode != null) {
            // 更新现有节点
            lock.writeLock().lock();
            try {
                existingNode.value = value;
                existingNode.expireTime = expireTime;
                moveToHead(existingNode);
            } finally {
                lock.writeLock().unlock();
            }
        } else {
            // 添加新节点
            CacheNode newNode = new CacheNode<>(key, value, expireTime);
            
            lock.writeLock().lock();
            try {
                if (cache.size() >= capacity) {
                    // 移除最少使用的节点
                    CacheNode lastNode = tail.prev;
                    removeNode(lastNode);
                    cache.remove(lastNode.key);
                }
                
                addToHead(newNode);
                cache.put(key, newNode);
            } finally {
                lock.writeLock().unlock();
            }
        }
    }
    
    public V remove(K key) {
        CacheNode node = cache.remove(key);
        if (node != null) {
            lock.writeLock().lock();
            try {
                removeNode(node);
            } finally {
                lock.writeLock().unlock();
            }
            return node.value;
        }
        return null;
    }
    
    private void moveToHead(CacheNode node) {
        removeNode(node);
        addToHead(node);
    }
    
    private void addToHead(CacheNode node) {
        node.prev = head;
        node.next = head.next;
        head.next.prev = node;
        head.next = node;
    }
    
    private void removeNode(CacheNode node) {
        node.prev.next = node.next;
        node.next.prev = node.prev;
    }
    
    private void startCleanupTask() {
        ScheduledExecutorService cleanupExecutor = Executors.newSingleThreadScheduledExecutor();
        cleanupExecutor.scheduleAtFixedRate(this::cleanupExpiredEntries, 60, 60, TimeUnit.SECONDS);
    }
    
    private void cleanupExpiredEntries() {
        long currentTime = System.currentTimeMillis();
        List expiredKeys = new ArrayList<>();
        
        // 收集过期的key
        for (Map.Entry> entry : cache.entrySet()) {
            if (currentTime > entry.getValue().expireTime) {
                expiredKeys.add(entry.getKey());
            }
        }
        
        // 移除过期的条目
        for (K key : expiredKeys) {
            remove(key);
        }
    }
    
    public int size() {
        return cache.size();
    }
    
    public void clear() {
        lock.writeLock().lock();
        try {
            cache.clear();
            head.next = tail;
            tail.prev = head;
        } finally {
            lock.writeLock().unlock();
        }
    }
}

限流器实现

实现多种限流算法,包括令牌桶、漏桶和滑动窗口,适用于不同的限流场景。

令牌桶算法

算法原理

令牌桶以固定速率产生令牌,请求需要获取令牌才能通过,支持突发流量。

TokenBucketRateLimiter.java
public class TokenBucketRateLimiter {
    private final long capacity;        // 桶容量
    private final long refillRate;      // 令牌生成速率(每秒)
    private final AtomicLong tokens;    // 当前令牌数
    private final AtomicLong lastRefillTime; // 上次填充时间
    
    public TokenBucketRateLimiter(long capacity, long refillRate) {
        this.capacity = capacity;
        this.refillRate = refillRate;
        this.tokens = new AtomicLong(capacity);
        this.lastRefillTime = new AtomicLong(System.currentTimeMillis());
    }
    
    public boolean tryAcquire() {
        return tryAcquire(1);
    }
    
    public boolean tryAcquire(long permits) {
        refillTokens();
        
        long currentTokens = tokens.get();
        if (currentTokens >= permits) {
            if (tokens.compareAndSet(currentTokens, currentTokens - permits)) {
                return true;
            }
            // CAS失败,重试
            return tryAcquire(permits);
        }
        
        return false;
    }
    
    private void refillTokens() {
        long now = System.currentTimeMillis();
        long lastRefill = lastRefillTime.get();
        
        if (now > lastRefill) {
            long timePassed = now - lastRefill;
            long tokensToAdd = (timePassed * refillRate) / 1000;
            
            if (tokensToAdd > 0) {
                long currentTokens = tokens.get();
                long newTokens = Math.min(capacity, currentTokens + tokensToAdd);
                
                if (tokens.compareAndSet(currentTokens, newTokens)) {
                    lastRefillTime.set(now);
                }
            }
        }
    }
    
    public long getAvailableTokens() {
        refillTokens();
        return tokens.get();
    }
}

滑动窗口算法

SlidingWindowRateLimiter.java
public class SlidingWindowRateLimiter {
    private final int windowSize;       // 窗口大小(秒)
    private final int maxRequests;      // 最大请求数
    private final ConcurrentLinkedQueue requestTimes;
    
    public SlidingWindowRateLimiter(int windowSize, int maxRequests) {
        this.windowSize = windowSize;
        this.maxRequests = maxRequests;
        this.requestTimes = new ConcurrentLinkedQueue<>();
    }
    
    public boolean tryAcquire() {
        long now = System.currentTimeMillis();
        long windowStart = now - (windowSize * 1000L);
        
        // 清理过期的请求记录
        while (!requestTimes.isEmpty() && requestTimes.peek() < windowStart) {
            requestTimes.poll();
        }
        
        // 检查是否超过限制
        if (requestTimes.size() < maxRequests) {
            requestTimes.offer(now);
            return true;
        }
        
        return false;
    }
    
    public int getCurrentRequests() {
        long now = System.currentTimeMillis();
        long windowStart = now - (windowSize * 1000L);
        
        // 清理过期的请求记录
        while (!requestTimes.isEmpty() && requestTimes.peek() < windowStart) {
            requestTimes.poll();
        }
        
        return requestTimes.size();
    }
}

并发编程面试题精讲

总结常见的并发编程面试题,提供详细的解答思路和最佳答案。

经典面试题

synchronized vs ReentrantLock
区别:
• synchronized是关键字,ReentrantLock是类
• ReentrantLock提供更多功能(公平锁、中断等)
• synchronized自动释放,ReentrantLock需手动释放
• 性能上ReentrantLock在高竞争时更优
ConcurrentHashMap原理
JDK8实现:
• 使用CAS + synchronized
• 数组 + 链表 + 红黑树结构
• 分段锁思想,锁粒度更细
• 扩容时支持并发扩容
ThreadLocal内存泄漏
原因:
• ThreadLocalMap的key是弱引用
• value是强引用,可能导致内存泄漏
• 线程池中线程复用加剧问题
解决:及时调用remove()方法
AQS工作原理
核心机制:
• 状态管理(state字段)
• FIFO等待队列
• 模板方法模式
• CAS操作保证原子性

面试技巧

回答要点
  • 理论结合实践:不仅要说原理,还要举实际应用例子
  • 对比分析:说明不同方案的优缺点和适用场景
  • 性能考虑:讨论性能影响和优化方法
  • 实际经验:分享项目中遇到的并发问题和解决方案
  • 深入细节:准备深入的技术细节,展示扎实功底

性能测试与优化

并发系统的性能测试和优化是确保系统稳定运行的关键环节。

性能测试工具

优化策略

减少锁竞争
使用无锁算法、减小锁粒度、避免锁嵌套。
内存优化
对象池化、减少GC压力、合理设置堆大小。
I/O优化
使用NIO、异步I/O、连接池等技术。
上一章:并发编程陷阱 返回目录