AQS

目录

概述

AQS(AbstractQueuedSynchronizer)字面意思:抽象队列同步器

AQS 是用来实现锁或者其他同步器组件的公共基础部分的抽象实现。是重量级基础框架及整个 JUC 体系的基石,主要用于解决锁分配给 “谁” 的问题

加锁会导致阻塞,

有阻塞就需要排队,实现排队必然需要队列

AQS 由一个抽象 FIFO 队列 + int 类变量组成

  • CLH 双端队列:完成资源获取线程的排队工作。头部出队,尾部入队。
  • int 类变量:表示持有锁的状态:0 表示资源空闲,1 表示资源被占用。

锁与同步器的关系:

  • 锁是面向使用者的,定义了程序员与锁交互使用的API,隐藏了实现细节,调用即可。
  • 同步器是面向锁的实现者的,DougLee 提出统一规范并简化了锁的实现,将其抽象出来,屏蔽了同步状态管理、同步队列的管理和维护、阻塞线程排队和通知、唤醒机制等,是一切锁和同步组件实现的公共基础部分

如下图:

  • head 和 tail 是队列的头指针和尾指针,双端队列。
  • 队列中数据(线程)被封装在 Node 对象中。
  • volatile 的 int 类型 state 字段表示同步状态:0 表示资源空闲,1 表示资源被占用。通过 CAS 完成对 state 值的修改。

如下图:

  • Node 对象中有 prev 和 next 指针,表明是一个双向队列。
  • Thread 字段中存储需要阻塞(排队)的线程。
  • waitStatus 表示排队线程的等待状态:

class Node {
    // 表示线程以共享的模式等待锁
    static final Node SHARED = new Node();
    // 表示线程以独占的模式等待锁
    static final Node EXCLUSIVE = null;

    // 表示线程获取锁的请求已经取消
    static final int CANCELLED = 1;

    // 表示线程已经准备好了,就等待资源释放
    static final int SIGNAL = -1;

    // 表示节点在等待队列中,节点线程等待唤醒。等待 condition 唤醒
    static final int CONDITION = -2;

    // 共享式同步状态获取将会无条件地传播下去,当前线程处在 SHARED 情况下,该字段才会使用。
    static final int PROPAGATE = -3;

    // 初始值为 0,状态是上面的几种
    volatile int waitStatus;

    // 前置节点
    volatile Node prev;

    // 后继节点
    volatile Node next;

    // 需要排队的线程
    volatile Thread thread;

  	// 指向下一个处于 CONDITIION 状态的节点。
    Node nextWaiter;

    final boolean isShared() {
        return nextWaiter == SHARED;
    }

		// 返回前驱节点,没有的话抛出 NPE
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
}

与 AQS 有关的锁

  • ReentrantLock
  • CountDownLatch
  • ReentrantReadWriteLock
  • Semaphore

源码分析

公平锁与非公平锁

Lock 的使用

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        lock.lock();

        try {

        } finally {
            lock.unlock();
        }
    }

公平锁的 lock

新来的线程去获取锁。

非公平锁的 lock

非公平锁多了 compareAndSetState ,新来的线程去尝试设置 AbstractQueuedSynchronizer 中的 state 字段,一旦设置设置成功,新来的线程就成功获取到锁了。这也是不公平地方。因为此时可能 CLH 队列中有可能有等待的线程,也就是说新来的线程插队了。就像在银行窗口排队,新进来一个人看见窗口空闲,立马去抢占办理窗口,而没有取号。对于那些拿到号在排队的人来说,是不公平的。

如果新来线程抢到锁:setExclusiveOwnerThread(Thread.currentThread()); 将当前线程设置

公平锁和非公平锁的 tryAcquire 方法实现也不一样。公平多一个 !hasQueuedPredecessors() 判断队列是否有前置节点(除了当前线程外,判断队列是否为空)。公平锁判断如果队列中有等待的线程,当前线程就不能去尝试抢锁。

公平 非公平

判断除了当前线程外,CLA 队列中是否还有其他等待的线程。

    public final boolean hasQueuedPredecessors() {
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
  • 公平锁:讲究先来先得,线程在获取锁时,如果 CLA 队列中已经有其他线程在等待,那么当前线程就会进入等待对列中。
  • 非公平锁:不管是否有等待队列,如果可以获取锁,则立即占有锁对象。也就是队列的头节点线程苏醒后,不一定能获得锁,它还是需要参加竞争锁(存在线程竞争的情况下),后来的线程可能不讲武德插队夺锁。

公平锁和非公平锁都会调用 acquire。

三个主要方法:

  1. tryAcquire:尝试抢锁
  2. addWaiter:将当前当前线程插入等待队列。
  3. acquireQueued:当前线程入队后,尝试在抢一把锁。
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

Lock 过程

场景:A、B、C 三个顾客,去银行办理业务,A 先到,此时窗口空无一人,他优先获得办理窗口的机会,办理业务。

    public static void main(String[] args) {
        Lock lock = new ReentrantLock();
        lock.lock();

        // A 耗时严重,估计长时间占有窗口。
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("come in A");
                // 暂停 20 分钟
                TimeUnit.MINUTES.sleep(20);
            } catch (Exception e) {
            } finally {
                lock.unlock();
            }
        }, "A").start();

        // B 是第 2 顾客,受理窗口被 A 占用,只能去等候区等待,进入 AQS 队列,等待 A 办理完成,尝试去抢占受理窗口
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("come in B");
            } catch (Exception e) {
            } finally {
                lock.unlock();
            }
        }, "B").start();

        // C 是第 3 顾客,受理窗口被 A 占用,只能去等候区等待,进入 AQS 队列,前边是 B
        new Thread(() -> {
            lock.lock();
            try {
                System.out.println("come in C");
            } catch (Exception e) {
            } finally {
                lock.unlock();
            }
        }, "C").start();
    }

初始化

当 A 顾客到来,银行受理窗口空闲,A 直接去办理业务。

compareAndSetState 期望 state 是 0,设置为 1。此时设置成功。setExclusiveOwnerThread(Thread.currentThread()); 就是 A 顾客去窗口办理业务。

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

B 是第 2 顾客,受理窗口被 A 占用,只能去等候区等待,进入 AQS 队列,等待 A 办理完成,尝试去抢占受理窗口

顾客 B 调用 compareAndSetState(0,1) 失败。只能走 acquire(1)

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

在 acquire(1) 中,调用 tryAcquire(arg) 尝试抢占锁,一路走到 nonfairTryAcquire(int acquires),返回 false。 在 acquire(1) 中,由于 tryAcquire(arg) 返回 false,!tryAcquire(arg) 为 true,因此调用 addWaiter(Node.EXCLUSIVE),将线程 B 入队。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            // Node.EXCLUSIVE 添加独占模式
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }

        final boolean nonfairTryAcquire(int acquires) {
           // 当前线程为 B
            final Thread current = Thread.currentThread();
          	// 此时 A 在占用窗口,所以 state = 1
            int c = getState();
            if (c == 0) {
              	// 如果 c == 0,表示窗口空闲,赶紧再抢一把锁。
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
          	// getExclusiveOwnerThread() 获取的线程为 A
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
           // 此次线程 B 调用返回 false。
            return false;
        }

    private Node addWaiter(Node mode) {
      	// 为线程 B 生成 Node 对象。
        Node node = new Node(Thread.currentThread(), mode);
        
      	// 此时 tail 和 pred 为 null
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
      	// 入队
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
       // 死循环
        for (;;) {
          	// 循环第一轮:此时 tail 和 t 为 null
            // 循环第二轮:此时 tail 和 t 为 哨兵节点
            Node t = tail;
          	// 为链表初始用
            if (t == null) { 
              	// 新增一个哨兵节点
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
              	// 循环第二轮:将 node 节点插入链表尾部,返回结果,跳出死循环。
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

至此节点B已经添加到得 CLH 队列中了,但是线程还没有阻塞。

  // node 是 B,  arg = 1
	final boolean acquireQueued(final Node node, int arg) {
    		// 异常退出,需要取消排队
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
               // 循环第一轮:p 节点为哨兵节点(即 head)
                final Node p = node.predecessor();
              	// 如果 p 是head,那么 p 节点抢锁优先级最高,这里尝试一下抢锁。本次抢锁失败
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
              	// 当获取(资源)失败后,检查并且更新结点状态
              	// 循环第一轮:shouldParkAfterFailedAcquire(p, node) 返回 false
              	// 循环第二轮:shouldParkAfterFailedAcquire(p, node) 返回 ture,进入 parkAndCheckInterrupt()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

		// 当获取(资源)失败后,检查并且更新结点状态
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
      	// 第一次调用:pred 为 head 节点,node 是 B 节点,此时 ws = 0(默认值)
      	// 第二次调用:pred 为 head 节点,node 是 B 节点,此时 ws = -1(第一次修改)
        int ws = pred.waitStatus;
      	
        if (ws == Node.SIGNAL)
          	// 第二次调用:直接返回
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
          	// 第一次调用:ws = 0,走这个分支,将 pred 的 waitStatus 设置为 -1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
      	//阻塞线程
        LockSupport.park(this);
        return Thread.interrupted();
    }


    // 表示线程获取锁的请求已经取消
    static final int CANCELLED = 1;
    // 表示线程已经准备好了,就等待资源释放
    static final int SIGNAL = -1;
    // 表示节点在等待队列中,节点线程等待唤醒。等待 condition 唤醒
    static final int CONDITION = -2;
    // 共享式同步状态获取将会无条件地传播下去,当前线程处在 SHARED 情况下,该字段才会使用。
    static final int PROPAGATE = -3;

C 是第 3 顾客,受理窗口被 A 占用,只能去等候区等待,进入 AQS 队列,前边是 B

顾客 C 与顾客 B 过程类似,只是不需要创建链表的哨兵节点,直接将节点 C ,插入队尾。

unLock 过程

如下图:线程 A 已经释放资源的过程。

    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
      	// 线程 A 尝试释放资源
        if (tryRelease(arg)) {
          	// 线程 A 释放资源完毕后。等待列表头部节点(哨兵节点除外)最应该抢占资源
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

        protected final boolean tryRelease(int releases) {
          	// getState() 获取的值为 1,releases = 1 则 c = 1 - 1 = 0
            int c = getState() - releases;
          	// 正确性校验
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
              	// 释放资源
                free = true;
                setExclusiveOwnerThread(null);
            }
          	// 将 state 设置为 0
            setState(c);
          	// 返回 true
            return free;
        }

		// 唤醒
    private void unparkSuccessor(Node node) {
      	// node 为哨兵节点:waitStatus = -1
        int ws = node.waitStatus;
        if (ws < 0)
          	// 将哨兵节点的 waitStatus 赋值为 0
            compareAndSetWaitStatus(node, ws, 0);
      
      	// s 为节点 B,waitStatus = -1
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {   // 此路不通
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
          	// 唤醒节点 B 的线程
            LockSupport.unpark(s.thread);
    }


    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this); // 线程 B 被唤醒
      	// 由于线程 B 被唤醒,Thread.interrupted() 为 false
        return Thread.interrupted();
    }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
               // 2. 节点 B 的前置节点为 head (哨兵节点)
                final Node p = node.predecessor();
              	// p == head,节点 B 去抢夺资源成功。在非公平锁是,节点 B 有可能抢夺失败
                if (p == head && tryAcquire(arg)) {
                  	// 下边代码是从等待队列中移除节点 B。策略:将节点 B 作为哨兵节点,移除 p(之前的哨兵节点)
                    // 设置新的 head 为node(node 成为新的哨兵节点)
                    setHead(node);
                  	// 旧的哨兵节点断开链接,后续被垃圾回收
                    p.next = null; // help GC
                    failed = false;
                  	// 返回 false
                    return interrupted;
                }
              	// 1.线程 B 的 parkAndCheckInterrupt() 为 false,进入下次循环。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    public final void acquire(int arg) {
      	// 节点 B 的 acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 返回 false。此方法执行完毕。
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

如下图:线程 A 释放资源后的状态

如下图:线程 B 抢占资源后

线程取消排队过程

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
          	// 由于异常,线程走到此处
            if (failed)
              	// 此方法目的:将 node 节点从等待队列中移除。
                cancelAcquire(node);
        }
    }

    // 表示线程获取锁的请求已经取消
    static final int CANCELLED = 1;
    // 表示线程已经准备好了,就等待资源释放
    static final int SIGNAL = -1;
    // 表示节点在等待队列中,节点线程等待唤醒。等待 condition 唤醒
    static final int CONDITION = -2;
    // 共享式同步状态获取将会无条件地传播下去,当前线程处在 SHARED 情况下,该字段才会使用。
    static final int PROPAGATE = -3;

    private void cancelAcquire(Node node) {
        if (node == null)
            return;

    		// 将线程置空
        node.thread = null;
      
        Node pred = node.prev;
      	// pred.waitStatus > 0 表示节点状态为:已取消。node 之前节点也可能是已取消状态,需要找到最前边的不取消的节点给 pred
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        Node predNext = pred.next;
			
        node.waitStatus = Node.CANCELLED;

        // 如果 node 是 tail,需要特殊处理,直接将 pred 节点设置为 tail
        if (node == tail && compareAndSetTail(node, pred)) {
          	// pred.next = null
            compareAndSetNext(pred, predNext, null);
        } else {
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
              	// next.waitStatus <= 0:next 节点是非取消状态
                if (next != null && next.waitStatus <= 0)
                  	// pred.next = next
                    compareAndSetNext(pred, predNext, next);
            } else {
              	// 唤醒 node
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

线程取消排队的两种情况:

  1. 尾节点取消排队
  2. 中间节点取消排队

注意:可能存在已经连续取消的节点,一并处理了。