第12章
🗂️ 并发集合
掌握Java并发集合的原理和应用,包括ConcurrentHashMap、CopyOnWriteArrayList、BlockingQueue等高性能数据结构
学习目标
- 掌握ConcurrentHashMap的实现原理和分段锁机制
- 了解CopyOnWriteArrayList的写时复制策略和适用场景
- 熟练使用各种BlockingQueue实现生产者消费者模式
- 理解ConcurrentLinkedQueue的无锁实现原理
- 进行并发集合的性能分析和选择策略
ConcurrentHashMap
ConcurrentHashMap是Java并发包中最重要的数据结构之一,它提供了线程安全的哈希表实现,相比于Hashtable和Collections.synchronizedMap(),具有更好的并发性能。
核心特性
ConcurrentHashMap通过分段锁(Segment)和CAS操作实现高并发,在Java 8中进一步优化为Node数组+链表/红黑树的结构。
实现原理
分段锁机制
Java 7中使用Segment数组,每个Segment包含一个HashEntry数组,只锁定操作的Segment,提高并发度。
CAS操作
Java 8中大量使用CAS操作替代锁,减少锁竞争,提高性能。
红黑树优化
当链表长度超过8时转换为红黑树,提高查找效率。
代码示例
ConcurrentHashMap基本使用
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CountDownLatch;
public class ConcurrentHashMapDemo {
private static final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
int threadCount = 10;
CountDownLatch latch = new CountDownLatch(threadCount);
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
// 并发写入
for (int i = 0; i < threadCount; i++) {
final int index = i;
executor.submit(() -> {
try {
for (int j = 0; j < 1000; j++) {
String key = "key-" + (index * 1000 + j);
map.put(key, j);
// 使用putIfAbsent保证原子性
map.putIfAbsent("counter", 0);
map.compute("counter", (k, v) -> v + 1);
}
} finally {
latch.countDown();
}
});
}
latch.await();
executor.shutdown();
System.out.println("Map size: " + map.size());
System.out.println("Counter: " + map.get("counter"));
}
}
CopyOnWriteArrayList
CopyOnWriteArrayList是一个线程安全的List实现,采用写时复制(Copy-On-Write)策略。它特别适用于读多写少的场景,如缓存、配置信息等。
工作原理
- 读操作:直接读取当前数组,无需加锁,性能极高
- 写操作:复制整个数组,在新数组上进行修改,然后替换原数组
- 一致性:保证读操作的一致性,但可能读到旧数据
- 内存开销:写操作需要复制整个数组,内存开销较大
CopyOnWriteArrayList示例
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CopyOnWriteArrayListDemo {
private static final CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
public static void main(String[] args) throws InterruptedException {
// 初始化数据
for (int i = 0; i < 5; i++) {
list.add("item-" + i);
}
ExecutorService executor = Executors.newFixedThreadPool(3);
// 读线程
executor.submit(() -> {
while (true) {
for (String item : list) {
System.out.println("Reading: " + item);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
});
// 写线程
executor.submit(() -> {
for (int i = 5; i < 10; i++) {
list.add("item-" + i);
System.out.println("Added: item-" + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
Thread.sleep(5000);
executor.shutdownNow();
}
}
BlockingQueue
BlockingQueue是一个支持阻塞操作的队列接口,当队列为空时获取操作会阻塞,当队列满时插入操作会阻塞。它是实现生产者消费者模式的理想选择。
主要实现类
ArrayBlockingQueue
基于数组的有界阻塞队列,FIFO排序,支持公平和非公平锁。
LinkedBlockingQueue
基于链表的可选有界阻塞队列,默认容量为Integer.MAX_VALUE。
PriorityBlockingQueue
支持优先级的无界阻塞队列,元素按优先级排序。
生产者消费者模式
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class ProducerConsumerDemo {
private static final BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);
static class Producer implements Runnable {
@Override
public void run() {
try {
for (int i = 0; i < 20; i++) {
String item = "item-" + i;
queue.put(item); // 阻塞直到有空间
System.out.println("Produced: " + item);
Thread.sleep(100);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
static class Consumer implements Runnable {
@Override
public void run() {
try {
while (true) {
String item = queue.poll(1, TimeUnit.SECONDS);
if (item != null) {
System.out.println("Consumed: " + item);
Thread.sleep(200);
} else {
System.out.println("No item available, timeout");
break;
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) throws InterruptedException {
Thread producer = new Thread(new Producer());
Thread consumer1 = new Thread(new Consumer());
Thread consumer2 = new Thread(new Consumer());
producer.start();
consumer1.start();
consumer2.start();
producer.join();
consumer1.join();
consumer2.join();
}
}
ConcurrentLinkedQueue
ConcurrentLinkedQueue是一个基于链表的无界非阻塞队列,使用CAS操作实现线程安全,具有很高的并发性能。
性能特点
ConcurrentLinkedQueue使用无锁算法,避免了锁竞争,在高并发场景下性能优异,但不支持阻塞操作。
核心特性
- 无锁实现:使用CAS操作保证线程安全
- 无界队列:理论上可以无限增长
- FIFO顺序:先进先出的队列语义
- 弱一致性:迭代器具有弱一致性保证
ConcurrentLinkedQueue使用示例
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.CountDownLatch;
public class ConcurrentLinkedQueueDemo {
private static final ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
public static void main(String[] args) throws InterruptedException {
int threadCount = 5;
CountDownLatch latch = new CountDownLatch(threadCount * 2);
ExecutorService executor = Executors.newFixedThreadPool(threadCount * 2);
// 生产者线程
for (int i = 0; i < threadCount; i++) {
final int producerId = i;
executor.submit(() -> {
try {
for (int j = 0; j < 100; j++) {
String item = "P" + producerId + "-" + j;
queue.offer(item);
if (j % 20 == 0) {
System.out.println("Producer " + producerId + " added: " + item);
}
}
} finally {
latch.countDown();
}
});
}
// 消费者线程
for (int i = 0; i < threadCount; i++) {
final int consumerId = i;
executor.submit(() -> {
try {
int count = 0;
String item;
while ((item = queue.poll()) != null || count < 1000) {
if (item != null) {
count++;
if (count % 20 == 0) {
System.out.println("Consumer " + consumerId + " got: " + item);
}
} else {
Thread.sleep(1); // 短暂等待
}
}
System.out.println("Consumer " + consumerId + " processed: " + count + " items");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
latch.countDown();
}
});
}
latch.await();
executor.shutdown();
System.out.println("Remaining items in queue: " + queue.size());
}
}
性能对比分析
不同的并发集合在不同场景下有着不同的性能表现,选择合适的数据结构对系统性能至关重要。
选择策略
高并发读写
推荐:ConcurrentHashMap
场景:缓存、计数器、配置管理
场景:缓存、计数器、配置管理
读多写少
推荐:CopyOnWriteArrayList
场景:监听器列表、配置信息
场景:监听器列表、配置信息
生产者消费者
推荐:BlockingQueue系列
场景:任务队列、消息传递
场景:任务队列、消息传递
性能测试建议
- 根据实际业务场景进行基准测试
- 考虑内存使用情况,特别是CopyOnWriteArrayList
- 评估读写比例,选择合适的数据结构
- 监控GC压力,避免频繁的对象创建