Note 源码基于 Java 17 (Spring5 都停止维护了,是时候升级了...).
在看具体看在这之前有个问题:请说说你对 AQS
的理解?
面试的时候被问到这种问题就很蛋疼,因为你可能知道它的原理,但是不知道怎么概括出来。
所以个人建议在看源码前一定要看一下每个类上的注释,比如说这里的 AQS
(这里只截取了部分):
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
提供了一个框架,去帮助开发者实现一个依赖于先进先出(FIFO)等待队列的同步锁或相关同步器(事件、信号量等)
如果谈理解的话,用这一句开头就会很舒服、
概要
对于ReentrantLock
,你需要知道它里面有一个等待队列,也就是 AQS (AbstractQueuedSynchronizer
),这个队列只有“头部的节点”才有资格抢到锁!但这并不代表其它节点对应的线程不会被唤醒,这些线程只是没有抢锁的资格,在获取资格前抢锁永远失败。在这里需要注意:没有资格抢锁 != 没有机会被唤醒。
例如上图,是 AQS 的一个简单示例。state
表示队列状态,如果该值非 0,则表示锁已经被其它线程占用。这里 Node0
目前已经是抢到锁的状态,其余节点都处于等待(挂起)状态。
当 Node0
释放锁时,它将会唤醒 Node1#waiter
对应的线程,并去除掉 WAITING
的 status
,此时 Node1
发现自己处于第一个,就会尝试抢锁。如果成功,则将 head
指向为自己。
Note 第一个:当
node.prev == head
时,当前节点就是第一个。
对于 Node2
,由于它不是第一个节点,所以它会在抢锁时直接进入 WAITING
状态。
CHL
其实文档已经很清晰的说明了 AQS 是怎么实现的了,如果有能力,可以直接硬啃文档。因为字数过多,这里我只摘出一部分。
CLH 可以避免使用同步锁,通过 prev
和 next
对前后节点的连接以及 status
字段,当线程释放锁时,CLH 队列可以给它的继任者发送一个信号,此时继任者可以进一步进行抢锁操作。
+------+ prev +-------+ +------+ | head | <---- | first | <---- | tail | +------+ +-------+ +------+text
向 CLH 插入节点只需要在 tail
执行一个原子操作。当 CAS 操作执行成功后,只需要再将之前的 tail.next
设置为自己即可。即使后面后面这一步不是原子操作,
但是也足够确保任何被阻塞的线程,都能够成功的接收到前任者的信号(例如 Node0
是当前的 tail
,此时 CAS 插入 Node1
的同时 Node0
的锁释放掉了,此时 Node0.next
为空,但是仍然不影响代码的正常运行)。
源码
构造器
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
java
可以通过构造器来设置锁是否为公平锁,默认为非公平锁
公平锁和非公平锁的唯一区别就是公平锁多了一个判断条件:hasQueuedPredecessors
。该方法主要用于判断公平锁加锁时等待队列中是否存在有效节点。
// 公平锁
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 公平只比非公平锁多了下面一个条件,其余和非公平锁一样。
// 这里会判断队列中是否有正在排队的线程,如果没有,则直接抢锁,不进队列了
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
java
这里可以发现,如果是非公平锁,线程在入队之前总会尝试抢锁,如果抢到了就不会入队了。如果是公平锁入队前,只会在等待队列为空时尝试抢锁,否则需要老老实实入队。
Important 这里以非公平锁为例演示,当代码不一样时会单独指出,否则表示两边实现一样。
lock
lock
方法主要调用了AQS中的acquire
方法
// 该方法是RentrantLock里的
final void lock() {
if (!initialTryLock())
acquire(1);
}
// 非公平锁
final boolean initialTryLock() {
Thread current = Thread.currentThread();
if (compareAndSetState(0, 1)) { // first attempt is unguarded
setExclusiveOwnerThread(current);
return true;
} else if (getExclusiveOwnerThread() == current) {
int c = getState() + 1;
if (c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
} else
return false;
}
// 公平锁
final boolean initialTryLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedThreads() && compareAndSetState(0, 1)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (getExclusiveOwnerThread() == current) {
if (++c < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(c);
return true;
}
return false;
}
// 该方法是AQS类里的
public final void acquire(int arg) {
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
java
Important 可以发现公平锁和非公平锁的唯一区别就是:公平锁在没入队前抢锁总是需要判断队列是否为空。
这里后面就只展示非公平锁的代码了。
大致流程流程:
tryAcquire
AQS 使用了模板模式,这里 tryAcquire
是一个模板方法,需要子类实现。该方法会先尝试抢占锁 (尝试将 status
从 0
设置为1
),若失败则继续判断:
protected final boolean tryAcquire(int acquires) {
if (getState() == 0 && compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
java
acquire
如果tryAcquire
没有拿到锁,则会进入这一步:将线程加入到等待队列。代码很长:
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
if (pred.status < 0) {
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) { // 为第一个节点,或者还没有入队,直接尝试抢锁
boolean acquired;
try {
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) {
if (first) { // 将自己设置为第一个节点
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
if (node == null) { // 创建节点并重新循环
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // 尝试将节点加入队列(尾插)
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // 这里使用 unsafe 赋值,避免了读写屏障导致数据不一致
if (t == null)
tryInitializeHead();
else if (!casTail(t, node)) // 尝试 CAS 设置尾结点
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
--spins; // reduce unfairness on rewaits, 这里后面说
Thread.onSpinWait(); // 指示当前线程暂时无法在循环中完成任务,JVM会优化此类循环以提高性能
} else if (node.status == 0) { // 状态为 0 表示正在抢锁
node.status = WAITING; // 设置节点状态为等待
} else {
long nanos; // 挂起线程,等待下次唤醒
spins = postSpins = (byte)((postSpins << 1) | 1); // 这里后面说
if (!timed)
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
java
这里在注释里已经讲的很清楚了,主要干了这么几件事:
- if 是第一个节点,确保继任者有效(合法)
- if 节点是第一个并且还没有入队,则尝试抢锁
- else if 节点没有被创建,则创建对应的节点
- else if 节点没有入队,则尝试入队
- else if 线程被唤醒,则重新自旋并设置更大的
spins
以防止一直抢不到锁 - else if WAITING 状态没有被设置,则设置并该状态并重试
- else 挂起线程,然后清除 WAITING 状态并检查取消(中断)请求
在继续前,我们来看一下节点的 status
有哪些:
// 表示当前节点正在等待信号
static final int WAITING = 1; // must be 1
// 表示当前节点已经被取消了,即放弃抢锁
static final int CANCELLED = 0x80000000; // must be negative. 1000 0000 0000 0000 0000 0000 0000 0001
// 表示当前节点已经被挂起,正在等待唤醒信号
static final int COND = 2; // in a condition wait
java
这里 COND
后面再说,ReentrantLock
里面没有用到。WAITING
则是表示当前节点正在等待信号。
cleanQueue
这里代码很恶心,变量名称全部用一个字符表示,这里我把它换成了稍微能理解的名称:
private void cleanQueue() {
for (;;) { // restart point
for (Node currentNode = tail, nextNode = null, prevNode, prevNext;;) { // (prevNode, currentNode, nextNode) triples
if (currentNode == null || (prevNode = currentNode.prev) == null)
return; // end of list
if (nextNode == null ? tail != currentNode : (nextNode.prev != currentNode || nextNode.status < 0))
break; // inconsistent
if (currentNode.status < 0) { // cancelled
if ((nextNode == null ? casTail(currentNode, prevNode) : nextNode.casPrev(currentNode, prevNode)) &&
currentNode.prev == prevNode) {
prevNode.casNext(currentNode, nextNode); // OK if fails
if (prevNode.prev == null)
signalNext(prevNode);
}
break;
}
if ((prevNext = prevNode.next) != currentNode) { // help finish
if (prevNext != null && currentNode.prev == prevNode) {
prevNode.casNext(prevNext, currentNode);
if (prevNode.prev == null)
signalNext(prevNode);
}
break;
}
nextNode = currentNode;
currentNode = currentNode.prev;
}
}
}
java
最后这段 help finish
可能有点迷,为什么还要帮别人完成节点连接,这里我们可以看一下前面的代码我们是怎么把节点插入到队列中的:
node.waiter = current;
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
tryInitializeHead();
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
java
可以发现 casTail(t, node)
和 t.next = node
并不是一个原子操作,所以在高并发下,可能导致 t.next = node
还没被执行,就被其它线程视作进入队列了。
Note 说实话,我目前没有想到不帮忙会出现什么问题🫠...但是肯定是有点隐患在这里的。
cancelAcquire
这个方法很简单,直接看就能看懂:
private int cancelAcquire(Node node, boolean interrupted,
boolean interruptible) {
if (node != null) {
node.waiter = null;
node.status = CANCELLED;
if (node.prev != null)
cleanQueue();
}
if (interrupted) {
if (interruptible)
return CANCELLED;
else
Thread.currentThread().interrupt();
}
return 0;
}
java
unlock
// Sync类的方法
public void unlock() {
sync.release(1);
}
// AQS的方法
public final boolean release(int arg) {
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
// Sync类的方法
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (getExclusiveOwnerThread() != Thread.currentThread())
throw new IllegalMonitorStateException();
boolean free = (c == 0);
if (free)
setExclusiveOwnerThread(null);
setState(c);
return free;
}
java
解锁的代码都很简单,因为已经占有了锁,所以不需要过多判断。
头节点的释放
先来看一下 signalNext
:
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter);
}
}
java
这段代码很简单,只是取消了继任者的 WAITING
状态并唤醒它。
可以发现 AQS 的 release
方法没有对任何节点进行删除,这里就非常巧妙的利用了只有第一个节点才能抢锁的特性来对节点进行了删除。在后面第一个节点抢到锁后,它会将自己设置为新的 tail
,旧的 tail
因此被丢弃,最后被垃圾回收。
总结
最后用一张图总结一下吧:
Condition
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
new Thread(() -> {
lock.lock();
System.out.println("Thread 1 locked.");
try {
condition.await();
System.out.println("Thread 1 awake");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}).start();
Thread.sleep(1000);
new Thread(() -> {
lock.lock();
System.out.println("Thread 2 locked.");
condition.signal();
System.out.println("Thread 2 after signal");
lock.unlock();
}).start();
Thread.sleep(3000);
java
输出:
Thread 1 locked. Thread 2 locked. Thread 2 after signal Thread 1 awakelog
对于Condition
,这里我们来了解一下 await
和 signal
这俩个重要的方法。从方法名大致就可以猜出这俩是干什么的了,线程通过调用 await
可以阻塞自己,然后其它线程可以调用 signal
和 signalAll
来唤醒一个或所有被阻塞的线程。
需要注意的是:
- 调用
await
和signal
前必须持有锁 - 调用
signal
后需要手动释放锁
newCondition 中的非静态内部类
这里如果浅看 newCondition
源码的话,会发现 newCondition
只是创建并返回了一个新的对象:
public Condition newCondition() {
return sync.newCondition();
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
java
如果不细看的话这里会很懵逼: 为什么直接返回了一个对象并且没有和任何东西进行关联?
其实只要细看就可以发现,ConditionObject
是 AQS 中的一个非静态内部类,在创建后是可以访问当前 ReentrantLock
实例对象的(this
访问)。
await
在看之前需要了解一个类:ConditionNode
。定义如下:
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // link to next waiting node
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}
java
需要注意的是,ConditionNode
继承了 Node
,首先 ConditionNode
会在条件队列中等待唤醒,在唤醒前,它会被插入到等待队列,唤醒后,就可以直接去抢锁了。
Note 条件队列是由
ConditionNode#nextWaiter
组成的单向队列。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 这个 node 是一个单向链表
ConditionNode node = new ConditionNode();
// 将节点插入到 条件队列 链表尾部
// 插入后将节点状态设置为 COND | WAITING,并**释放锁**, 因为必须持有锁调用,所以实现很简单
int savedState = enableWait(node);
LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false, cancelled = false, rejected = false;
// canReacquire: 检查节点是否进入等待队列(CLH队列)
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
if (rejected)
node.block();
else
// 阻塞,直到下次唤醒
ForkJoinPool.managedBlock(node);
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
// while 循环结束,代表 node 已经进入了等待队列
LockSupport.setCurrentBlocker(null);
// 将状态清零
node.clearStatus();
// 抢锁
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
java
来看一下 canReacquire
:
private boolean canReacquire(ConditionNode node) {
// check links, not status to avoid enqueue race
Node p; // traverse unless known to be bidirectionally linked
return node != null && (p = node.prev) != null &&
(p.next == node || isEnqueued(node));
}
java
实际就是去访问等待队列的前后节点检查是否入队,最后的 isEnqueued
会遍历整个等待队列,搜索 node
是否在队列中。
signal
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, false);
}
java
首先会判断当前线程是否持有锁,然后会进一步处理条件队列中的第一个节点。
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
enqueue(first);
if (!all)
break;
}
first = next;
}
}
java
这里会将节点从条件队列头部移出,然后添加到等待队列中。需要主要的是 getAndUnsetStatus
返回的是之前的旧值,而不是新值,所以 first.getAndUnsetStatus(COND) & COND
是在判断节点是否处于等待状态。
最后的 enqueue
则是将节点放到等待队列中:
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) // initialize
tryInitializeHead();
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0) // wake up to clean link
LockSupport.unpark(node.waiter);
break;
}
}
}
}
java
实际和 acquire
方法中入队的代码一样,都是 CAS tail
来插入节点。在插入完毕后,就会唤醒对应线程,对应线程再去抢锁。
观后感
不得不说,Java 17 真的是狠狠重构了一遍 AQS,这里的代码相比 Java 8 变得非常清晰了!如果你之前看过 Java 8 的实现,会发现里面非常复杂。