不断的学习,我们才能不断的前进
一个好的程序员是那种过单行线马路都要往两边看的人

Java源码解析---ReentrantLock与AQS

AQS概述

AQS抽象队列同步器,是实现锁的一个框架,内部定义了很多锁相关的方法。AQS内部维护了一个代表共享资源的状态变量volatile int state和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。
AQS 通过提供 state 及 FIFO 队列的管理,为我们提供了一套通用的实现锁的底层方法.
aqsshi-xian-yuan-li

状态变量state

state 初始化 0,在多线程条件下,线程要执行临界区的代码,必须首先获取 state,某个线程获取成功之后, state 加 1,其他线程再获取的话由于共享资源已被占用,所以会到 FIFO 等待队列去等待,等占有 state 的线程执行完临界区的代码释放资源( state 减 1)后,会唤醒 FIFO 中的下一个等待线程(head 中的下一个结点)去获取 state。
state 由于是多线程共享变量,所以必须定义成 volatile,以保证 state 的可见性。

等待队列

等待队列其实是双向链表实现的,由 head, tail 节点表示,head 结点代表当前占用的线程,其他节点由于暂时获取不到锁所以依次排队等待锁释放。结点的形式为Node:
头结点为虚结点,里面的exclusiveOwnerThread记录的是加锁的结点。

static final class Node {
        /** 标识等待节点处于共享模式,Semaphore使用**/
        static final Node SHARED = new Node();
        /** 标识等待节点处于独占模式,ReentrantLock使用 */
        static final Node EXCLUSIVE = null;

        // waitStatus的值,
        static final int CANCELLED =  1; // 由于超时或中断,节点已被取消
        
        //节点阻塞(park)必须在其前驱结点为 SIGNAL 的状态下才能进行,如果结点为 SIGNAL,则其释放锁或取消后,可以通过 unpark 唤醒下一个节点。
        static final int SIGNAL    = -1; 
        static final int CONDITION = -2; //表示线程在等待条件变量
        static final int PROPAGATE = -3; //表示后续结点会传播唤醒的操作,共享模式下起作用
        volatile int waitStatus;

        volatile Node prev;
        volatile Node next;
        volatile Thread thread; //使此结点排队的线程,使用后清零
        Node nextWaiter; //链接到下一个等待条件的结点
        
        final boolean isShared() {return nextWaiter == SHARED;}

        // 返回结点的前驱结点
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
        Node() {  
        }
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

AQS源码

public abstract class AbstractQueuedSynchronizer
  extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    // 以下为双向链表的首尾结点,代表入口等待队列
    private transient volatile Node head;
    private transient volatile Node tail;
    
    // 共享变量 state
    private volatile int state;
    
    // cas 获取 / 释放 state,保证线程安全地获取锁
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
    // ...

Condition

Condition用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition中的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,主要对外提供awaite(Object.wait())和signal(Object.notify())调用。

ReentrantLock 加锁过程

ReentrantLock可以实现公平锁和非公平锁,通过在构造函数传入true / false 来实现,公平和非公平的区别在于,进入阻塞队列的时候,非公平锁会去尝试获取锁(CAS尝试设置状态变量为1),从而达到插队的目的
FairSync 和 NonfairSync 都是 ReentrantLock的内部类 和 Sync的实现类,Sync抽象类继承了AQS。
公平锁在获取锁时:tryAcquire里面需要先判断当前线程节点是否存在前驱节点,即只有首节点才能够获取同步状态
非公平锁获取锁时:

static final class NonfairSync extends Sync {
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
}
static final class FairSync extends Sync {
        final void lock() {
            acquire(1);
        }
}    

非公平锁加锁的过程:

非公平锁通过CAS 尝试设置state为1,如果成功则设置 当前独占线程exclusiveOwnerThread为当前线程; 如果失败则执行acquire(1), 尝试着获取 state。
acquire(1):首先调用 tryAcquire 尝试着获取 state,如果成功,则跳过后面的步骤。如果失败,则执行 addWaiter 将线程加入 CLH 等待队列中,然后执行acquireQueued自旋尝试获取锁; 如果acquireQueued返回结果表示true,说明当前线程被唤醒或者被中断过,如果发现当前线程曾经被中断过,那我们就把当前线程再中断一次。PS: 因为被中断唤醒后当前线程并不响应中断,而是标记一下,然后自旋尝试去获取锁,所以抢到锁返回后,发现当前线程被中断后,就再中断自己一次

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire

这个方法是 AQS 提供的一个模板方法,最终由其 AQS 具体的实现类(Sync)实现,非公平锁逻辑步骤如下:

  1. 如果state状态变量为0,代表锁已经被释放,则使用 CAS 去重新获取锁资源,如果获取成功,则代表竞争锁成功,并设置独占线程exclusiveOwnerThread为当前线程。
  2. 如果 state 不为 0,代表之前已有线程占有了锁,如果当前线程等于独占线程,则为可重入锁,直接设置state状态变量为1。
  3. 加锁成功返回true,失败返回false。
final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
            //与公平方式不同,这里不需要判断是否存在前驱节点,即当前线程可以直接参与竞争锁
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

公平锁会先去判断是否存在前驱节点,没有前驱结点,即当前线程可以直接参与竞争

addWaiter

addWaiter将包含有当前线程的 Node 节点入队, Node.EXCLUSIVE 代表此结点为独占模式.
执行完 addWaiter 后,线程入队成功。

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 如果尾结点不为空,则用 CAS 将获取锁失败的线程入队
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 如果结点为空,执行 enq 方法
    enq(node);
    return node;
}

acquireQueued

这一步主要操作是让阻塞线程自旋尝试去获取锁
AQS 对线程自旋的处理:

  • 如果当前节点的上一个节点不为 head,且它的状态为 SIGNAL,则当前结点进入阻塞状态;
  • 如果当前结点的前一个节点是 head 结点,且获取锁(tryAcquire)成功:把 head 指向当前节点,并且让原 head 结点出队,结束返回false;,这样由于原 head 不可达, 会被垃圾回收。
  • 如果前一个节点不是 head 或者竞争锁失败,则首先调用 shouldParkAfterFailedAcquire 方法判断锁是否应该停止自旋进入阻塞状态,进入阻塞则当前线程挂起,等待被唤醒。
inal boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                //如果前一个节点是 head,则尝试自旋获取锁
                if (p == head && tryAcquire(arg)) {
                //  将 head 结点指向当前节点,原 head 结点出队
                    setHead(node); 
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 如果前一个节点不是 head 或者竞争锁失败,则进入阻塞状态
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
        //  如果线程自旋中因为异常等原因获取锁最终失败,则调用此方法,
        //  设置当前线程为cancle状态。
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire

如果加锁失败或者Node的前置节点不是head节点,就会通过shouldParkAfterFailedAcquire方法 将head节点的waitStatus变为了SIGNAL=-1,最后执行parkAndChecknIterrupt方法,调用LockSupport.park()挂起当前线程。
判断锁是否应该停止自旋进入阻塞状态:
1、如果前驱节点为 SIGNAL,则当前节点可以进入阻塞状态,返回true。
2、如果前驱节点为取消状态,则前驱节点需要移除,把所有当前节点之前所有 waitStatus 为取消状态的节点全部移除。
3、如果前驱节点小于等于 0,则需要首先将其前驱节点置为 SIGNAL。
shouldParkAfterFailedAcquire 返回 true 代表线程可以进入阻塞中断,那么下一步 parkAndCheckInterrupt 就该让线程阻塞了。

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
        
    if (ws == Node.SIGNAL)
       // 1. 如果前置顶点的状态为 SIGNAL,表示当前节点可以阻塞了
        return true;
    if (ws > 0) {
        // 2. 移除取消状态的结点
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 3. 如果前置节点的 ws <= 0,则其设置为 SIGNAL,
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

parkAndCheckInterrupt执行中断

private final boolean parkAndCheckInterrupt() {
    // 阻塞线程
    LockSupport.park(this);
    // 返回线程是否中断过,并且清除中断状态(在获得锁后会补一次中断)
    return Thread.interrupted();
}

cancelAcquire

这个cancelAcquire方法不仅是取消了当前节点的排队,还会同时将当前节点之前的那些已经CANCEL掉的节点移出队列。不过这里尤其需要注意的是,这里是在并发条件下,此时此刻,新的节点可能已经入队了,成为了新的尾节点,这将会导致node == tail && compareAndSetTail(node, pred)这一条件失败。

private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // 由当前节点向前遍历,跳过那些已经被cancel的节点
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;
    
    // 从当前节点向前开始查找,找到第一个waitStatus>0的Node, 该节点为pred
    // predNext即是pred节点的下一个节点
    // 到这里可知,pred节点是没有被cancel的节点,但是pred节点往后,一直到当前节点Node都处于被Cancel的状态
    Node predNext = pred.next;

    //将当前节点的waitStatus的状态设为Node.CANCELLED
    node.waitStatus = Node.CANCELLED;

    // 如果当前节点是尾节点,则将之前找到的节点pred重新设置成尾节点,并将pred节点的next属性由predNext修改成Null
    // 这一段本质上是将pred节点后面的节点全部移出队列,因为它们都被cancel掉了
    if (node == tail && compareAndSetTail(node, pred)) {
        compareAndSetNext(pred, predNext, null);
    } else {
        // 到这里说明当前节点已经不是尾节点了,或者设置新的尾节点失败了
        // 我们前面说过,并发条件下,什么都有可能发生
        // 即在当前线程运行这段代码的过程中,其他线程可能已经入队了,成为了新的尾节点
        // 虽然我们之前已经将当前节点的waitStatus设为了CANCELLED 
        // 但是由我们在分析lock方法的文章可知,新的节点入队后会设置闹钟,将找一个没有CANCEL的前驱节点,将它的status设置成SIGNAL以唤醒自己。
        // 所以,在当前节点的后继节点入队后,可能将当前节点的waitStatus修改成了SIGNAL
        // 而在这时,我们发起了中断,又将这个waitStatus修改成CANCELLED
        // 所以在当前节点出队前,要负责唤醒后继节点。
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                compareAndSetNext(pred, predNext, next);
        } else {
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

加锁流程图:
reentrantlock-lock-1

lockInterruptibly

前面的lock方法是阻塞式的,抢到锁就返回,抢不到锁就将线程挂起,并且在抢锁的过程中是不响应中断的,而lockInterruptibly可以响应中断,主要区别在于doAcquireInterruptibly这个方法。

lockInterruptibly会先判断是否中断过,是的话直接抛出异常。所以需要再业务逻辑处捕获异常。

private void doAcquireInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return; //与acquireQueued方法的不同之处
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException(); //与acquireQueued方法的不同之处,直接响应中断,抛出异常。
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

ReentrantLock 解锁过程

不管是公平锁还是非公平锁 都是使用AQS模版方法来释放锁:

public final boolean release(int arg) {
        if (tryRelease(arg)) { // 尝试释放锁
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // 唤醒后继结点
            return true;
        }
        return false;
    }

h == null, 这有两种可能,一种是一个线程在竞争锁,现在它释放了,当然没有所谓的唤醒后继节点,一种是其他线程正在运行竞争锁,只是还未初始化头节点.
h.waitStatus == 0,说明 head 的后继节点正在自旋竞争锁,也就是说线程是运行状态的。

tryRelease
tryRelease 方法定义在了 AQS 的子类 Sync 方法里:

判断当前线程是不是持有锁的线程,判断释放后的状态变量是不是0,是的话设置当前持有锁的线程为null,共享状态变量为0.

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 只有持有锁的线程才能释放锁,所以如果当前锁不是持有锁的线程,则抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 说明线程持有的锁全部释放了,需要释放 exclusiveOwnerThread 的持有线程
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

unparkSuccessor
锁释放成功后唤醒 head 之后节点,让它来竞争锁。

private void unparkSuccessor(Node node) {
    // 获取 head 的 waitStatus(假设其为 SIGNAL),并用 CAS 将其置为 0,
    //为啥要做这一步呢,之前我们分析过多次,其实 waitStatus = SIGNAL(< -1)或 PROPAGATE(-·3) 
    //只是一个标志,代表在此状态下,后继节点可以唤醒,既然正在唤醒后继节点,自然可以将其重置为 0,
    //当然如果失败了也不影响其唤醒后继结点
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    // 以下操作为获取队列第一个非取消状态的结点,并将其唤醒
    Node s = node.next;
    // s 状态为非空,或者其为取消状态,说明 s 是无效节点,此时需要执行 if 里的逻辑
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 以下操作为从尾向前获取最后一个非取消状态的结点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread); // 唤醒线程
}

重新将head指针指向next线程对应的Node节点,且使用LockSupport.unpark方法来唤醒线程。
此时线程被唤醒,线程接着之前被park的地方继续执行,继续执行acquireQueued()方法

Condition

await

ConditionObject是条件队列,用来替换Object类的wait和notify,因为Object类的wait只能从条件队列里面随机唤醒一个线程。而Condition的signal可以达到指定唤醒具体的线程。
条件队列则是由condition形成的条件队列,线程被await操作挂起后就会被放入条件队列,这个队列中的节点都被挂起,他们都等待被signal进入阻塞队列再次获取锁。
主要的流程如下:

  1. 给当前线程创建条件结点Node,设置等待状态为Condition,条件结点进入条件队列末尾。
  2. 当前线程释放锁资源,返回重入锁次数。
  3. 判断是否存入AQS阻塞队列,当前状态为Condition,所以不进入阻塞队列:
  4. 当前线程调用unsafe.park挂起当前线程,等待被signal唤醒
  5. 当线程被signal唤醒的时候,或者判断当前结点需要进入阻塞队列的时候,node结点进入阻塞队列,
  6. 进入阻塞队列后通过自旋尝试获取锁,并设置持有锁的次数。
    public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();// 生产等待队列的结点,WaitStatus是Condition
            int savedState = fullyRelease(node);// 释放锁,返回锁重入次数
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {// 判断是否进入阻塞队列,WaitStatus是Condition 所以不进入阻塞队列
                LockSupport.park(this); // 阻塞当前线程,等待signal唤醒
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

signal

调用signal前必须先持有当前独占锁。
具体的步骤如下:

  1. 判断是否持有锁,没有锁直接抛出异常
  2. 唤醒条件队列中第一个节点,firstWaiter。
  3. 把需要唤醒的结点,从条件队列转移到阻塞队列里面,并返回当前结点的前驱结点。
  4. 如果前驱结点状态为cancle 或者 cas设置前驱结点的等待状态为signal失败,则unpark唤醒线程。
    public final void signal() {
            if (!isHeldExclusively()) //判断是否持有锁
                throw new IllegalMonitorStateException();
            Node first = firstWaiter; // 唤醒条件队列第一个结点
            if (first != null)
                doSignal(first);
    }
    private void doSignal(Node first) {
            do {
            // firstWaiter指向第一个结点后面的结果,如果为null,则尾结点为null
            
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null); //添加入AQS队列如果不成功一直往下找,知道找到成功的
    }
    
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
         // CAS更新等待状态
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

       // 添加入AQS阻塞队列,返回上一个节点
        Node p = enq(node);
        int ws = p.waitStatus;
        // 上一个节点为取消状态,替换该状态为取消,解除阻塞,唤醒线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
        

面试问题

reentrantlock可中断是怎么实现的

reentrantlock可中断是怎么实现的?PS: 当前线程使用lockInterruptibly() 获取锁,其他线程一直执行,导致当前线程获取不到锁,当前线程可以使用thread.interrupt() 并响应中断,所以抛出异常(throw new InterruptedException();),在程序外部try_catch捕获异常,进行处理即可。即当前线程可以通过中断放弃等待

而lock()方法是不会响应线程的interrupt()方法的,只有等到获取锁的线程释放锁之后才会响应interrupt(),lock使用的是interrupted = true;。

Reference

理解AQS
ReentrantLock来解锁AQS

推荐下面这篇文章,讲的很细:包括为什么要从后往前遍历找到single,而不是从前往后?还有就是cancelAcquire的作用?还有就是可中断是这么实现的?
ReentrantLock源码分析


目录