基本
概念
原则:首先让你的代码正确,然后让它跑的更快
- 正确性:在多线程环境中,正确性不仅仅是指程序在单线程环境下能够正确执行,还包括在多线程并发执行时,程序的行为仍然符合预期。这意味着程序需要处理并发访问共享资源时可能出现的竞态条件、死锁、活锁等问题。
- 竞态条件:当多个线程同时访问和修改共享资源时,程序的输出依赖于线程执行的顺序,这种情况称为竞态条件。竞态条件可能导致程序行为不可预测。
- 数据竞争:多个线程未同步时访问同一个共享变量,且至少一个线程在写。
- 死锁:当多个线程互相等待对方释放锁时,可能会导致所有线程都无法继续执行,这种情况称为死锁。避免死锁的策略包括按顺序获取锁、使用超时机制等。
- 活锁:当多个线程不断尝试解决冲突但始终无法取得进展时,称为活锁。活锁通常是由于线程不断重试某个操作但始终无法成功。
- 原子操作:原子操作是指不可分割的操作,要么全部执行成功,要么全部不执行。原子操作可以避免竞态条件,常见的原子操作包括CAS(Compare-And-Swap)操作。
- CAS:乐观锁机制,硬件级别的原子操作。不需要使用锁,因此避免了锁的开销(如上下文切换、阻塞等)。CAS 操作可能会遇到 ABA 问题。
- ABA问题:变量的值从 A 变为 B,又变回 A,CAS 无法感知中间的变化。可以通过版本号或标记来解决。
eg:java原子类——java.util.concurrent.atomic
AtomicReference替代对象引用的线程安全类
eg:ConcurrencyIssues
在多线程环境中程序需要处理并发访问共享资源时可能出现的竞态条件、死锁、活锁等问题。
public class ConcurrencyIssues {
// 共享资源,用于展示竞态条件
private int counter = 0;
// 两个锁对象,用于展示死锁和活锁
private final Object lock1 = new Object();
private final Object lock2 = new Object();
// 标志位,用于展示活锁
private boolean isProcessing = false;
/**
* 竞态条件(Race Condition)
* 多个线程同时修改共享资源,导致结果不可预测。
*/
public void demonstrateRaceCondition() {
Runnable task = () -> {
for (int i = 0; i < 1000; i++) {
counter++; // 竞态条件:多个线程同时修改 counter
}
};
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Race Condition: Counter value = " + counter);
}
/**
* 死锁(Deadlock)
* 两个线程互相持有对方需要的锁,导致程序无法继续执行。
*/
public void demonstrateDeadlock() {
Runnable task1 = () -> {
synchronized (lock1) {
System.out.println("Thread 1: Holding lock1...");
try {
Thread.sleep(100); // 模拟延迟,增加死锁概率
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 1: Waiting for lock2...");
synchronized (lock2) {
System.out.println("Thread 1: Acquired lock2!");
}
}
};
Runnable task2 = () -> {
synchronized (lock2) {
System.out.println("Thread 2: Holding lock2...");
try {
Thread.sleep(100); // 模拟延迟,增加死锁概率
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread 2: Waiting for lock1...");
synchronized (lock1) {
System.out.println("Thread 2: Acquired lock1!");
}
}
};
Thread thread1 = new Thread(task1);
Thread thread2 = new Thread(task2);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Deadlock: If you see this, deadlock did not occur!");
}
/**
* 活锁(Livelock)
* 两个线程不断尝试解决问题,但由于相互让步,导致程序无法继续执行。
*/
public void demonstrateLivelock() {
Runnable task1 = () -> {
while (isProcessing) {
System.out.println("Thread 1: Waiting for processing to complete...");
try {
Thread.sleep(100); // 模拟延迟
} catch (InterruptedException e) {
e.printStackTrace();
}
}
isProcessing = true;
System.out.println("Thread 1: Processing started!");
isProcessing = false;
};
Runnable task2 = () -> {
while (isProcessing) {
System.out.println("Thread 2: Waiting for processing to complete...");
try {
Thread.sleep(100); // 模拟延迟
} catch (InterruptedException e) {
e.printStackTrace();
}
}
isProcessing = true;
System.out.println("Thread 2: Processing started!");
isProcessing = false;
};
Thread thread1 = new Thread(task1);
Thread thread2 = new Thread(task2);
thread1.start();
thread2.start();
try {
thread1.join();
thread2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Livelock: If you see this, livelock did not occur!");
}
public static void main(String[] args) {
ConcurrencyIssues issues = new ConcurrencyIssues();
// 演示竞态条件
issues.demonstrateRaceCondition();
// 演示死锁
issues.demonstrateDeadlock();
// 演示活锁
issues.demonstrateLivelock();
}
}
eg:AtomicInteger
通过CAS机制和自旋锁机制实现的MyAtomicInteger
import sun.misc.Unsafe; // 使用 Unsafe 类实现 CAS 操作
public class MyAtomicInteger {
private volatile int value; // 使用 volatile 保证可见性
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset; // value 字段的内存偏移量
static {
try {
// 获取 value 字段的内存偏移量
valueOffset = unsafe.objectFieldOffset(MyAtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}
// 构造函数
public MyAtomicInteger(int initialValue) {
value = initialValue;
}
// 获取当前值
public int get() {
return value;
}
// 设置新值
public void set(int newValue) {
value = newValue;
}
// CAS 操作:比较并交换
private boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
// 原子递增
public int incrementAndGet() {
int prev, next;
do {
prev = get(); // 获取当前值
next = prev + 1; // 计算新值
} while (!compareAndSet(prev, next)); // CAS 操作,直到成功
return next;
}
// 原子递减
public int decrementAndGet() {
int prev, next;
do {
prev = get(); // 获取当前值
next = prev - 1; // 计算新值
} while (!compareAndSet(prev, next)); // CAS 操作,直到成功
return next;
}
// 原子加法
public int addAndGet(int delta) {
int prev, next;
do {
prev = get(); // 获取当前值
next = prev + delta; // 计算新值
} while (!compareAndSet(prev, next)); // CAS 操作,直到成功
return next;
}
}
线程安全
线程安全是为了确保在多线程环境下,共享资源的状态能够保持一致性和正确性。如果没有线程安全机制,多个线程同时访问和修改共享资源可能会导致数据不一致、程序崩溃或其他不可预见的错误。
- 编写线程安全的代码,本质上就是管理对状态的访问。
- 线程安全性需要我们在不可控制的并发访问中保护数据。
- 当多个线程访问一个类时,如果不用考虑这些线程在运行时环境下的调度和交替执行,并且不需要额外的同步及在调用方代码不必作其他的协调,这个类的行为仍然是正确的,那么称这个类是线程安全的。
- 开始就将一个类设计成线程安全的比在后期修复它更容易。
- 无状态对象永远是线程安全的。
- 大部分servlet是无状态的,只有servlet在处理请求时需要保存信息的情况下,线程安全性才会成为问题。
- Check-Then-Act先检查后执行——初始化延迟。
++count:“读取——修改——写入”的操作系列,其结果状态依赖于之前的状态
- 状态:一个对象的状态就是它的数据,存储在状态对象中,其包括对象本身的域以及其他附属对象的域。
- 共享:一个变量可以被多个线程访问。
- 可变:变量的值在其生命周期内可变.。
同步
同步机制是为了协调多个线程对共享资源的访问,确保在同一时间只有一个线程能够修改共享资源。通过同步机制,可以避免竞态条件,确保数据的一致性和正确性。
在不变性条件中涉及多个变量时,各个变量之间并不彼此独立,而是某个变量的值会对其他变量的值产生约束。
修复多线程条件下对同一个未正确同步的变量的访问隐患的方法:
- 不要跨线程共享变量
- 使状态变量为不可变的
- 在任何访问状态变量的时候使用同步
发布和逸出
类未被正确发布
eg:非final字段在构造函数中初始化
public class UnsafePublication {
private int value;
public UnsafePublication(int value) {
this.value = value;
}
// 可能被其他线程看到value为0(默认值)而非构造时设置的值
}
import sun.misc.Unsafe;
import java.lang.reflect.Field;
public class UnsafePublicationPoc {
private int value; // 非final、非volatile
// 通过Unsafe直接分配内存但不调用构造函数
private static Unsafe getUnsafe() throws Exception {
Field f = Unsafe.class.getDeclaredField("theUnsafe");
f.setAccessible(true);
return (Unsafe) f.get(null);
}
public static void main(String[] args) throws Exception {
Unsafe unsafe = getUnsafe();
// 分配对象内存但不调用构造函数(value不会被初始化为42)
UnsafePublicationPoc instance = (UnsafePublicationPoc) unsafe.allocateInstance(UnsafePublicationPoc.class);
// 线程A:尝试初始化value(但故意延迟)
Thread writer = new Thread(() -> {
try {
Thread.sleep(100); // 确保reader线程先看到未初始化的对象
} catch (InterruptedException e) {}
instance.value = 42;
});
// 线程B:在value被初始化前读取
Thread reader = new Thread(() -> {
System.out.println("看到的value值: " + instance.value); // 必然输出0
});
writer.start();
reader.start();
writer.join();
reader.join();
}
}
eg:通过静态字段或方法发布对象
public class Holder {
public static Holder holder;
public static void initialize() {
holder = new Holder(); // 不安全发布
}
}
public class UnsafeHolder {
private int value;
public static UnsafeHolder holder;
public UnsafeHolder() {
// 关键步骤1:先发布this引用(危险操作)
holder = this;
// 关键步骤2:模拟耗时初始化操作(制造时间窗口)
try {
Thread.sleep(100); // 延迟让其他线程有机会介入
} catch (InterruptedException e) {}
this.value = 42; // 实际初始化
}
public static void main(String[] args) throws Exception {
// 监控线程:持续检查value值
new Thread(() -> {
while (true) {
if (UnsafeHolder.holder != null && UnsafeHolder.holder.value != 42) {
System.out.println("检测到未初始化值: " + UnsafeHolder.holder.value);
System.exit(0);
}
}
}).start();
// 主线程:创建对象(触发构造函数)
new UnsafeHolder();
Thread.sleep(1000);
System.out.println("正常结束,未检测到问题");
}
}
eg:在构造函数中启动线程或注册监听器
public class ThisEscape {
public ThisEscape(EventSource source) {
source.registerListener(new EventListener() {
public void onEvent(Event e) {
doSomething(e); // 可能访问未完全构造的ThisEscape实例
}
});
}
}
interface EventListener {
void onEvent();
}
class EventSource {
private EventListener listener;
// 注册监听器并立即触发事件(模拟外部系统快速回调)
public void registerListener(EventListener listener) {
this.listener = listener;
triggerEvent(); // 危险操作:在注册时立即触发事件
}
private void triggerEvent() {
if (listener != null) {
listener.onEvent(); // 此时ThisEscape可能未完成构造
}
}
}
public class ThisEscape {
private int value = 10; // 初始值设为10,构造器中会修改为20
public ThisEscape(EventSource source) {
// 注册监听器(隐含传递this引用)
source.registerListener(new EventListener() {
@Override
public void onEvent() {
// 当事件触发时,可能访问到未完全初始化的对象
System.out.println("[回调中] value = " + value);
}
});
// 模拟耗时初始化操作(让回调有机会在初始化完成前触发)
try {
Thread.sleep(1000); // 延迟1秒
} catch (InterruptedException e) {
e.printStackTrace();
}
this.value = 20; // 实际初始化值
System.out.println("[构造完成] value = " + value);
}
public static void main(String[] args) {
EventSource source = new EventSource();
new ThisEscape(source);
}
}
volatile
- 可见性:JMM中,每个线程都有自己的工作内存(线程本地内存),线程对变量的操作通常是在工作内存中进行的,当线程修改volatile值时,会被立刻写回主存,其他线程读取时直接从主存中读。
- 禁止指令重排序:
- 在写volatile变量之前插入StoreStore屏障,之后插入StoreLoad屏障。
- 在读volatile变量之前插入LoadLoad屏障,之后插入LoadStore屏障。
锁
java原生锁
特性 | 内置锁(synchronized ) | 显式锁(ReentrantLock ) |
---|---|---|
锁管理方式 | JVM 自动管理(隐式) | 手动管理(显式调用 lock/unlock ) |
实现机制 | JVM 监视器锁(monitorenter/monitorexit ) | 基于 AQS + CAS(Java 代码实现) |
可中断性 | ❌ 不支持 | ✅ 支持(lockInterruptibly() ) |
超时获取 | ❌ 不支持 | ✅ 支持(tryLock(timeout) ) |
公平性 | ❌ 仅非公平 | ✅ 可配置(公平/非公平) |
条件变量 | 仅 wait/notify (绑定对象监视器) | 支持多个 Condition (更灵活) |
适用场景 | 简单同步、代码简洁 | 复杂同步、需要高级功能(如超时、公平锁) |
内置锁
内置锁是 Java 语言层面原生支持的锁机制,通过 synchronized 关键字实现,由 JVM 直接管理,锁的获取和释放由 JVM 自动完成。
核心性质:
- 隐式锁管理:无需手动加锁/解锁,进入同步块自动获取锁,退出时自动释放(包括异常情况)。
- 基于监视器(Monitor):每个 Java 对象都关联一个内置锁(Monitor),通过 monitorenter 和 monitorexit 字节码指令实现。
- 非公平锁:默认不保证线程获取锁的顺序。
- 可重入:同一个线程可以多次获取同一把锁(避免死锁)。
- 不可中断:线程在等待锁时无法被中断(Thread.interrupt() 无法直接打断)。
eg:基本使用
public class IntrinsicLockDemo {
private int counter = 0;
// 同步方法(锁对象是 this)
public synchronized void increment() {
counter++;
}
// 同步代码块(锁对象是 this)
public void decrement() {
synchronized (this) {
counter--;
}
}
// 静态同步方法(锁对象是 Class 对象)
public static synchronized void staticMethod() {
System.out.println("Static synchronized method");
}
}
eg:可重入
public class ReentrantDemo {
public synchronized void method1() {
System.out.println("method1");
method2(); // 调用另一个同步方法(可重入)
}
public synchronized void method2() {
System.out.println("method2");
}
public static void main(String[] args) {
ReentrantDemo demo = new ReentrantDemo();
demo.method1(); // 不会死锁,因为锁可重入
}
}
eg:不可中断
public class NonInterruptibleDemo {
public synchronized void doTask() {
while (true) {
// 模拟长时间占用锁
System.out.println(Thread.currentThread().getName() + " is running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println("Interrupted, but still holding the lock!");
}
}
}
public static void main(String[] args) throws InterruptedException {
NonInterruptibleDemo demo = new NonInterruptibleDemo();
Thread t1 = new Thread(demo::doTask, "Thread-1");
Thread t2 = new Thread(demo::doTask, "Thread-2");
t1.start();
Thread.sleep(100); // 确保 t1 先获取锁
t2.start();
Thread.sleep(2000);
t2.interrupt(); // 尝试中断 t2(但无法打断锁等待)
}
}
显式锁
显式锁是 Java 标准库(java.util.concurrent.locks)提供的锁机制,通过 Lock 接口(如 ReentrantLock)实现,需要 手动管理锁的获取和释放,提供比内置锁更灵活的控制能力。
核心性质:
- 显式锁管理:必须手动调用 lock() 和 unlock()(通常在 try-finally 块中确保释放)。
- 基于 AQS(AbstractQueuedSynchronizer):底层使用 CAS(Compare-And-Swap)和队列实现锁竞争。
- 可重入:同一个线程可以多次获取同一把锁。
- 可中断:支持 lockInterruptibly(),允许线程在等待锁时响应中断。
- 可配置公平性:支持公平锁(按等待顺序获取)和非公平锁(默认)。
- 超时获取:支持 tryLock(timeout),避免无限等待。
- 条件变量(Condition):支持更灵活的线程通信(类似 wait/notify 但更可控)。
eg:基本用法
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ExplicitLockDemo {
private final Lock lock = new ReentrantLock();
private int counter = 0;
public void increment() {
lock.lock(); // 手动加锁
try {
counter++;
} finally {
lock.unlock(); // 必须手动释放
}
}
public void decrement() {
if (lock.tryLock()) { // 尝试获取锁(非阻塞)
try {
counter--;
} finally {
lock.unlock();
}
} else {
System.out.println("Failed to acquire lock!");
}
}
}
eg:可中断
public class InterruptibleLockDemo {
private final Lock lock = new ReentrantLock();
public void doTask() throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " trying to acquire lock...");
lock.lockInterruptibly(); // 可被中断的锁
try {
System.out.println(Thread.currentThread().getName() + " acquired the lock!");
Thread.sleep(5000); // 模拟长时间任务
} finally {
lock.unlock();
System.out.println(Thread.currentThread().getName() + " released the lock!");
}
}
public static void main(String[] args) throws InterruptedException {
InterruptibleLockDemo demo = new InterruptibleLockDemo();
Thread t1 = new Thread(() -> {
try {
demo.doTask();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was interrupted while waiting!");
}
}, "Thread-1");
Thread t2 = new Thread(() -> {
try {
demo.doTask();
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " was interrupted while waiting!");
}
}, "Thread-2");
t1.start();
Thread.sleep(100); // 确保 t1 先获取锁
t2.start();
Thread.sleep(1000);
t2.interrupt(); // 中断 t2 的锁等待
}
}
eg:公平锁和非公平锁
public class FairLockDemo {
private static final Lock fairLock = new ReentrantLock(true); // 公平锁
private static final Lock unfairLock = new ReentrantLock(); // 非公平锁(默认)
public static void testLock(Lock lock, String lockType) {
System.out.println("Testing " + lockType + " lock:");
for (int i = 0; i < 3; i++) {
new Thread(() -> {
for (int j = 0; j < 2; j++) {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " acquired the lock");
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}, "Thread-" + i).start();
}
}
public static void main(String[] args) throws InterruptedException {
testLock(fairLock, "Fair");
Thread.sleep(1000);
testLock(unfairLock, "Non-Fair");
}
}
评论