• 0

  • 点赞

  • 收藏

JAVA并发包解析(1)——AQS

1个月前

前言

java concurrent包下面有Semaphore、ReentrantLock和CountDownLatch等多线程的辅助类,而他们内部类都继承了一个抽象类——AbstractQueuedSynchronizer,简称AQS。AQS可以用于构建锁或同步器,一个抽象类能够实现各种同步器,其中的奥秘势必要探索一番。

正文

AQS

AQS内部由一个双向链表组成,使用了一个int类型来当作状态(state),对于这个state不同实现类有不同的理解方法,所以全靠实现类来重写抽象方法来定义state的理解方法。

未命名文件.png

compareAndSetState原子修改方法

Node节点共同组成了一个阻塞队列,每个Node都保存了线程实例以及状态,如果说AQS的实现类是锁的话,那么每一个Node都代表想要获取锁的线程,看一下Node源码:

static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;

	volatile int waitStatus;
	Node nextWaiter;
	volatile Node prev;
	volatile Node next;
	volatile Thread thread;

	final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }
复制代码

prev和next表示前驱和后继节点,thread表示入队时候的线程,值得关注的是waitStatus和nextWaiter。

waitStatus

表示了当前节点的状态,一共有以下几种状态:

  • 阻塞状态(0),这个状态没有定义出来,实际表示node正在队列中等待获取锁
  • 取消状态(1),定义为CANCELLED,表示这个节点/线程已经被取消了
  • 信号状态(-1),定义为SIGNAL,表示当前节点的后继节点需要被通知运行
  • 条件状态(-2),定义为CONDITION,表示当前节点正在CONDITION队列中
  • 传播状态(-3),定义为PROPAGATE,表示当前场景下后续的acquireShared能够得以执行

根据以上定义,可以把大于0表示为取消,小于0表示为特殊的事件

nextWaiter

nextWaiter组成了一个单链表,可以表示为CONDITION队列;也可以是特殊值SHARED,表示当前节点是一个共享模式。如果为空表示EXCLUSIVE为独享模式。

nextOffset = unsafe.objectFieldOffset
                (Node.class.getDeclaredField("next"));

 private static final boolean compareAndSetNext(Node node,
                                                   Node expect,
                                                   Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }
复制代码

对于链表节点的修改,也是需要原子性的。这里AQS使用了unsafe计算出next的偏移量,通过compareAndSwapObject去修改链表的next值

API

AQS提供了获取和释放状态的API,由以下几个方法组成

方法 描述
acquire(int) 排他模式下获取状态
acquireInterruptibly(int) 排他模式下获取状态,如果当前线程被中断抛出异常
acquireShared(int) 共享模式下获取状态
acquireSharedInterruptibly(int) 共享模式下获取状态,如果当前线程被中断抛出异常
release(int) 释放状态
acquireShared(int) 共享模式下释放状态

acquire和release相当于获取和释放锁操作,AQS并不关心我们如何获取锁,而是把它交给了子类实现:

  • tryAcquire/Share:检查当前状态下是否能够可以获取,分为共享和排他两种模式,由子类分别实现。
  • tryRelease/Share:当前状态是否允许释放,同上
  • isHeldExclusively:在排他模式下,状态是否被占用

通过以上api可以构成一个同步器,同步器的功能由子类实现。如果需要实现锁,那么Lcok和UnLcok对应acquire和release,我们只需要实现tryAcquire和tryRelease来判断是否可以获取锁以及释放的逻辑。


来看具体实现:

  • acquire和release
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码

acquireQueued是一个自旋获取状态的操作,如果不符合获取条件且自旋失败,那么只能中断自己的线程,避免占用I0。

//向当前队列添加一个Waiter
private Node addWaiter(Node mode) {
	//保存线程信息
        Node node = new Node(Thread.currentThread(), mode);
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

//进队操作
private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { 
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
复制代码

在调用acquireQueued的时候先调用了addWaiter,将当前线程作为一个节点加入阻塞队列,enq是进队操作将新的节点加入到队列尾部变成tail,head是一个空节点。使用了原子方法如果多个线程同时入队同一时间只有一个线程能够成功入队,剩下的线程则循环继续添加。

++入队以后开始进行自旋获取锁操作++

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

使用了failed和interrupted来表示成功和是否被打断。每次循环都判断prev节点是否为head并且是否有获取的条件。当获取失败后进入shouldParkAfterFailedAcquire和parkAndCheckInterrupt

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
复制代码

返回一个boolean表示线程是否应该被阻塞

  • 如果它的prev是信号状态,表示上一个线程已经设置了状态并且释放后会发出信号,则可以安全的阻塞。
  • 如果它的prev节点被取消了,就需要将这些节点移除队列重新连接。
  • 如果它的prev节点状态是0,则给它设置一个信号状态
private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
复制代码

这里使用了unsafe的park方法,可以理解为线程的wait和notify。

private void cancelAcquire(Node node) {
     
        if (node == null)
            return;

        node.thread = null;

        Node pred = node.prev;
	//将被取消的节点给断开
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;
        Node predNext = pred.next;

        node.waitStatus = Node.CANCELLED;

     	//如果自己是尾节点,则移除自己
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
      
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }
复制代码

当自旋失败的时候,需要删除这个节点,删除这个节点也有讲究的。因为如果下面还有节点,next节点需要上一个节点是信号状态来通知自己,在这个过程中要断开自己并且设置上一个节点的状态为SIGNAL。如果我们找到了这个节点而点不是head,并且下一个节点是有效的,则可以将这两个节点给连接起来,如果不是这个情况就需要将下一个节点给唤醒让它自旋处理。

private void unparkSuccessor(Node node) {
 
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);
        Node s = node.next;
	//如果next节点是空或者被取消了,继续寻找找到一个符合条件的
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
复制代码

unparkSuccessor主要是通知对next节点进行unpark,如果next是一个无效节点那么会从后往前找。因为是一个FIFO队列,所以要从尾部开始找。

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
复制代码

release就很简单了,加上子类实现的tryRelease,会通过unparkSuccessor唤醒下一个节点。


  • acquireShared和releaseShared
public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }
复制代码

共享模式只有在acquireShared<0的时候会进入自旋,因为共享模式是多个线程共享一个状态,当这个状态未达到的时候才会阻塞线程。

private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
复制代码

doAcquireShared和acquireQueued一定程度上相同,不同点是:

	if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
复制代码

主要是进入setHeadAndPropagate

 private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }
复制代码

下一个节点也是共享节点,则调用doReleaseShared

for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
复制代码

这个方法需要将当前head的状态设置为PROPAGATE,如果当前状态是SIGNAL则需要通知后续节点,并且当head被其他线程改变的时候也要自旋。

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }
复制代码

releaseShared非常简单,当触发tryReleaseShared成功后调用doReleaseShared。

总结

通过对acquire和release进行分析,可以了解到我们自定义逻辑是在Try开头方法上进行了,而根据排他和共享模式的不同,AQS进行的处理不同,但本质上都是对FIFO队列进行的操作。使用队列来管理线程优点是可以保证线程的有序,可以针对某个线程进行唤醒而不是使用notifyAll唤醒全部线程。

ConditionObject

AQS内部有个实体类是ConditionObject,它是AQS功能的一个扩展。当我们使用AQS编写的锁的时候,可以生成多个条件变量,而条件变量可以阻塞和唤醒线程。

ConditionObject内部也有和AQS一样的队列被成为条件队列,使用CONDITION来表示条件状态:

 private transient Node firstWaiter;
 private transient Node lastWaiter;
复制代码

ConditionObject有三个方法:await、signal和signalAll对应线程的wait、notify和notifyAll,具体实现如下:

  • await
public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
复制代码

首先调用addConditionWaiter新建一个节点添加到条件队列

private Node addConditionWaiter() {
            Node t = lastWaiter;
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                if (t.waitStatus != Node.CONDITION) {
                    t.nextWaiter = null;
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    if (next == null)
                        lastWaiter = trail;
                }
                else
                    trail = t;
                t = next;
            }
        }
复制代码

首先删除条件队列中不是CONDITION的节点,然后新建一个CONDITION的节点添加到队尾

int savedState = fullyRelease(node);

 final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
复制代码

将当前的状态给保存下来,释放掉当前状态。如果当前线程没有通过acquire获取状态,就会抛出异常。

这里将锁给释放掉是因为当前条件不满足,会让其他线程获取锁执行逻辑

while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
复制代码

当锁释放以后,这个线程就加入了条件队列中,isOnSyncQueue是判断是否在同步阻塞队列,也就是AQS队列中;因为当其他线程执行了signal的时候,会将条件队列的node放入到同步阻塞队列,如果这个node没有在同步阻塞队列说明还没有调用signal。

++先不看接下来的代码,看一下signal++

public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
复制代码

条件变量是独享的,所以要子类实现isHeldExclusively不然抛出异常,真正逻辑是DoSignal

private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
复制代码

while内部是将条件队列的第一个node迁移到同步队列,如果不成功就迁移下一个

final boolean transferForSignal(Node node) {
        
        // cas失败说明其他线程完成了转移,返回继续转移下个节点
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        
        // 将该节点加入同步阻塞队列
        Node p = enq(node);
        int ws = p.waitStatus;
        
        // 节点被取消的情况下,unpark唤醒线程返回true。让线程进行自旋
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }
复制代码

这个时候节点已经被放入同步阻塞队列了,这个时候线程在同步队列中有可能被其他线程给唤醒。还是看一下这段代码:

  while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
复制代码

线程被唤醒后,while的条件就不成立了,但是还会执行这个方法checkInterruptWhileWaiting,检查是否被中断

private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
复制代码

如果中断过,要判断是否为REINTERRUPT(退出await的时候重新设置中断)或者THROW_IE(await退出的时候需要抛出异常),具体逻辑看transferAfterCancelledWait,如果没有中断则是0。

final boolean transferAfterCancelledWait(Node node) {
        // 如果唤醒是由同步队列发生的,node状态就不再是CONDITION,说明中断是在signal前发生的
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        
        // 执行到这一步,说明上边CAS失败,说明waitstatus已不为CONDITION,即signal已经发生后才发生的中断
        // signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
}
复制代码

因为唤醒这个线程并不一定是在同步队列中被唤醒,还可能是假唤醒、节点被取消了或修改SIGNAL失败而唤醒的,所以要判断。

++现在假设已经signal成功,该节点成功加入同步阻塞队列++

 if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
复制代码

将之前保存的state当作参数传入acquireQueued,这里的操作和直接调用acquire是一样的

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
复制代码

acquire有一个selfInterrupt的判断,而await同样会这样判断

 private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }
复制代码

总结

AQS和ConditionObject都是用了队列来保存状态,可能会有一些绕,只要细心梳理逻辑就可以了解。通过TryAcquire方法来定义是否能够获取到状态,如果没有获取到状态则自旋阻塞,当某一个线程释放状态后会唤醒下一个节点,从而实现了多个节点同步。

免责声明:文章版权归原作者所有,其内容与观点不代表Unitimes立场,亦不构成任何投资意见或建议。

java

0

相关文章推荐

未登录头像

暂无评论