AQS (2) 共享锁 CountDownLatch 源码分析

前面提到过独占锁的实现,今天来讲下共享锁。来看下定义:

  • 独占锁 被某个线程持有时,其他线程只能等待当前线程释放后才能去竞争锁,而且只有一个线程能竞争锁成功。
  • 共享锁 是可以被共享的,它可以被多个线程同时持有。如果一个线程获取共享锁成功,那么其他等待的线程也会去获取共享锁,而且获取大概率会成功。共享锁典型的有ReadWriteLock、CountdownLatch。

今天我们就拿CountdownLatch 来开刀,还是建议看下之前的AQS解析,不然可能看不懂今天的。传送门:

罗政:AQS (1) ReentrantLock 的上锁 与 解锁 源码

实例代码

public class a {
    public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
        CountDownLatch downLatch = new CountDownLatch(2);
        new Thread() {
            public void run() {
                downLatch.countDown();
                downLatch.countDown();
            };
        }.start();
        downLatch.await();
        System.err.println("main end");
    }
}

当多个线程每执行一次countDown 时,CountDownLatch的锁重入次数减1 (也就是state减1) ,当state为0时,await 唤醒。 多个线程共享到这个state锁,这就是共享锁。


源码分析

构造函数

public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
}    

这个函数就是讲AQS的state变量赋值为count,然后没其他的操作了。


await方法

   public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }    
// sync 的 acquireSharedInterruptibly方法
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

两个步骤,先执行tryAcquireShared,然后 小于0的话执行doAcquireSharedInterruptibly 。

tryAcquireShared分析

protected int tryAcquireShared(int acquires) {
     return (getState() == 0) ? 1 : -1;
}

判断state,如果state为0的话就返回1,否则返回-1从而执行doAcquireSharedInterruptibly。

    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

addWaiter 之前已经分析了,就是构造一个AQS队列起来,执行addWaiter 后,由于传入的是Node.SHARED模式,构造的队列就是下面这样子

而独占模式addWaiter 是这样子

在回过来看下 addWaiter 后的步骤

 private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        // 返回tail节点(不是SHARED节点哦)
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //获取前一个节点 ,也就是空的头结点,head指针
                final Node p = node.predecessor();
                if (p == head) {
                    //根据state判断是否能拿到锁,共享锁的话只要state大于0就拿不到,等于0就能拿到
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        // 拿到锁了
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
               // 拿不到锁 ,进入 parkAndCheckInterrupt 阻塞
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
// 是否能拿到共享锁  CountDownLatch 里面的Sys类实现
protected int tryAcquireShared(int acquires) {
       return (getState() == 0) ? 1 : -1;
}

tryAcquireShared :判断是否能拿到锁功能。

  • CountDownLatch共享锁的实现 : 只要state大于0 就拿不到锁,等于0就能拿到锁。也就是上面的tryAcquireShared方法实现。
  • 独占锁的实现 : state等于0能拿锁,AQS的独占线程(exclusiveOwnerThread)是当前线程就能拿到锁,其余拿锁失败。(根据前面文章的分析,前文已经提供了传送门)。

这里由于是wait方法的执行,state是大于0的,于是拿不到锁,进入下面两步

 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()){
     throw new InterruptedException();
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
             //pred 为 空的头结点     node为tail节点(非SHARED节点)
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
}

其实前文已经分析了,我在分析一遍

第一次进入shouldParkAfterFailedAcquire ,就会把 投的空节点 waitStatus改为-1 ,AQS队列就成了下面:

接着第2次for 循环进来shouldParkAfterFailedAcquire 判断 waitStatus 是-1了,就返回true。

总结 shouldParkAfterFailedAcquire 这个方法

当拿不到锁时循环执行两次,第一次让空的头结点信号变成 SIGNAL ,代表有节点要加入AQS队列了,第二次返回true,然后进行阻塞当前线程。

上面返回true后就会执行parkAndCheckInterrupt方法

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

LockSupport.park 阻塞到此。 记住这里 ,等会 CountDownLatch 的state减减到0了,就会继续从 LockSupport.park(this); 这一行开始执行。

到此我们的 downLatch.await(); 方法分析完毕!!!!


countDown方法分析

    public void countDown() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState(); // 1
                if (c == 0)
                    return false;
                int nextc = c-1; // nextc = 0
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
     }

tryReleaseShared方法 就是判断state来唤醒wait。我们假如state已经是1了,也就是最后一次 countDown 的执行(执行完就唤醒了)。

在 return nextc ==0 的时候,由于nextc 已经是0了,所以返回true。返回true就执行doReleaseShared方法。

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;   // -1
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

还记得我们现在AQS队列的结构吗,我再贴一遍图

由于头结点的waitStatus是-1 ,于是会执行

 if (ws == Node.SIGNAL) {
       if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))// 把head的waitStatus改为0
              continue;            // loop to recheck cases
          unparkSuccessor(h);
 }

   private void unparkSuccessor(Node node) {
        int ws = node.waitStatus; //  waitStatus为 0 
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;  // head的next就是AQS队列等待的第一个节点 ,waitStatus 是0
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)  // 执行唤醒
            LockSupport.unpark(s.thread);
    }

LockSupport.unpark(s.thread); 这一步就是执行唤醒开始park的线程,唤醒后继续park位置的for循环,来删除这个唤醒的节点,唤醒后,AQS队列就只剩这样了(如果删除的前面文章已经分析过,上文也给了传送门)

https://pic2.zhimg.com/v2-081c5da8433f90a6cef087ac85226895_b.jpg


到此 CountdownLatch 已经分析结束!!! 共享锁也到此结束。

总结

共享锁,多个线程都是共享一个state,这是与独占锁的最大区别,wait时,state大于0就代表拿不到锁然后线程阻塞,countdown时,只有state减为0才表示能拿到锁,wait唤醒。

强烈推荐一个 进阶 JAVA架构师 的博客

Java架构师修炼

支付宝打赏 微信打赏

如果文章对您有帮助,您可以鼓励一下作者