第12章

🗂️ 并发集合

掌握Java并发集合的原理和应用,包括ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue等高性能数据结构

学习目标

ConcurrentHashMap

ConcurrentHashMap是Java并发包中最重要的数据结构之一,它提供了线程安全的哈希表实现,相比于Hashtable和Collections.synchronizedMap(),具有更好的并发性能。

核心特性

ConcurrentHashMap通过分段锁(Segment)和CAS操作实现高并发,在Java 8中进一步优化为Node数组+链表/红黑树的结构。

实现原理

分段锁机制
Java 7中使用Segment数组,每个Segment包含一个HashEntry数组,只锁定操作的Segment,提高并发度。
CAS操作
Java 8中大量使用CAS操作替代锁,减少锁竞争,提高性能。
红黑树优化
当链表长度超过8时转换为红黑树,提高查找效率。

代码示例

ConcurrentHashMap基本使用
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CountDownLatch;

public class ConcurrentHashMapDemo {
    private static final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
    
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 10;
        CountDownLatch latch = new CountDownLatch(threadCount);
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        
        // 并发写入
        for (int i = 0; i < threadCount; i++) {
            final int index = i;
            executor.submit(() -> {
                try {
                    for (int j = 0; j < 1000; j++) {
                        String key = "key-" + (index * 1000 + j);
                        map.put(key, j);
                        
                        // 使用putIfAbsent保证原子性
                        map.putIfAbsent("counter", 0);
                        map.compute("counter", (k, v) -> v + 1);
                    }
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        executor.shutdown();
        
        System.out.println("Map size: " + map.size());
        System.out.println("Counter: " + map.get("counter"));
    }
}

CopyOnWriteArrayList

CopyOnWriteArrayList是一个线程安全的List实现,采用写时复制(Copy-On-Write)策略。它特别适用于读多写少的场景,如缓存、配置信息等。

工作原理

CopyOnWriteArrayList示例
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CopyOnWriteArrayListDemo {
    private static final CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
    
    public static void main(String[] args) throws InterruptedException {
        // 初始化数据
        for (int i = 0; i < 5; i++) {
            list.add("item-" + i);
        }
        
        ExecutorService executor = Executors.newFixedThreadPool(3);
        
        // 读线程
        executor.submit(() -> {
            while (true) {
                for (String item : list) {
                    System.out.println("Reading: " + item);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        });
        
        // 写线程
        executor.submit(() -> {
            for (int i = 5; i < 10; i++) {
                list.add("item-" + i);
                System.out.println("Added: item-" + i);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        
        Thread.sleep(5000);
        executor.shutdownNow();
    }
}

BlockingQueue

BlockingQueue是一个支持阻塞操作的队列接口,当队列为空时获取操作会阻塞,当队列满时插入操作会阻塞。它是实现生产者消费者模式的理想选择。

主要实现类

ArrayBlockingQueue
基于数组的有界阻塞队列,FIFO排序,支持公平和非公平锁。
LinkedBlockingQueue
基于链表的可选有界阻塞队列,默认容量为Integer.MAX_VALUE。
PriorityBlockingQueue
支持优先级的无界阻塞队列,元素按优先级排序。
生产者消费者模式
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ProducerConsumerDemo {
    private static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
    
    static class Producer implements Runnable {
        @Override
        public void run() {
            try {
                for (int i = 0; i < 20; i++) {
                    String item = "item-" + i;
                    queue.put(item); // 阻塞直到有空间
                    System.out.println("Produced: " + item);
                    Thread.sleep(100);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    static class Consumer implements Runnable {
        @Override
        public void run() {
            try {
                while (true) {
                    String item = queue.poll(1, TimeUnit.SECONDS);
                    if (item != null) {
                        System.out.println("Consumed: " + item);
                        Thread.sleep(200);
                    } else {
                        System.out.println("No item available, timeout");
                        break;
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
    
    public static void main(String[] args) throws InterruptedException {
        Thread producer = new Thread(new Producer());
        Thread consumer1 = new Thread(new Consumer());
        Thread consumer2 = new Thread(new Consumer());
        
        producer.start();
        consumer1.start();
        consumer2.start();
        
        producer.join();
        consumer1.join();
        consumer2.join();
    }
}

ConcurrentLinkedQueue

ConcurrentLinkedQueue是一个基于链表的无界非阻塞队列,使用CAS操作实现线程安全,具有很高的并发性能。

性能特点

ConcurrentLinkedQueue使用无锁算法,避免了锁竞争,在高并发场景下性能优异,但不支持阻塞操作。

核心特性

ConcurrentLinkedQueue使用示例
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CountDownLatch;

public class ConcurrentLinkedQueueDemo {
    private static final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
    
    public static void main(String[] args) throws InterruptedException {
        int threadCount = 5;
        CountDownLatch latch = new CountDownLatch(threadCount * 2);
        ExecutorService executor = Executors.newFixedThreadPool(threadCount * 2);
        
        // 生产者线程
        for (int i = 0; i < threadCount; i++) {
            final int producerId = i;
            executor.submit(() -> {
                try {
                    for (int j = 0; j < 100; j++) {
                        String item = "P" + producerId + "-" + j;
                        queue.offer(item);
                        if (j % 20 == 0) {
                            System.out.println("Producer " + producerId + " added: " + item);
                        }
                    }
                } finally {
                    latch.countDown();
                }
            });
        }
        
        // 消费者线程
        for (int i = 0; i < threadCount; i++) {
            final int consumerId = i;
            executor.submit(() -> {
                try {
                    int count = 0;
                    String item;
                    while ((item = queue.poll()) != null || count < 1000) {
                        if (item != null) {
                            count++;
                            if (count % 20 == 0) {
                                System.out.println("Consumer " + consumerId + " got: " + item);
                            }
                        } else {
                            Thread.sleep(1); // 短暂等待
                        }
                    }
                    System.out.println("Consumer " + consumerId + " processed: " + count + " items");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    latch.countDown();
                }
            });
        }
        
        latch.await();
        executor.shutdown();
        
        System.out.println("Remaining items in queue: " + queue.size());
    }
}

性能对比分析

不同的并发集合在不同场景下有着不同的性能表现,选择合适的数据结构对系统性能至关重要。

选择策略

高并发读写
推荐:ConcurrentHashMap
场景:缓存、计数器、配置管理
读多写少
推荐:CopyOnWriteArrayList
场景:监听器列表、配置信息
生产者消费者
推荐:BlockingQueue系列
场景:任务队列、消息传递
性能测试建议
  • 根据实际业务场景进行基准测试
  • 考虑内存使用情况,特别是CopyOnWriteArrayList
  • 评估读写比例,选择合适的数据结构
  • 监控GC压力,避免频繁的对象创建
上一章:Future和CompletableFuture 返回目录 下一章:同步工具类