AQS 和 ReentrantLock

2023/03/08
Note

源码基于 Java 17 (Spring5 都停止维护了,是时候升级了...).

在看具体看在这之前有个问题:请说说你对 AQS 的理解?

面试的时候被问到这种问题就很蛋疼,因为你可能知道它的原理,但是不知道怎么概括出来。

所以个人建议在看源码前一定要看一下每个类上的注释,比如说这里的 AQS (这里只截取了部分):

Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.

提供了一个框架,去帮助开发者实现一个依赖于先进先出(FIFO)等待队列的同步锁或相关同步器(事件、信号量等)

如果谈理解的话,用这一句开头就会很舒服、

概要

类继承图

对于ReentrantLock,你需要知道它里面有一个等待队列,也就是 AQS (AbstractQueuedSynchronizer),这个队列只有“头部的节点”才有资格抢到锁!但这并不代表其它节点对应的线程不会被唤醒,这些线程只是没有抢锁的资格,在获取资格前抢锁永远失败。在这里需要注意:没有资格抢锁 != 没有机会被唤醒

AQS示例

例如上图,是 AQS 的一个简单示例。state 表示队列状态,如果该值非 0,则表示锁已经被其它线程占用。这里 Node0 目前已经是抢到锁的状态,其余节点都处于等待(挂起)状态。

Node0 释放锁时,它将会唤醒 Node1#waiter 对应的线程,并去除掉 WAITINGstatus,此时 Node1 发现自己处于第一个,就会尝试抢锁。如果成功,则将 head 指向为自己。

Note

第一个:当 node.prev == head 时,当前节点就是第一个。

对于 Node2,由于它不是第一个节点,所以它会在抢锁时直接进入 WAITING 状态。

CHL

其实文档已经很清晰的说明了 AQS 是怎么实现的了,如果有能力,可以直接硬啃文档。因为字数过多,这里我只摘出一部分。


CLH 可以避免使用同步锁,通过 prevnext 对前后节点的连接以及 status 字段,当线程释放锁时,CLH 队列可以给它的继任者发送一个信号,此时继任者可以进一步进行抢锁操作。

+------+ prev +-------+ +------+ | head | <---- | first | <---- | tail | +------+ +-------+ +------+
text

向 CLH 插入节点只需要在 tail 执行一个原子操作。当 CAS 操作执行成功后,只需要再将之前的 tail.next 设置为自己即可。即使后面后面这一步不是原子操作, 但是也足够确保任何被阻塞的线程,都能够成功的接收到前任者的信号(例如 Node0 是当前的 tail,此时 CAS 插入 Node1 的同时 Node0 的锁释放掉了,此时 Node0.next 为空,但是仍然不影响代码的正常运行)。

源码

构造器

/** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. */ public ReentrantLock() { sync = new NonfairSync(); } /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy */ public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
java

可以通过构造器来设置锁是否为公平锁,默认为非公平锁

公平锁和非公平锁的唯一区别就是公平锁多了一个判断条件:hasQueuedPredecessors。该方法主要用于判断公平锁加锁时等待队列中是否存在有效节点。

// 公平锁 @ReservedStackAccess protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 公平只比非公平锁多了下面一个条件,其余和非公平锁一样。 // 这里会判断队列中是否有正在排队的线程,如果没有,则直接抢锁,不进队列了 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
java

这里可以发现,如果是非公平锁,线程在入队之前总会尝试抢锁,如果抢到了就不会入队了。如果是公平锁入队前,只会在等待队列为空时尝试抢锁,否则需要老老实实入队。

Important

这里以非公平锁为例演示,当代码不一样时会单独指出,否则表示两边实现一样。

lock

lock方法主要调用了AQS中的acquire方法

// 该方法是RentrantLock里的 final void lock() { if (!initialTryLock()) acquire(1); } // 非公平锁 final boolean initialTryLock() { Thread current = Thread.currentThread(); if (compareAndSetState(0, 1)) { // first attempt is unguarded setExclusiveOwnerThread(current); return true; } else if (getExclusiveOwnerThread() == current) { int c = getState() + 1; if (c < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(c); return true; } else return false; } // 公平锁 final boolean initialTryLock() { Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedThreads() && compareAndSetState(0, 1)) { setExclusiveOwnerThread(current); return true; } } else if (getExclusiveOwnerThread() == current) { if (++c < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(c); return true; } return false; } // 该方法是AQS类里的 public final void acquire(int arg) { if (!tryAcquire(arg)) acquire(null, arg, false, false, false, 0L); }
java
Important

可以发现公平锁和非公平锁的唯一区别就是:公平锁在没入队前抢锁总是需要判断队列是否为空。

这里后面就只展示非公平锁的代码了。

大致流程流程:

锁空闲并且抢锁成功抢锁成功继续抢锁当前线程是否为已经持有锁的线程state + 1 标记重入 1 次抢锁成功尝试进入等待队列继续抢锁yesnoyesnolock 流程

tryAcquire

AQS 使用了模板模式,这里 tryAcquire 是一个模板方法,需要子类实现。该方法会先尝试抢占锁 (尝试将 status0 设置为1 ),若失败则继续判断:

protected final boolean tryAcquire(int acquires) { if (getState() == 0 && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
java

acquire

如果tryAcquire没有拿到锁,则会进入这一步:将线程加入到等待队列。代码很长:

final int acquire(Node node, int arg, boolean shared, boolean interruptible, boolean timed, long time) { Thread current = Thread.currentThread(); byte spins = 0, postSpins = 0; // retries upon unpark of first thread boolean interrupted = false, first = false; Node pred = null; // predecessor of node when enqueued /* * Repeatedly: * Check if node now first * if so, ensure head stable, else ensure valid predecessor * if node is first or not yet enqueued, try acquiring * else if node not yet created, create it * else if not yet enqueued, try once to enqueue * else if woken from park, retry (up to postSpins times) * else if WAITING status not set, set and retry * else park and clear WAITING status, and check cancellation */ for (;;) { if (!first && (pred = (node == null) ? null : node.prev) != null && !(first = (head == pred))) { if (pred.status < 0) { cleanQueue(); // predecessor cancelled continue; } else if (pred.prev == null) { Thread.onSpinWait(); // ensure serialization continue; } } if (first || pred == null) { // 为第一个节点,或者还没有入队,直接尝试抢锁 boolean acquired; try { if (shared) acquired = (tryAcquireShared(arg) >= 0); else acquired = tryAcquire(arg); } catch (Throwable ex) { cancelAcquire(node, interrupted, false); throw ex; } if (acquired) { if (first) { // 将自己设置为第一个节点 node.prev = null; head = node; pred.next = null; node.waiter = null; if (shared) signalNextIfShared(node); if (interrupted) current.interrupt(); } return 1; } } if (node == null) { // 创建节点并重新循环 if (shared) node = new SharedNode(); else node = new ExclusiveNode(); } else if (pred == null) { // 尝试将节点加入队列(尾插) node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // 这里使用 unsafe 赋值,避免了读写屏障导致数据不一致 if (t == null) tryInitializeHead(); else if (!casTail(t, node)) // 尝试 CAS 设置尾结点 node.setPrevRelaxed(null); // back out else t.next = node; } else if (first && spins != 0) { --spins; // reduce unfairness on rewaits, 这里后面说 Thread.onSpinWait(); // 指示当前线程暂时无法在循环中完成任务,JVM会优化此类循环以提高性能 } else if (node.status == 0) { // 状态为 0 表示正在抢锁 node.status = WAITING; // 设置节点状态为等待 } else { long nanos; // 挂起线程,等待下次唤醒 spins = postSpins = (byte)((postSpins << 1) | 1); // 这里后面说 if (!timed) LockSupport.park(this); else if ((nanos = time - System.nanoTime()) > 0L) LockSupport.parkNanos(this, nanos); else break; node.clearStatus(); if ((interrupted |= Thread.interrupted()) && interruptible) break; } } return cancelAcquire(node, interrupted, interruptible); }
java

这里在注释里已经讲的很清楚了,主要干了这么几件事:

  1. if 是第一个节点,确保继任者有效(合法)
  2. if 节点是第一个并且还没有入队,则尝试抢锁
  3. else if 节点没有被创建,则创建对应的节点
  4. else if 节点没有入队,则尝试入队
  5. else if 线程被唤醒,则重新自旋并设置更大的 spins 以防止一直抢不到锁
  6. else if WAITING 状态没有被设置,则设置并该状态并重试
  7. else 挂起线程,然后清除 WAITING 状态并检查取消(中断)请求

在继续前,我们来看一下节点的 status 有哪些:

// 表示当前节点正在等待信号 static final int WAITING = 1; // must be 1 // 表示当前节点已经被取消了,即放弃抢锁 static final int CANCELLED = 0x80000000; // must be negative. 1000 0000 0000 0000 0000 0000 0000 0001 // 表示当前节点已经被挂起,正在等待唤醒信号 static final int COND = 2; // in a condition wait
java

这里 COND 后面再说,ReentrantLock 里面没有用到。WAITING 则是表示当前节点正在等待信号。

cleanQueue

这里代码很恶心,变量名称全部用一个字符表示,这里我把它换成了稍微能理解的名称:

private void cleanQueue() { for (;;) { // restart point for (Node currentNode = tail, nextNode = null, prevNode, prevNext;;) { // (prevNode, currentNode, nextNode) triples if (currentNode == null || (prevNode = currentNode.prev) == null) return; // end of list if (nextNode == null ? tail != currentNode : (nextNode.prev != currentNode || nextNode.status < 0)) break; // inconsistent if (currentNode.status < 0) { // cancelled if ((nextNode == null ? casTail(currentNode, prevNode) : nextNode.casPrev(currentNode, prevNode)) && currentNode.prev == prevNode) { prevNode.casNext(currentNode, nextNode); // OK if fails if (prevNode.prev == null) signalNext(prevNode); } break; } if ((prevNext = prevNode.next) != currentNode) { // help finish if (prevNext != null && currentNode.prev == prevNode) { prevNode.casNext(prevNext, currentNode); if (prevNode.prev == null) signalNext(prevNode); } break; } nextNode = currentNode; currentNode = currentNode.prev; } } }
java

最后这段 help finish 可能有点迷,为什么还要帮别人完成节点连接,这里我们可以看一下前面的代码我们是怎么把节点插入到队列中的:

node.waiter = current; Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) tryInitializeHead(); else if (!casTail(t, node)) node.setPrevRelaxed(null); // back out else t.next = node;
java

可以发现 casTail(t, node)t.next = node 并不是一个原子操作,所以在高并发下,可能导致 t.next = node 还没被执行,就被其它线程视作进入队列了。

Note

说实话,我目前没有想到不帮忙会出现什么问题🫠...但是肯定是有点隐患在这里的。

cancelAcquire

这个方法很简单,直接看就能看懂:

private int cancelAcquire(Node node, boolean interrupted, boolean interruptible) { if (node != null) { node.waiter = null; node.status = CANCELLED; if (node.prev != null) cleanQueue(); } if (interrupted) { if (interruptible) return CANCELLED; else Thread.currentThread().interrupt(); } return 0; }
java

unlock

// Sync类的方法 public void unlock() { sync.release(1); } // AQS的方法 public final boolean release(int arg) { if (tryRelease(arg)) { signalNext(head); return true; } return false; } // Sync类的方法 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (getExclusiveOwnerThread() != Thread.currentThread()) throw new IllegalMonitorStateException(); boolean free = (c == 0); if (free) setExclusiveOwnerThread(null); setState(c); return free; }
java

解锁的代码都很简单,因为已经占有了锁,所以不需要过多判断。

头节点的释放

先来看一下 signalNext:

private static void signalNext(Node h) { Node s; if (h != null && (s = h.next) != null && s.status != 0) { s.getAndUnsetStatus(WAITING); LockSupport.unpark(s.waiter); } }
java

这段代码很简单,只是取消了继任者的 WAITING 状态并唤醒它。

可以发现 AQS 的 release 方法没有对任何节点进行删除,这里就非常巧妙的利用了只有第一个节点才能抢锁的特性来对节点进行了删除。在后面第一个节点抢到锁后,它会将自己设置为新的 tail,旧的 tail 因此被丢弃,最后被垃圾回收。

总结

最后用一张图总结一下吧:

开始抢锁开始抢锁设置独占重入次数 + 1再次尝试抢锁设置独占continue循环抢锁,直到成功或取消是第一个节点尝试抢锁设置独占,并将队列头部指向自己继续cleanQueue节点未创建创建节点节点的prev为空尝试将节点加入队列是第一个节点并且自旋次数不为0自旋次数减一, 继续循环节点 status == 0设置节点状态为WAITING提高自旋次数, 并挂起线程将状态设置为0抢锁成功CAS lock success其它Already lockedyesnoyesnoyesnoyesyesyesyesno被唤醒nononolock

Condition

ReentrantLock lock = new ReentrantLock(); Condition condition = lock.newCondition(); new Thread(() -> { lock.lock(); System.out.println("Thread 1 locked."); try { condition.await(); System.out.println("Thread 1 awake"); } catch (InterruptedException e) { throw new RuntimeException(e); } }).start(); Thread.sleep(1000); new Thread(() -> { lock.lock(); System.out.println("Thread 2 locked."); condition.signal(); System.out.println("Thread 2 after signal"); lock.unlock(); }).start(); Thread.sleep(3000);
java

输出:

Thread 1 locked. Thread 2 locked. Thread 2 after signal Thread 1 awake
log

对于Condition,这里我们来了解一下 awaitsignal 这俩个重要的方法。从方法名大致就可以猜出这俩是干什么的了,线程通过调用 await 可以阻塞自己,然后其它线程可以调用 signalsignalAll 来唤醒一个或所有被阻塞的线程。

需要注意的是:

  • 调用 awaitsignal 前必须持有锁
  • 调用 signal 后需要手动释放锁

newCondition 中的非静态内部类

这里如果浅看 newCondition 源码的话,会发现 newCondition 只是创建并返回了一个新的对象:

public Condition newCondition() { return sync.newCondition(); } abstract static class Sync extends AbstractQueuedSynchronizer { final ConditionObject newCondition() { return new ConditionObject(); } }
java

如果不细看的话这里会很懵逼: 为什么直接返回了一个对象并且没有和任何东西进行关联?

其实只要细看就可以发现,ConditionObject 是 AQS 中的一个非静态内部类,在创建后是可以访问当前 ReentrantLock 实例对象的(this 访问)。

await

在看之前需要了解一个类:ConditionNode。定义如下:

static final class ConditionNode extends Node implements ForkJoinPool.ManagedBlocker { ConditionNode nextWaiter; // link to next waiting node public final boolean isReleasable() { return status <= 1 || Thread.currentThread().isInterrupted(); } public final boolean block() { while (!isReleasable()) LockSupport.park(); return true; } }
java

需要注意的是,ConditionNode 继承了 Node,首先 ConditionNode 会在条件队列中等待唤醒,在唤醒前,它会被插入到等待队列,唤醒后,就可以直接去抢锁了。

Note

条件队列是由 ConditionNode#nextWaiter 组成的单向队列。


public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); // 这个 node 是一个单向链表 ConditionNode node = new ConditionNode(); // 将节点插入到 条件队列 链表尾部 // 插入后将节点状态设置为 COND | WAITING,并**释放锁**, 因为必须持有锁调用,所以实现很简单 int savedState = enableWait(node); LockSupport.setCurrentBlocker(this); // for back-compatibility boolean interrupted = false, cancelled = false, rejected = false; // canReacquire: 检查节点是否进入等待队列(CLH队列) while (!canReacquire(node)) { if (interrupted |= Thread.interrupted()) { if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0) break; // else interrupted after signal } else if ((node.status & COND) != 0) { try { if (rejected) node.block(); else // 阻塞,直到下次唤醒 ForkJoinPool.managedBlock(node); } catch (RejectedExecutionException ex) { rejected = true; } catch (InterruptedException ie) { interrupted = true; } } else Thread.onSpinWait(); // awoke while enqueuing } // while 循环结束,代表 node 已经进入了等待队列 LockSupport.setCurrentBlocker(null); // 将状态清零 node.clearStatus(); // 抢锁 acquire(node, savedState, false, false, false, 0L); if (interrupted) { if (cancelled) { unlinkCancelledWaiters(node); throw new InterruptedException(); } Thread.currentThread().interrupt(); } }
java

来看一下 canReacquire:

private boolean canReacquire(ConditionNode node) { // check links, not status to avoid enqueue race Node p; // traverse unless known to be bidirectionally linked return node != null && (p = node.prev) != null && (p.next == node || isEnqueued(node)); }
java

实际就是去访问等待队列的前后节点检查是否入队,最后的 isEnqueued 会遍历整个等待队列,搜索 node 是否在队列中。

signal

public final void signal() { ConditionNode first = firstWaiter; if (!isHeldExclusively()) throw new IllegalMonitorStateException(); if (first != null) doSignal(first, false); }
java

首先会判断当前线程是否持有锁,然后会进一步处理条件队列中的第一个节点。

private void doSignal(ConditionNode first, boolean all) { while (first != null) { ConditionNode next = first.nextWaiter; if ((firstWaiter = next) == null) lastWaiter = null; if ((first.getAndUnsetStatus(COND) & COND) != 0) { enqueue(first); if (!all) break; } first = next; } }
java

这里会将节点从条件队列头部移出,然后添加到等待队列中。需要主要的是 getAndUnsetStatus 返回的是之前的旧值,而不是新值,所以 first.getAndUnsetStatus(COND) & COND 是在判断节点是否处于等待状态。

最后的 enqueue 则是将节点放到等待队列中:

final void enqueue(Node node) { if (node != null) { for (;;) { Node t = tail; node.setPrevRelaxed(t); // avoid unnecessary fence if (t == null) // initialize tryInitializeHead(); else if (casTail(t, node)) { t.next = node; if (t.status < 0) // wake up to clean link LockSupport.unpark(node.waiter); break; } } } }
java

实际和 acquire 方法中入队的代码一样,都是 CAS tail 来插入节点。在插入完毕后,就会唤醒对应线程,对应线程再去抢锁。

观后感

不得不说,Java 17 真的是狠狠重构了一遍 AQS,这里的代码相比 Java 8 变得非常清晰了!如果你之前看过 Java 8 的实现,会发现里面非常复杂。