AQS (1) ReentrantLock 的上锁 与 解锁 源码

AQS大致感受

AQS( java . util . concurrent . locks .AbstractQueuedSynchronizer ) , AQS可以说是整个Java并发编程包JUC的底层实现,采用CAS+自旋的方式,然后采用一个双向链表组成的队列,结合一个state来实现的同步器。很多JUC包,比如ReentrantLock、Semaphore、CountDownLatch、CyclicBarrier 都是通过AQS来实现的。也就是说AQS主要用来: 提供多线程竞争时,线程等待唤醒的机制,也就是如何用java应用层代码实现锁

eclipse-javadoc:☂=hadluo-test/E:\/java8\/jre\/lib\/rt.jar

eclipse-javadoc:☂=hadluo-test/E:\/java8\/jre\/lib\/rt.jar

eclipse-javadoc:☂=hadluo-test/E:\/java8\/jre\/lib\/rt.jar

eclipse-javadoc:☂=hadluo-test/E:\/java8\/jre\/lib\/rt.jar


AQS框架大致思想

AQS的核心就是一个标记为volatile的state状态值+一个队列,这个锁是否被抢占到则是用这个state来标记的,因此把state设置为volatile的目的也是保证对所有的线程可见,我们假设有三个线程去抢占锁,此时线程1抢占成功了,就会把state值设置为1,然后其他的没有抢到的线程则会被封装成一个Node进入到这个阻塞队列中。等待持有锁的线程释放之后,会唤醒离头结点最近的那个节点对应的线程。继续尝试抢占state。

源码大致成员:

public abstract class AbstractQueuedSynchronizer {
    // 指向 运行的节点
    private transient volatile Node head;
    //指向 新增加的等待线程节点
    private transient volatile Node tail;
    //线程拿到锁就加加,值就是线程重入锁的次数
    private volatile int state;
    // 当前拿到锁的线程
    private transient Thread exclusiveOwnerThread;

    // 等待队列的Node结构
    static final class Node {
        // 线程所处的等待锁的状态,初始化时,该值为0
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
    }
}


ReentrantLock怎么使用AQS的

ReentrantLock的构造

  public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
  // 默认构造的是非公平的
  public ReentrantLock() {
        sync = new NonfairSync();
  }

参数如果为true ,就是公平锁(FairSync),否则就是非公平锁(NonfairSync),我们先讨论下默认的非公平锁怎么实现的。 综上所述:构造函数就是构造了一个NonfairSync 对象赋值给成员sync 。

注意:NonfairSync和FairSync 都是继承 AbstractQueuedSynchronizer (AQS), 也就是说要想使用AQS功能,就需要继承它。

lock.lock() 加锁实现

 final void lock() {
            //调用AQS的compareAndSetState ,cas判断state是不是0,是的话返回true,且把state改为1
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
  }

  // AQS里面的方法,利用cas判断state的值是否是预期的值
  protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

会调到NonfairSync 的lock方法,做了下面几步操作:

  • 如果还没有线程执行抢占锁,state肯定为0, CAS将state改为1,并且设置当前线程到 AQS的 exclusiveOwnerThread 成员变量中,代表 有线程已经抢占锁了。
  • 如果已经有线程被抢占了,执行else逻辑,也就是acquire(1) 。

acquire(1) 分析

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquire也是AQS的方法。大致 就是去尝试获取锁,如果失败了就构造Node节点加入等待队列。他是由四个函数组合:

  • tryAcquire(int arg)
    由子类实现,获取锁。
  • addWaiter(Node mode)
    获取锁失败后,将等待线程封装成Node加入等待队列,由AQS实现。
  • acquireQueued(final Node node, int arg)
    在队列中如果其前驱节点是头节点,就循环获取锁,获取锁成功就返回。
    如果其前驱不是头节点,或者是头节点但是获取锁失败,挂起当前线程。由AQS实现。
  • selfInterrupt()
    自我中断,当获取锁的时候,发生中断时记录下来,推迟到抢锁结束后中断线程

下面来一步一步分析

由于tryAcquire是AQS里面的方法,且没有实现,就是要子类来实现的,这里就是在 NonfairSync 类里面。

    // AQS里面的tryAcquire方法,不提供实现
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }   

       // 子类NonfairSync 里面的 tryAcquire 实现
    final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
  • 如果state为0 ,代表没有线程占锁了,然后CAS拿锁并设置为当前线程,返回拿锁成功true。
  • 如果state已经被占,判断是不是当前线程占的(可重入),如果是将state设置为加1 ,也就是重入次数,返回拿锁成功true。
  • 以上两步都不是,返回拿锁失败false。

回到 acquire(1)

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

tryAcquire拿锁失败返回false,走 acquireQueued(addWaiter(Node.EXCLUSIVE), 1) 。

EXCLUSIVE是独占模式:独占锁被某个线程持有时,其他线程只能等待当前线程释放后才能去竞争锁,而且只有一个线程能竞争锁成功。

还有共享锁:共享锁是可以被共享的,它可以被多个线程同时持有。如果一个线程获取共享锁成功,那么其他等待的线程也会去获取共享锁,而且获取大概率会成功。

共享锁我也有文章分析了,请看

罗政:AQS (2) 共享锁 CountDownLatch 源码分析


addWaiter(Node.EXCLUSIVE)分析

    private Node addWaiter(Node mode) { // mode为 null
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // null
        if (pred != null) {
           // 已经有等待的节点了,就直接添加到队列尾部
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
static final class Node {
        Node nextWaiter;
        volatile Thread thread;
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        Node(Thread thread, Node mode) { 
            this.nextWaiter = mode;
            this.thread = thread;
        }
}

构造了一个Node, 设置了当前线程的值。tail是AQS的成员,现在tail是null的,于是进入enq刚刚构造的node。

  private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // 队列为空,构造空的队列头node
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else { // 添加node到队列尾部
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

enq就是把node节点 插入到队列的尾部 ,并把全局tail指向node,head指向队头(空的node节点)。 enq后变成下图结构:

如果又有线程拿不到锁,执行addWaiter 的话,就直接执行if(pred !=null) 里面的

  private Node addWaiter(Node mode) { // mode为 null
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail; // null
        if (pred != null) {
           // 已经有等待的节点了,就直接添加到队列尾部
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

就直接往队列尾部加节点如下图

addWaiter已经讲完。我们再 回到 acquire(1)

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

addWaiter返回的是 tail节点。acquireQueued方法如下

final boolean acquireQueued(final Node node, int arg) {  //node tail节点
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 返回尾节点的前一个节点
                final Node p = node.predecessor();
               //前一个节点是头节点,代表是队列中第一个线程等待的节点 & 拿锁成功
                if (p == head && tryAcquire(arg)) {
                    // 拿锁成功,就直接运行,我们这里是拿锁不成功的
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
 // Node节点里面的方法,返回前一个节点
  final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

根据代码注释,拿锁不成功回去判断shouldParkAfterFailedAcquire

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // pred 为tail队尾等待的节点的前一个节点   node为tail队尾等待的节点
        int ws = pred.waitStatus;  //此时waitStatus是为0的
        if (ws == Node.SIGNAL)  // -1
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
           // 将 waitStatus 置为 Node.SIGNAL也就是-1  , 代表等待唤醒
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
    static final int SIGNAL    = -1;

shouldParkAfterFailedAcquire 就是从我们tail节点开始,把前一个节点的waitStatus改为-1,改成功后,再次循环进入shouldParkAfterFailedAcquire ,判断前一个节点是-1了,就返回true, 就执行 parkAndCheckInterrupt() 方法。队列结构图:

parkAndCheckInterrupt 方法

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

就是阻塞当前线程了。也就是拿锁不成功等待。到此lock方法已结束。

lock方法总结

  • 当第一个线程lock时, cas判断AQS的state为0并且设置为1,还将exclusiveOwnerThread设置为当前线程,代表拿锁成功然后返回,如果判断state为1,则进行下面操作。
  • 如果此时AQS的tail为空,代表队列还是空的,则进行enq入队操作,入队时,构造两个node,一个是空的头结点,一个是存了当前线程的尾部节点,且两个节点的waitStatus 为0。然后循环把当前tail节点的前一个节点的waitStatus 置为-1,最后执行LockSupport.park挂起当前线程。
  • 如果此时有第三个线程执行lock(且第一个线程还没释放锁),然后也会构造一个当前线程的node,然后添加到队列的尾部,并把前一个节点的waitStatus 置为-1,最后LockSupport.park挂起当前线程。
  • 造成的后果就是: 来一个线程我就添加到队列尾部,然后将前一个节点的waitStatus置为-1,然后阻塞挂起。最后一个节点的 waitStatus是0,其余都是-1。


lock.unlock() 解锁实现

    public final boolean release(int arg) {  // arg 为 1
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

unlock 会直接调到AQS的release方法。tryRelease也是要子类实现的。

     protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
               // 当前线程还没有拿锁
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                // 已经释放锁了
                free = true;
                setExclusiveOwnerThread(null);
            }
           // 重入锁情况是大于0
            setState(c);
            return free;
     }

c为当前state减1的值, Thread.currentThread()!=getExclusiveOwnerThread() 当前线程还没调用lock, 你就直接调用release,这不是疯了吗,于是抛出异常。如

        ReentrantLock lock = new ReentrantLock();
        lock.unlock();
/// 结果
Exception in thread "main" java.lang.IllegalMonitorStateException
	at java.util.concurrent.locks.ReentrantLock$Sync.tryRelease(ReentrantLock.java:151)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer.release(AbstractQueuedSynchronizer.java:1261)
	at java.util.concurrent.locks.ReentrantLock.unlock(ReentrantLock.java:457)
	at arithmetic.Server.main(Server.java:12)

tryRelease方法 就是让state作减减操作,如果减后是0,则把AQS的线程置为null,返回解锁成功true , 还有就是有重入锁情况,state减后还不是0,返回解锁不成功false。

在回去看上面的release方法。返回false,就直接返回了。也就是说重入锁且重入次数大于1的话,只是把state值减减而已,也没有执行唤醒操作。

如果解锁成功呢?再看下release代码

  if (tryRelease(arg)) {
       // 解锁成功 ,此时AQS state值为0
            Node h = head;
            // 取出头结点(是一个空节点)执行unparkSuccessor
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
  }

unparkSuccessor很关键

    private void unparkSuccessor(Node node) { // node是空的头结点
       // 从上面lock分析 此时是-1的
        int ws = node.waitStatus;
        if (ws < 0)  // cas把node的waitStatus 改为0 
            compareAndSetWaitStatus(node, ws, 0);
       // 头结点的一个节点
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 唤醒 头结点的下一个节点
        if (s != null)
            LockSupport.unpark(s.thread);
    }

unparkSuccessor 就是把空的头结点唤醒。 注意现在头节点的waitStatus 已变成0了 。然后我们在回到上面park阻塞的位置代码

final boolean acquireQueued(final Node node, int arg) { 
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 因为唤醒的  是头结点下一个节点,所以node就是头结点下一个节点
                final Node p = node.predecessor(); //p 为头结点
               //前一个节点是头节点,代表是队列中第一个线程等待的节点 & 拿锁成功
                if (p == head && tryAcquire(arg)) {
                    // state已经是0了,会进入拿锁成功,
                    setHead(node); //头结点变成了唤醒的节点
                    p.next = null; // 之前的头结点next置为空,反正不要了
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) // 阻塞到这里,唤醒后继续执行
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

唤醒后parkAndCheckInterrupt方法返回继续for循环。此时node就是唤醒的节点,也就是头结点的下一个节点。然后把之前头结点改成唤醒的节点。回收之前的头结点。

回收后,AQS队列又变成这样咯

https://pic2.zhimg.com/v2-081c5da8433f90a6cef087ac85226895_b.jpg

总结unlock过程

  • 如果之前没有lock过,会直接抛出UnsupportedOperationException异常
  • 如果是重入多次(多次调用了lock),则只会那减减重入次数(state减减 )。
  • 如果只是lock一次,state为1,减减到0,并把AQS的exclusiveOwnerThread置为空,代表解锁成功。
  • 解锁成功后,去唤醒空的头结点的下一个节点,unlock完毕! 但是会导致之前节点的park继续执行。执行逻辑就是把唤醒的节点变成头结点,之前头结点移除。也就是相当于把这个唤醒的节点作出队操作。

AQS还是很庞大的,现在还只是讲了ReentrantLock的lock和unlock不知道你了解没有,先讲到这里把,需要时间消化消化。后面我在继续讲关于AQS的其它东西。


《 飘风不终朝,骤雨不终日 》

释义:狂风总有一天会停止,大雨总有一天会消失。


强烈推荐一个 进阶 JAVA架构师 的博客

https://pic2.zhimg.com/v2-1e8deea0c94dab83067a8eca4007734d_ipico.jpg

本文完~

支付宝打赏 微信打赏

如果文章对您有帮助,您可以鼓励一下作者