• 0

  • 476

  • 收藏

源码分析 AQS 一

2个月前

《一行一行源码分析清楚 AbstractQueuedSynchronizer 一》

分析 java.util.concurrent 包源码时少不了要了解 AbstractQueuedSynchronizer (AQS) 这个抽象类,因为它是 Java 并发工具包的基础工具类,是实现 ReentrantLock、CountDownLatch、Semaphore、FutureTask 等类的基础工具类。

开篇尝试回答几个问题:

  1. AQS 的定位 ?
  2. AQS 的重要组成部分 ?
  3. AQS 运用的设计模式 ?
  4. AQS 实现的组件有哪些,分别是怎么应用的了解过吗 ?
  5. AQS 加锁和释放锁的流程说一下。
  6. AQS 对资源的共享方式有哪几种 ?有什么区别 ?

本文将从 ReentrantLock 的公平锁源码出发,分析下 AQS 这个抽象类是怎么工作的。

AQS 结构

先看看 AQS 有哪些属性,搞清楚这些基本就知道 AQS 是什么套路了。

// 头结点,直接把它当做是持有锁的线程是最好理解的
private transient volatile Node head;

// 阻塞的尾结点,每个新节点进来都插入到最后,也就形成了一个链表
private transient volatile Node tail;

// 这个是最重要的代表当前锁的状态。0 代表没有被占用,大于 0 表示有线程持有了当前锁
// 这个值可以大于 1 因为锁是可以重入的,每次重入 +1
private volatile int state;

// 代表当前持有锁的线程;举个最重要的使用例子:可重入锁
// reentrantLock.lock() 支持嵌套多次调用即可重入,所以每次都用这个判断当前线程是否已经拥有了锁
// if (Thread.currentThread == exculsiveOwnerThread) {state++;}
private transient Thread exculsiveOwnerThread;
复制代码

AQS 等待队列示意图如下所示,注意之后的分析过程所说的 queue 也就是阻塞队列是不包含 head 的,head 是当前持有锁的线程。

Node 结构

等待队列中每个线程被包装成一个 Node 实例,设定节点前驱和后继节点,将节点串联成链表。一起来看下 Node 内部类的源码:

static final class Node {
			  // 标记节点当前在共享模式下
        static final Node SHARED = new Node();
				// 标记节点当前在独占模式下
        static final Node EXCLUSIVE = null;

				// 当前线程取消了锁的竞争
        static final int CANCELLED =  1;
				// 当前 Node 的后继节点需要被唤醒
        static final int SIGNAL    = -1;
				// 当前 Node 等待在 Condition 上
        static final int CONDITION = -2;
        // 标识下一次共享式同步状态将会被无条件(unconditionally)的传播下去
        static final int PROPAGATE = -3;

        /**
         *   SIGNAL:     后继节点的线程处于等待状态,如果当前节点的线程释放了同步状态或者被取消,
         *							 将会唤醒后继节点使后继节点继续运行。
         *
         *   CANCELLED:  在同步队列中等待的当前线程发生等待超时或被中断,从同步队列中取消等待,节点进入取消状态后不会再发生变化。
         *
         *   CONDITION:  当前节点在等待队列中,节点线程等待在 Condition 上;当其他线程对 Condition 调用了 signal() 方法后,				 *							 该节点会从等待队列转移到同步队列,加入对同步状态的竞争。
         *
         *   PROPAGATE:  下一次共享式同步状态获取将会无条件传播下去。
         *
         *   0:          默认值,什么也不表示。
         */
        volatile int waitStatus;

        // 前驱节点引用
        volatile Node prev;

        // 后继节点引用
        volatile Node next;

        // 当前线程
        volatile Thread thread;

  		  // 在条件队列等待的当前节点的后继或者是特殊值 SHARED
        Node nextWaiter;
    }
复制代码

上面是基础知识,下文会多次用到要牢记,心中想着这个结构图就可以,再强调一遍我们说的阻塞队列不包含 head 节点。

ReentrantLock 使用

首先我们看下 ReentrantLock 的使用方式:

// 我用个web开发中的service概念吧
public class OrderService {
    // 使用static,这样每个线程拿到的是同一把锁,当然,spring mvc中service默认就是单例,别纠结这个
    private static ReentrantLock reentrantLock = new ReentrantLock(true);

    public void createOrder() {
        // 比如我们同一时间,只允许一个线程创建订单
        reentrantLock.lock();
        // 通常,lock 之后紧跟着 try 语句
        try {
            // 这块代码同一时间只能有一个线程进来(获取到锁的线程),
            // 其他的线程在lock()方法上阻塞,等待获取到锁,再进来
            // 执行代码...
        } finally {
            // 释放锁
            reentrantLock.unlock();
        }
    }
}
复制代码

ReentrantLock 在内部使用了抽象静态内部类 Sync 管理锁,所以真正地实现获取锁和释放锁是由 Sync 的实现类完成的。

abstract static class Sync extends AbstractQueuedSynchronizer {
}
复制代码

ReentrantLock 有两个实现分别是 NofairSync 和 FairSync,默认是 NoFairSync。

public ReentrantLock() {
        sync = new NonfairSync();
    }

public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }
复制代码

tryAcquire

我们先看 FairSync 的部分。

static final class FairSync extends Sync {
        private static final long serialVersionUID = -3000897897090466540L;

        final void lock() {
            acquire(1);
        }
  			
			  // 来自父类 AQS
  			// 先尝试获取锁,如果幸运获取成功,该方法就结束了
        // 否则 addWaiter 会将包装有当前线程的 Node 压入等待队列中, acquireQueued 抢占锁或者挂起抢占失败的锁,进入等待状态
  			public final void acquire(int arg) { // 此时 arg = 1
        // 看名字就知道这个方法只是试一试,如果 tryAcquire(1) 成功就不需要进队列排队等待了  
        if (!tryAcquire(arg) &&
            // 重点!!真正的线程被挂起,然后被唤醒后再次尝试获取锁都在 acquireQueued 方法里
            acquireQueued(
                // 当前线程包装成Node,进入阻塞队列,EXCLUSIVE 代表独占模式,是 Null 值
                addWaiter(Node.EXCLUSIVE), arg)
            )
            selfInterrupt();
    		}

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
  			// 尝试获取锁,返回值是 boolean 代表加锁是否成功
  		 	// 返回 true: 1.锁的重入,线程本来就持有锁,理所当然可以直接获取;或者 2.没有其他线程在排队等待锁;
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 获取当前锁的状态 state
            // 0 表示没被占用,大于 0 表示已经有线程持有了当前锁
            // 这个值可以大于 1 因为锁是可以重入的
            int c = getState();
            if (c == 0) {
              	// 虽然此时此刻锁是可用的,即是公平锁见名知意,得先查看当前 Node 前面有没有正在排队的节点
	              // 是阻塞队列第一个节点才尝试抢锁
                if (!hasQueuedPredecessors() &&
                    // 通过 CAS 将当前锁状态从无锁 (state=0) 更新到 acquires
                    // 如果成功了就获取到锁了,否则就说明就在刚刚有另一个线程抢先了,因为我刚刚还判断 c==0 没人的
                    compareAndSetState(0, acquires)) {
                    // 把独占线程设成自己
                    setExclusiveOwnerThread(current);
                    // 抢占锁成功
                    return true;
                }
            }
            // 如果进到这个循环代表重入锁情况,即当前锁之前被自己抢占到了
            else if (current == getExclusiveOwnerThread()) {
                // 递增锁状态 state 记录当前锁被重入次数,释放锁时要一一减回来
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                // volatile 写 state 变量,刷新最新值到主内存
                setState(nextc);
                return true;
            }
            // 走到这里代表锁已经被别人抢占了,尝试抢锁失败
          	// 回到上一步外层调用方法继续看:
          	// if (!tryAcquire(arg) 
          	//					&& acquireQueued(
            //    	addWaiter(Node.EXCLUSIVE), arg))
            // 	selfInterrupt();
            return false;
        }
    }
复制代码

addWaiter

tryAcquire(arg) 返回 false 时,代码将继续执行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)

这个方法首先需要执行 addWaiter(Node.EXCLUSIVE)

当前方法的作用是把尝试获取锁失败的当前线程包装成 Node,入队阻塞队列。此时方法的 mode 是 Node.EXCLUSIVE 代表独占模式

private Node addWaiter(Node mode) {
        // 当前线程封装到 Node,这里的 mode 是EXCLUSIVE,nextWaiter 是 null 独占模式
        // 什么是独占模式,怎么独占,共享又是怎么共享?
        // 当前 node 的 waitStatus 是 0
        Node node = new Node(Thread.currentThread(), mode);
        // pred 暂存尾结点
        Node pred = tail;
        // tail != null 队列不为空(tail==head 时其实队列也是空的,看到下面队列初始化就明白了)
        if (pred != null) {
            // 当前 node 的前驱指向 tail
            node.prev = pred;
            // 尝试 CAS 把自己设成队尾,如果成功后 tail == node,当前节点成为了阻塞队列的新尾巴
            if (compareAndSetTail(pred, node)) {
                // CAS 设置自己成队列 tail 成功,node 已入队
                // 注意阻塞队列不包括 head 节点,head 一般指的是占有锁的线程
                // 之前尾结点的后驱指向 node,配合前面的 node.prev = pred; 至此建立了和之前尾结点的双向连接
                pred.next = node;
                return node;
            }
        }
        // 如果能到这里有两种情况
        // 说明 pred == null(队列是空的)或者 CAS 入队 node 节点失败,有其他线程 node 在竞争入队
        // 对于第一种情况,队列为空时,初始化当前队列然后循环尝试入队
        // 对于第二种情况,CAS 设置 tail 的过程,第一次抢不到(有线程在竞争入队)就采用自旋的方式入队
        enq(node);
        // 返回当前线程Node节点,该节点已入队
        return node;
    }
复制代码
enq

当第一次CAS尝试当前 Node 入队阻塞队列失败时,会调用 enq(node) 方法循环入队直至成功。返回值是原先的尾结点。

private Node enq(final Node node) {
        for (;;) {
            // volatile 变量 tail,获取最新的 tail 值
            Node t = tail;
            // tail == null,初始化阻塞队列
            // 可见阻塞队列是延迟初始化的,等到新节点入队时才初始化
            if (t == null) { // Must initialize
                // 初始化 head 节点,无参Node构造方法,thread 成员变量是 null,waitStatus 默认值 0
              	// 还是 CAS 操作,你懂得现在可能是多个线程同时初始化队列
                if (compareAndSetHead(new Node()))
                    // 这时候有了 head 但 tail 还是 null,将 tail 指向 head,tail == head
                    // 注意这里只是 tail = head,没有 return, 没有 return
                    // 这就可以在下次循环时争抢设置 tail
                  	// 设置完后继续循环,再进来tail 就有值了,队列那时非空 
                    tail = head;
            } else {
                // 当前节点的前驱指向当前 tail
                node.prev = t;
                // CAS 设置自己成队尾 tail,尝试入队
                if (compareAndSetTail(t, node)) {
                    // CAS 设置 tail 为 node 成功
                    // 将之前 tail 节点的后驱指向当前 node,建立 node 和之前尾结点的双向连接
                    t.next = node;
                    // 返回之前的尾结点,即当前尾结点的前驱节点
                    return t;
                }
            }
        }
    }
复制代码

acquireQueued

现在又回到了 acquireQueued(final Node, int arg) 方法,此时当前线程 Node 节点已经成功入队。

注意如果 acquireQueued 方法返回 true 的话,会中断当前线程,所以正常情况下应该返回 false。

public final void acquire(int arg) {
        // 尝试获取锁,获取成功方法就结束了
        if (!tryAcquire(arg) &&
            // 重点!!真正的线程被挂起,然后被唤醒后去获取锁都在 acquireQueued 方法里
            acquireQueued(
                // 当前线程包装成Node,进入阻塞队列,EXCLUSIVE 代表独占模式,是 Null 值
                addWaiter(Node.EXCLUSIVE), arg)
            )
            selfInterrupt();
    }
复制代码

这个方法非常重要,真正的线程挂起、唤醒后再次尝试获取锁操作,都在 acquireQueued 方法里。

final boolean acquireQueued(final Node node, int arg) { // 此时arg是1
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 当前node节点的前驱
                final Node p = node.predecessor();
                // p == head 代表当前 node 节点虽然进入到阻塞队列了,
              	// 而且是阻塞队列的第一个(阻塞队列不包含 head!!),可以尝试获取锁
              	// 这里说下为什么当前节点为什么可以去试试:
	              // 首先它是队头,其次当前 head 有可能是刚刚初始化的 node
                // enq 方法里提过,head 是延迟初始化的 head = new Node
              	// 此时 head 节点没有指定线程,没有线程在占用锁,所以作为队头的 node 可以试试获取锁
                if (p == head && tryAcquire(arg)) {
                    // 抢占锁成功,volatile 写将 head 指向 node
                    // 置空 node 的 thread 为什么要置空 thread ??
                    // 置空 node 的前驱节点,毕竟是 head 节点了再无前驱
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 到这里说明说明上面的 if 分支不成立,要么当前Node不是队头要么 tryAcquire 没有抢赢别人
              	// 这个方法的意思是 "当前线程没有抢到锁,判断是否需要挂起当前线程?"
                if (shouldParkAfterFailedAcquire(p, node) &&
                    // 需要挂起时,park 挂起当前线程,unpark 后检查当前申请的中断标识
                    parkAndCheckInterrupt())
                    // 如果有其他线程申请过中断时打标记
                    interrupted = true;
            }
        } finally {
            // failed 是 true 时取消抢占锁
            // 什么时候 failed == true ? tryAcquire(arg) 报错抛出异常时候
            if (failed)
                // 抢不到,不抢了。取消抢占锁
                cancelAcquire(node);
        }
    }
复制代码
shouldParkAfterFailedAcquire

接着看 shouldParkAfterFailedAcquire(predessor, node) 方法,这个方法定义了"当前线程没有抢到锁时,是否需要挂起?"

// 第一个参数 pred 代表前驱结点,第二个参数 node 代表当前节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驱节点的 waitStatus
        int ws = pred.waitStatus;
        // wx == -1 前驱结点状态正常释放锁时,可以唤醒当前节点。
  			// 当前 node 线程就安心被挂起
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        // 前驱结点 waitStatus > 0 代表前驱节点已取消抢占锁
        // 前驱节点都取消抢占锁了那谁来 unpark 唤醒我呢(当前节点 node)
        // 所以需要 while 循环如果前驱节点取消抢占锁,那就找前驱的前驱;
        // 直到找到 waitStatus <= 0 的前驱结点做当前节点的前驱,找个好爹还得靠它唤醒当前线程呢。
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            // 将循环找到的前驱结点的后继指向当前节点,建立两者的双向连接
            pred.next = node;
        } else {
            // 进到这里说明,waitStatus 不是 -1 和 1,那只可能是 0 -2 -3
            // 在前面的源码中没有看到 waitStatus 的显式设置,所以是 Node 创建时的默认值 0
            // 正常情况下前驱节点是之前的 tail,那么它的 waitStatus 就是 0
            // 用 CAS 将前驱节点的 watStatus 设置为 -1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        // 返回 false 再循环一遍进入当前方法,此时前驱节点就满足waitStatus == -1 从第一个条件返回 true
        return false;
    }
复制代码

我们分析下 shouldParkAfterFailedAcquire 方法返回值:

如果返回 true

说明前驱节点 waitStatus = SIGNAL (-1) 代表正常情况,那么当前线程是需要被挂起的,等到前驱结点拿到锁时候唤醒当前节点就好了。

if (shouldParkAfterFailedAcquire(p, node) &&
       // 需要挂起时,park 挂起当前线程,unpark 后检查当前申请的中断标识
       parkAndCheckInterrupt())
       // 如果有其他线程申请过中断时打标记
	interrupted = true;

// 前面方法返回 true 执行这个方法挂起当前线程
// 这里用到了 LockSuport.park(this) 挂起当前线程,然后就阻塞在这里了
// 等待前驱结点拿到锁执行 unpark() 唤醒后继节点的自己
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
  			// interrupted 方法检查阻塞期间是否有线程来申请过中断,该方法会清除线程中断标识
  			// 回到外层调用处,对 interrupted 打标记向外抛
  			// 最终再次执行 Thread.currentThread().interrupt(); 恢复中断现场,保持中断状态不丢失
        return Thread.interrupted();
    }
复制代码

如果返回 false

说明前驱节点 waitStatus 不是 -1,尚不满足当前节点挂起的条件,没人能唤醒它。

仔细看 shouldParkAfterFailedAcquire(p, node) 我们发现其实第一次进来,都不会返回 true 的。阻塞队列中每个节点构造时,waitStatus 都是默认值 0,也就是说我还没有给前驱结点设置 -1 呢,怎么可能会返回 true。

注意看 shouldParkAfterFailedAcquire 方法是嵌套在 for 循环里的,等到第二次进入该方法时,waitStatus 就是 -1 了。

为什么 shouldParkAfterFailedAcquire 返回 false 不挂起当前线程呢 ?

是考虑经过该方法后面,前驱结点直接是 head 的情况,已经是队头的节点 Node 应该继续尝试加锁而不是挂起。

解锁操作

最后介绍下唤醒的动作,我们知道正常情况下,如果线程没有获取锁,线程会被 LockSupport.park(this) 挂起停止,等待被唤醒。

public void unlock() {
        sync.release(1);
    }

public final boolean release(int arg) {
  			// 尝试释放锁,对锁同步状态 state 减一
  			// 返回 true 如果 state == 0 代表锁完全被释放
  			// 返回 false 如果 state > 0 代表还有重入锁未被释放
        if (tryRelease(arg)) {
            Node h = head;
            // head != null && head.waitStatus != 0
            if (h != null && h.waitStatus != 0)
                // unpark 唤醒后继节点
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
复制代码

我们看下 tryRelease 方法:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            // 是否完全释放锁
            boolean free = false;
            if (c == 0) {
                // 其实就是重入的问题,当 c == 0 即没有重入锁了
                free = true;
                // 置空独占锁 释放锁
                setExclusiveOwnerThread(null);
            }
            // volatile 写 state
            setState(c);
            return free;
        }
复制代码

完全释放锁以后,执行 unparkSuccessor 从队尾 tail 往前遍历查找排名最靠前的 ws <= 0 非取消状态的节点。

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        // 如果 head 节点 waitStatus < 0 将其修改为 0
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        // 唤醒后继节点,有可能后继节点取消了等待 (ws == 1)
        // 从队尾往前找,排名最靠前的 ws <= 0 的节点,不必担心中间节点取消(ws == 1)的情况
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            // 唤醒被阻塞的线程
            LockSupport.unpark(s.thread);
    }
复制代码

查找要唤醒的节点,为什么需要从队尾往前找呢 ?

唤醒线程以后,被唤醒的线程将从以下代码中继续往前走,再次尝试竞争锁。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
  			// 从这里继续执行
        return Thread.interrupted();
    }

// 再次回到 acquireQueued(node, arg) 进入下次循环尝试抢占锁
final boolean acquireQueued(final Node node, int arg) {...}
复制代码

锁的取消

当 tryAcquire 抛出异常时,当前节点退出锁的竞争,抢不到锁那就不抢了,取消Node节点。

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // failed 是 true 时取消抢占锁
            // 什么时候 failed == true ?? tryAcquire(arg) 报错时候,通过重试解决不了的错误
            if (failed)
                // 抢不到,不抢了。取消抢占锁
                cancelAcquire(node);
        }
    }
复制代码

继续看下 cancelAcquire(node) 方法:

private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

				// 跳过已取消的 node 前驱结点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
  			// 前驱节点的后继节点,可能是当前正在取消的 node 或者 node 已取消状态的先驱节点
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
  			// 如果我自己就是 tail 节点,那 CAS 把 tail 更新成前驱结点(非取消状态 ws <= 0)
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
          	// 进到这里有两种情况 
          	// 1. node 不是 tail 节点
          	// 2. node 是 tail 节点,但和其他的线程竞争设置 tail 没抢过
            int ws;
          	// pred != head && pred.thread != null 
          	// && (前驱节点 waitStatus == -1 || 前驱节点 CAS 更新 waitStatus = -1 成功)
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                  	// CAS 更新后继节点,将 node 的后继指向 pred 后继节点
                    compareAndSetNext(pred, predNext, next);
            } else {
              	// 唤醒当前节点后继
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
复制代码

总结

在并发环境下,加锁和解锁需要以下三个部件的协调:

  • 锁状态。我们用 state 来记录当前锁是否被占用;它为 0 时代表没有线程占有锁,用 CAS 将 state 设为 1,如果 CAS 成功说明抢到了锁,其他线程就抢不到了;如果锁重入的话 state + 1就行,解锁就是 state -1 直到 state = 0 代表所被释放,然后唤醒等待队列中的第一个线程让其来占有锁。
  • 锁的阻塞和解除阻塞。AQS 用 LockSupport.park(thread) 挂起线程,用 unpark(thread) 来唤醒线程。
  • 阻塞队列。争抢锁的线程可能有很多,但只能允许一个线程拿到锁,其他线程必须等待锁的释放;这时需要一个 queue 管理这些线程,AQS 使用的是一个 FIFO 队列,就是一个链表。每个 Node 都持有后继节点的引用。AQS使用CLH锁的变体来实现

下面属于回顾环节,用简单的实例快速过一遍。

首先,第一个线程调用 reentrantLock.lock() 时,tryAcquire(1) 直接抢占锁成功返回 true 结束了。这里只是设置了 state = 1,连 head 都没有初始化更谈不上阻塞队列。如果线程是串行执行的,线程1 unlock() 完第二个线程才来调用 lock() 方法抢占锁,那世界就太太平了,就没有 AQS 什么事了。

所以当第一个线程持有锁期间,第二个线程调用 lock() 方法,发生了什么?

线程 2 会初始化阻塞队列 head 和 tail 节点,同时线程2会入队阻塞队列并挂起;注意这里是一个 for 循环,初始化 head 和 tail 后并没有退出循环,而是只有入队成功才会跳出循环。

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}
复制代码

首先是线程2初始化 head 节点,此时 head == tail, waitStatus = 0

然后线程2入队

同时我们也要看到此时节点的 waitStatus;我们知道 head 节点是由线程2初始化的,初始化时没有设置 waitStatus 就是 Java 提供的默认值 0,但是执行 shouldParkAfterFailedAcquire 方法时,线程2会把前驱结点也就是 head 的 waitStatus 设为 -1。

那线程2节点此时的 waitStatus 是多少呢?由于没有设置,所以是 0。

如果线程3此时再进来,直接插入到线程2的后面作为 tail 节点,此时线程2和3的 waitStatus 都是0;线程3执行到 shouldParkAfterFaildedAcquire 时,会把前驱结点线程2的 waitStatus 设为 -1。

这里可以简单说下 SIGNAL(-1) 状态的意思;Doug Lea 注释是:代表后继节点需要被唤醒,也就是说这个 SIGNAL 代表的是后继节点的状态而不是当前节点的状态;每个 Node 入队时把前驱节点的 waitStatus 设为 -1,然后阻塞等待前驱结点释放锁时唤醒它。

(全文完)

免责声明:文章版权归原作者所有,其内容与观点不代表Unitimes立场,亦不构成任何投资意见或建议。

php

476

相关文章推荐

未登录头像

暂无评论