AQS实现了锁获取的基本框架,是JAVA中众多锁以及并发工具的基础 参考https://segmentfault.com/a/1190000015739343
state:整个工具的核心,设置和修改状态,很多方法的操作都依赖于当前状态是什么。状态全局共享,volatile类型,以保证其修改的可见性;
private volatile int state;该属性的值即表示了锁的状态,state为0表示锁没有被占用,state大于0表示当前已经有线程持有该锁,大于1是因为可能存在可重入的情况。
FIFO队列:存放阻塞的等待线程,来完成线程的排队执行。封装成Node,Node维护一个prev引用和next引用,实现双向链表;Node是 AbstractQueuedSynchronizer 的一个内部类
static final class Node { // 共享锁和独占锁的判断标志 static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; // waitStatus 可选值 static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3; // 节点的等待状态,这个后面详细介绍 volatile int waitStatus; // 当前节点的前置节点 volatile Node prev; // 当前节点的后置节点 volatile Node next; // Node 数组中所代表的线程 volatile Thread thread; // 标志位,如果是null则说明是独占锁,不为null说明是共享锁 Node nextWaiter; // 判断是否是共享锁 final boolean isShared() { return nextWaiter == SHARED; } // 返回前置节点 final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } .... }Node总结属性为:
// 节点所代表的线程 volatile Thread thread;
// 双向链表,每个节点需要保存自己的前驱节点和后继节点的引用 volatile Node prev; volatile Node next;
// 线程所处的等待锁的状态,初始化时,该值为0 volatile int waitStatus; static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3;
// 该属性用于条件队列或者共享锁 Node nextWaiter;
CAS: CAS操作是最轻量的并发处理,通常我们对于状态的修改都会用到CAS操作,因为状态可能被多个线程同时修改,CAS操作保证了同一个时刻,只有一个线程能修改成功,从而保证了线程安全,CAS操作基本是由Unsafe工具类的compareAndSwapXXX来实现的;CAS采用的是乐观锁的思想,因此常常伴随着自旋,如果发现当前无法成功地执行CAS,则不断重试,直到成功为止,自旋的的表现形式通常是一个死循环for(;😉。
ReentrantLock有 公平锁 和 非公平锁 两种实现,默认为非公平锁
public class ReentrantLock implements Lock, java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { ... } static final class NonfairSync extends Sync{ ... } static final class FairSync extends Sync { ... } public ReentrantLock() { sync = new NonfairSync(); } public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); } // 获取锁 public void lock() { sync.lock(); } ... }公平锁和非公平锁的实现都继承Sync,而Sync继承AQS
static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; //获取锁 final void lock() { acquire(1); } }acquire()方法来自AQS
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { //尝试获取锁 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }了解上述中几个方法实现 (1)tryAcquire(arg):由继承AQS的子类实现, 尝试获取锁 (2)addWaiter(Node mode):由AQS实现, 负责在获取锁失败后调用, 将当前请求锁的线程包装成Node放到sync queue中去,并返回这个Node。 (3)acquireQueued(final Node node, int arg):由AQS实现, 主要对上面刚加入队列的Node不断尝试以下两种操作之一: 在前驱节点就是head节点的时候,继续尝试获取锁 将当前线程挂起,使CPU不再调度它 (4)selfInterrupt:由AQS实现, 用于中断当前线程。
从上面的简单介绍中可以看出,除了获取锁的逻辑 tryAcquire(arg)由子类实现外, 其余方法均由AQS实现。 方法详解: tryAcquire
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); // 获取标记量,如果等于0则说明还没有线程获取锁 int c = getState(); if (c == 0) { // 如果等待队列中没有在当前线程前面的等待线程,则使用CAS将state置为acquires, // 并且记录下来获取锁的线程(因为是独占锁) if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } // 如果state不等于0,则说明有线程获得了锁,判断是否是当前线程获取的锁,如果是,则累加state(因为是可重入锁) else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }addWaiter 如果 tryAcquire 尝试获取锁失败后,会调用 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 方法,将当前线程包装成Node,加到等待锁的队列中去
/** /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { // 将当前线程包装成一个Node节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 如果队列不为空(因为tail是指向尾结点,如果他为空,则说明队列为空), 则将当前线程包装成Node节点入队末尾。 // 并且将tail 指向队尾节点。 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } // 如果队列为空,或者队尾元素已经变化(compareAndSetTail(pred, node) cas 操作失败),则会调用enq enq(node); return node; } * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; // 如果队列为空,则创建一个头结点,这个头结点是新new出来的,所以不包含任何数据。 // 外层是个循环,跳出循环的唯一办法就是走else支路 if (t == null) { // Must initialize // 创建头节点 if (compareAndSetHead(new Node())) tail = head; } else { // 走到这里说明队列已经不为空,至少有了头结点。 // 让node前置节点指向 tail所指向的节点, 之后并设置tail指向node节点.(这里会造成尾分叉) node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }在这个方法中,我们首先会尝试直接入队,但是因为目前是在并发条件下,所以有可能同一时刻,有多个线程都在尝试入队,导致compareAndSetTail(pred, node)操作失败——因为有可能其他线程已经成为了新的尾节点,导致尾节点不再是我们之前看到的那个pred了。
如果入队失败了,接下来我们就需要调用enq(node)方法,在该方法中我们将通过自旋+CAS的方式,确保当前节点入队。 acquireQueued 这个方法中将继续尝试获取锁,获取失败判断是否将线程挂起
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { // 获取node节点的前置节点 final Node p = node.predecessor(); // 如果前置节点是头结点,则说明当前节点已经是等待线程中最前面的了(因为头结点并不代表任何等待线程),调用tryAcquire()尝试获取锁。 if (p == head && tryAcquire(arg)) { // 如果锁获取成功,则将node设置为头节点(清空了锁代表的线程信息,可以理解为变相的出队),并返回 setHead(node); p.next = null; // help GC failed = false; return interrupted; } // 如果node前面还有等待的节点,则判断是否需要将当前线程挂起。 // 设置好闹钟后(shouldParkAfterFailedAcquire 返回true), 调用parkAndCheckInterrupt() 挂起线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } .... // 可以看到如果node获取到锁,那么它将成为头结点,但是他的信息也被清空,不代表任何线程信息。 private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } ... /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 如果前置节点的状态为 SIGNAL, 则说明已经设置了唤醒状态(订好了闹钟),直接返回true。 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; // 如果 前置节点的 ws 大于0(其实也就是取消状态),则说明前置节点已经取消排队了,则跳过这些取消的节点,直接跳到未取消的节点 if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 否则的话,将前置节点状态置为SIGNAL,即后面的节点需要前置节点唤醒 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }