CountDownLatch源码分析

it2023-01-21  51

CountDownLatch底层用的是AQS(AbstractQueuedSynchronizer) 通过CountDownLatch初始化state值, await()方法:如果state不是0就会park,等待被唤醒 countDown()方法:每调用一次就会使state减一,如果state是0了,就会执行唤醒操作,唤醒等待线程

private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; // state是AbstractQueuedSynchronizer里的属性,通过原子的操作state,实现加锁和解锁。 // CountDownLatch构造方法的传参,通过这里设置给了state. Sync(int count) { setState(count); } int getCount() { return getState(); } // 等于0则可以继续,返回1 // 非0则要阻塞等待,返回-1 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 唤醒 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); // 如果此时已经是0,表示其他的线程已经把state改成0了,不用再次唤醒了,返回false if (c == 0) return false; // 减一,并且更新state的值 int nextc = c-1; if (compareAndSetState(c, nextc)) // 和0比较, // = 0: 表示达到了唤醒的条件,返回true // !=0: 没有达到唤醒的条件,返回false return nextc == 0; } } }

await()方法

public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 如果其他的要此线程中断等待,就可以抛出异常 if (Thread.interrupted()) throw new InterruptedException(); // tryAcquireShared()方法刚刚已经看到了 if (tryAcquireShared(arg) < 0) // 进入if块,说明state不是0, 则需要阻塞等待此时的arg是await()方法传来的是1 doAcquireSharedInterruptibly(arg); }

doAcquireSharedInterruptibly()方法

// 需要阻塞等待 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 构造链表 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { // 找到这个node的前驱 final Node p = node.predecessor(); // 如果是头结点就进入代码块 if (p == head) { // 这个方法已经见过了,state是否等于0, int r = tryAcquireShared(arg); // 如果大于0,说明state已经等于0了,则跳出循环 if (r >= 0) { // 将此node节点设置为头结点 setHeadAndPropagate(node, r); // 将前驱节点删除 p.next = null; // help GC failed = false; return; } } // 将node的前驱节点的waitStatus属性设置为-1 if (shouldParkAfterFailedAcquire(p, node) && // park住线程 parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

addWaiter()方法:

private Node addWaiter(Node mode) { // 创建新的node节点 Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 此时的尾指针不是空,将node插入到链表的尾部,形成双向链表 if (pred != null) { // 新创建的node的前驱指向尾指针 node.prev = pred; // 将node设置为尾指针 if (compareAndSetTail(pred, node)) { // next指向node,构成一个双向的链表 pred.next = node; // 返回新创建的节点 return node; } } // 如果尾指针为空,表示还没有形成链表,进入enq()方法,构造链表 enq(node); return node; } // 构造链表 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 如果尾指针为空,则先创建一个空的node节点 if (compareAndSetHead(new Node())) // 头指针和尾指针够指向这个空节点 tail = head; } else { // 循环第二次的时候,尾指针已经不为空了,进入此代码块 // 将node插入到尾部,形成双向链表 node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }

总结:

先获取一次state是否等于0,等于0 则不用阻塞,如果不等于0,则进入阻塞线程操作。新增node节点,如果已经形成链表就直接追加到链表尾部,如果没有形成链表就先创建一个空的头结点,之后链接到尾部查看node的前驱节点,如果是头结点,再次查看state的状态是否等于0,等于0,则讲此节点设置为头结点,删除之前的头结点;如果state的状态不是0,则将node的前驱节点的waitStatus设置为-1,再次循环,重复步骤3,如果state还是不满足条件,就park线程,等待唤醒。

唤醒操作

public void countDown() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { // 将state-1,如果等于0,则满足唤醒条件,进入唤醒操作 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { // 先将头结点的WaitStatus设置为0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases // 唤醒操作 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } private void unparkSuccessor(Node node) { // 此时node已经设置为了0 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } // 执行下面的操作,直接唤醒线程。 if (s != null) LockSupport.unpark(s.thread); }

总结:

先将state的值减一,如果为0则表示已经符合唤醒的条件,进入唤醒操作,先将头结点的waitStatus设置为0,之后唤醒线程。
最新回复(0)