AQS是一个用来构件锁和同步器的框架,Lock包中的各种锁,concurrent包中的各种同步器都是基于AQS来构建的。
在并发领域,主要关心的问题是互斥与同步。
互斥:即同一时刻只允许一个线程访问共享资源同步:线程之间如何通信、协作。这两大问题可以通过信号量和管程来解决
信号量是操作系统提供的一种进程间常见的通信方式,主要用来协调并发程序对共享资源的访问。操作系统可以保证对信号量操作的原子性。
信号量机制的引入解决了进程同步和互斥的问题,但信号量的大量同步操作分散到各个进程中不便于管理,还可能导致系统死锁,另外条件越多,需要的信号量越多,更容易出现问题。
示意图如下
管程就是一个把所有进程对某一种临界资源的同步操作都几种起来,构成一个秘书进程。凡是要访问改临界资源的进程,都要先告知该进程,然后由其实现多进程对同一临界资源的互斥使用。
管程是一种在信号量机制上进行改进的并发编程模型,解决了信号量在临界区的PV操作上配对的麻烦。把配对的 PV 操作集中在一起而形成的并发编程方法理论,极大降低了使用和理解成本。
管程就是一个对象监视器。任何线程要想访问共享变量,就要排队进入监控范围;接收检查,不符合条件则继续等待,直到被通知,然后继续进入监视器
信号量和管程两者是等价的,信号量可以实现管程,管程也可以实现信号量,只是两者表现形式不同
管程是解决并发问题的万能钥匙
AQS是基于Java并发包中管程的一种实现
AQS是一个用来构件锁和同步器的框架,其维护了一个共享资源state和一个FIFO的等待队列,底层利用CAS机制来保证操作的原子性。
AQS实现锁的主要原理:
以实现独占锁为例,其原理如下:
state初始化0,在多线程条件下,线程要执行临界区的代码,必须首先获取state,某个线程获取成功以后state加1,其他线程再获取的话由于共享资源已被占用,所以会到 FIFO 等待队列去等待,等占有 state 的线程执行完临界区的代码释放资源( state 减 1)后,会唤醒 FIFO 中的下一个等待线程(head 中的下一个结点)去获取 state。
state由于是多线程共享变量,所以必须要定义成volatile,以保证state的可见性。同时需要AQS来提供对state的原子操作方法。以保证线程的安全。
另外AQS中实现的FIFO队列(CLH队列)其实是双向链表实现的,由head,tail节点表示。head结点表示当前占用的线程,其他节点由于暂时获取不到锁所以一次排队等待锁释放。
即AQS如下定义:
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { // 以下为双向链表的首尾结点,代表入口等待队列 private transient volatile Node head; private transient volatile Node tail; // 共享变量 state private volatile int state; // cas 获取 / 释放 state,保证线程安全地获取锁 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } // ... }获得独占锁的过程就是在acquire中定义的,该方法用来了模板设计模式,由子类实现
过程
acquire(int)尝试获取资源,如果获取失败,将线程插入等待队列插入等待队列后,并没有放弃获取资源,而是根据前置节点状态判断是否应该继续获取资源如果前置节点是头节点,继续尝试获取资源,如果前置节点是SIGNAl状态就中断当前线程,否则继续尝试获取资源。直到当前的线程被park()获取到资源结束释放独占锁的过程就是在其中定义的
过程
首先调用子类的tryRealse()方法释放锁,然后唤醒后续节点在唤醒的过程中,需要判断后续节点是否满足情况如果后续节点不为空且不是作废状态,则唤醒这个后续节点否则从tail节点向前寻找合适的节点,如果找到,则唤醒ReentrantLock是我们比较常见的一种锁,也是基于AQS实现的。属于独占锁,有公平和非公平两种锁模式。
下面我们看一下独占、非公平模式的源码实现
首先先看下ReentrantLock的使用方法 // 1. 初始化可重入锁 private ReentrantLock lock = new ReentrantLock(); public void run() { // 加锁 lock.lock(); try { // 2. 执行临界区代码 } catch (InterruptedException e) { e.printStackTrace(); } finally { // 3. 解锁 lock.unlock(); } }第一步是初始化可重入锁,其默认使用的是非公平锁机制,也可以添加参数来指定使用公平锁
FairSync和NonfairSync是ReentrantLock实现的内部类,分别指公平和非公平模式
我们接着来剖析下非公平锁(NonfairSync)的实现方式,是如何加锁的
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }可以看到lock方法主要有两步:
使用CAS来获取state资源,如果成功设置1,代码state资环获取锁成功,此时记录下当前占用state的线程setExclusiveOwnerThread**(Thread.currentThread())**;如果CAS设置state为1失败(获取锁失败),则执行acquire**(1)**;方法,该方法是由AQS提供的如下所示: public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }其首先调用tryAcquire尝试获取state,如果成功则跳过后面的步骤。如果失败,则执行acquireQueued将线程加入CLH等待队列中。
如果tryAcquire(arg)执行失败,代表获取锁失败,则执行acquireQueued方法,将线程加入FIFO等待队列
该方法是AQS提供的一个模板方法,最终由其AQS具体的实现类(Sync)实现,由于执行的是非公平锁逻辑,其执行代码如下:
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { // 如果c为0 表示此时资源时空闲的(即锁是释放的)再用CAS获取锁 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { // 此条件表示之前已经有线程获得锁,且此线程再一次获得了锁,获取资源次数再加1 int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }通过代码可知锁的获取主要分为两种情况
state为0:代表锁已经被释放,可以去获取,于是使用CAS去重新获取锁资源,如果获取成功,则代表竞争锁成功,使用setExclusiveOwnerThread(current) 记录下此时占有锁的线程。如果state不为0:代表已经有线程占用了锁。如果此时的线程依然是以前锁的线程,代表此线程再一次占有了锁(可重入锁),此时更新state,记录下锁被占有的次数。首先会调用addWaiter(Node.EXCLUSIVE)将包含有当前线程的Node节点入队,Node.EXCLUSIVE代表此结点为独占模式
首先是获取FIFO队列的尾节点没看尾节点是否存在,存在就采用CAS的方式将等待线程入队,如果尾节点为空则执行enq方法。
enq方法:首先判断tail是否为空,如果为空说明FIFO队列的head,tail还未构建,此时先构建头结点,构建之后再用CAS的方式将此线程结点入队。
执行完addWaiter后,线程入队成功,就执行关键的acquireQueued。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 如果尾节点不为空,则用CAS将获取锁失败的线程入队 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果结点为空,执行enq方法 enq(node); return node; }如果线程自旋中因为异常等原因获取锁最终失败,则会调用此方法
private void cancelAcquire(Node node) { // 如果节点为空,直接返回 if (node == null) return; // 由于线程要被取消了,所以将 thread 线程清掉 node.thread = null; // 下面这步表示将 node 的 pre 指向之前第一个非取消状态的结点(即跳过所有取消状态的结点),waitStatus > 0 表示当前结点状态为取消状态 Node pred = node.prev; while (pred.waitStatus > 0) node.prev = pred = pred.prev; // 获取经过过滤后的 pre 的 next 结点,这一步主要用在后面的 CAS 设置 pre 的 next 节点上 Node predNext = pred.next; // 将当前结点设置为取消状态 node.waitStatus = Node.CANCELLED; // 如果当前取消结点为尾结点,使用 CAS 则将尾结点设置为其前驱节点,如果设置成功,则尾结点的 next 指针设置为空 if (node == tail && compareAndSetTail(node, pred)) { compareAndSetNext(pred, predNext, null); } else { // 这一步看得有点绕,我们想想,如果当前节点取消了,那是不是要把当前节点的前驱节点指向当前节点的后继节点,但是我们之前也说了,要唤醒或阻塞结点,须在其前驱节点的状态为 SIGNAL 的条件才能操作,所以在设置 pre 的 next 节点时要保证 pre 结点的状态为 SIGNAL,想通了这一点相信你不难理解以下代码。 int ws; if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) && pred.thread != null) { Node next = node.next; if (next != null && next.waitStatus <= 0) compareAndSetNext(pred, predNext, next); } else { // 如果 pre 为 head,或者 pre 的状态设置 SIGNAL 失败,则直接唤醒后继结点去竞争锁,之前我们说过, SIGNAL 的结点取消(或释放锁)后可以唤醒后继结点 unparkSuccessor(node); } node.next = node; // help GC } }不管是公平锁还是非公平锁,最终都是调用AQS的release来释放锁
public final boolean release(int arg) { // 锁释放是否成功 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }tryRelease方法定义在了AQS的子类Sync方法里
protected final boolean tryRelease(int releases) { int c = getState() - releases; // 只有持有锁的线程才能释放锁,所以如果当前锁不是持有锁的线程,则抛异常 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; // 说明线程持有的锁全部释放了,需要释放 exclusiveOwnerThread 的持有线程 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }锁释放成功后就会唤醒head之后的节点,让他们来竞争锁
public final boolean release(int arg) { // 锁释放是否成功 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 锁释放成功后,唤醒 head 之后的节点,让它来竞争锁 unparkSuccessor(h); return true; } return false; }这里释放锁的条件时h!=null&&h.waitStatis!=0
如果h==null:
一个线程在竞争锁,现在它释放了,当然没有所谓的唤醒后续节点其他线程正在运行竞争锁,只是还未初始化头节点,既然其他线程正在运行,也就无需执行唤醒操作如果h!=null 且 h.waitStatis==0
说明head的后续节点正在自旋竞争锁,也就是说线程是运行状态的,即无需唤醒h!=null&&h.waitStatis<0
此时waitStatus的值可能为SIGNAL或PROPAGATE,这两种情况说明后续节点阻塞需要被唤醒由于节点入队并不是原子操作,所以在寻找队列的第一个非取消状态的节点要从后往前找。
线程自旋时是先执行node.pre=pre,然后再执行pre.next =node,如果unparkSuccessor刚好在这两者之间执行,此时找不到head的后续节点。
AQS通过提供state及FIFO队列的管理,为我们提供了一套通用的实现锁的底层方法,基本上定义一个锁,都是转为在其内部定义AQS的子类,调用AQS的底层方法来实现的,由于AQS在底层已经为了定义好了这些获取stats及FIFO队列的管理工作,我们要实现一个锁就比较简单了。
下面就是基于AQS来实现一个非可重入的互斥锁。
ublic class Mutex { private Sync sync = new Sync(); public void lock () { sync.acquire(1); } public void unlock () { sync.release(1); } private static class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryAcquire (int arg) { return compareAndSetState(0, 1); } @Override protected boolean tryRelease (int arg) { setState(0); return true; } @Override protected boolean isHeldExclusively () { return getState() == 1; } } }