• 0

  • 点赞

  • 收藏

Java 队列同步器 AQS

1个月前

本文部分摘自《Java 并发编程的艺术》

概述

队列同步器 AbstractQueuedSynchronize(以下简称同步器),是用来构建锁(Lock)或者其他同步组件(JUC 并发包)的基础框架,它使用了一个 int 成员变量表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的排队工作

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,子类推荐被定义为自定义同步组件的静态内部类。同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态的获取和释放方法来供自定义组件使用

一言以蔽之,同步器是实现锁(也可以是任意同步组件)的一种方式,它屏蔽了更加底层的一些机制,使开发者更易于理解和使用

队列同步器的接口

同步器的设计是基于模板方法模式的,使用者需要继承队列同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法

1. 访问或修改同步状态

重写同步器指定的方法时,需要使用同步器提供的如下三个方法来访问或修改同步状态:

  • getState()

    获取当前同步状态

  • setState(int newState)

    设置当前同步状态

  • compareAndSetState(int expect, int update)

    使用 CAS 设置当前状态,该方法能保证状态设置的原子性

2. 同步器可重写的方法

方法名称 描述
protected boolean tryAcquire(int arg) 独占式获取同步状态,实现该方法需要查询当前状态,并判断同步状态是否符合预期,然后再进行 CAS 设置同步状态
protected boolean tryRelease(int arg) 独占式地释放同步状态,等待获取同步状态的线程将有机会获取同步状态
protected int tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于 0 的值,表示获取成功,否则获取失败
protected boolean tryReleaseShared(int arg) 共享式释放同步状态
protected boolean isHeldExclusively() 当前同步器是否在独占模式下被线程占有,一般该方法表示是否被当前线程所独占

3. 同步器提供的模板方法

方法名称 描述
void acquire(int arg) 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的 tryAcquire(int arg) 方法
void acquireInterruptibly(int arg) 与 acquire(int arg) 相同,但该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出 InterruptedException 并返回
boolean tryAcquireNanos(int arg, long nanos) 在 acquireInterruptibly(int arg) 的基础上增加了超时限制
void acquireShared(int arg) 共享式的获取同步状态,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态
void acquireSharedInterruptibly(int arg) 与 acquireShared(int arg) 相同,该方法响应中断
boolean tryAcquireSharedNanos(int arg, long nanos) 在 acquireSharedInterruptibly 的基础上增加了超时限制
boolean release(int arg) 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
boolean releaseShared(int arg) 共享式的释放同步状态
Collection<Thread> getQueuedThreads() 获取等待在同步队列上的线程集合

4. 示例

下面通过一个独占锁的示例来深入了解一下同步器的工作原理。顾名思义,独占锁就是在同一时刻只能有一个线程获取到锁,其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能获取锁

public class Mutex implements Lock {

    /**
     * 自定义同步器
     */
    private static class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected boolean isHeldExclusively() {
            // 是否处于占用状态
            return getState() == 1;
        }

        @Override
        public boolean tryAcquire(int acquires) {
            // 当状态为 0 时获取锁
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            // 释放锁,将状态设置为 0
            if (getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        /**
         * 返回一个 Condition, 每个 condition 都包含一个 condition 队列
         */
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    private final Sync sync = new Sync();

    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.release(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
复制代码

Mutex 中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。用户使用 Mutex 时并不会直接和内部同步器实现打交道,而是调用 Mutex 提供的方法,大大降低了实现一个可靠自定义组件的门槛

队列同步器的实现

1. 同步队列

同步器依赖内部的同步双向队列来完成同步状态的管理,当前线程获取同步状态失败后,同步器会将当前线程及其等待状态等信息构造成一个节点,并加入同步队列,同时阻塞当前线程。当同步状态释放后,会把首节点中的线程唤醒,使其再次尝试获取同步状态

节点是构成同步队列的基础,同步器拥有首节点(head)和尾结点(tail),没有成功获取同步状态的线程将会成为节点并加入该队列的尾部

同步队列的基本结构如下:

同步器将节点加入到同步队列的过程如图所示:

首节点是获取同步状态成功的节点,首节点线程在释放同步状态时,会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,过程如下:

设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取同步状态,因此设置头节点的方法并不需要使用 CAS 来保证,只需要将首节点设置成原首节点的后继节点并断开原首节点的 next 引用即可

2. 独占式同步状态获取与释放

通过调用同步器的 acquire(int arg) 方法可以获取同步状态,该方法对中断不敏感,线程获取同步状态失败则进入同步队列中,后续对线程进行中断操作,线程不会从同步队列中移出

独占式同步状态获取流程,也就是 acquire(int arg) 方法调用流程如图所示:

如果当前线程获取同步状态失败,就会生成一个节点(独占式 Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态),并加入到队列尾部。一个队列里有很多节点,而只有前驱节点是头节点的节点才能尝试获取同步状态,原因有两个:

  • 头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点
  • 维护同步队列的 FIFO 原则

因此,如果队列中的非头节点线程的前驱节点出队或者被中断而从等待状态返回,那么其随后会检查自己的前驱是否为头节点,如果是则尝试获取同步状态

当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后继节点能够继续获取同步状态。通过调用同步器的 release(int arg) 方法可以释放同步状态,该方法执行时,会唤醒头节点的后继节点线程

3. 共享式同步状态获取与释放

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。以文件的读写为例,若一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。写操作要求对资源的独占式访问,而读操作可以是共享式访问,两种不同的访问模式在同一时刻对文件或资源的访问情况,如下图所示:

通过调用同步器的 acquireShared(int arg) 方法可以共享式地获取同步状态,其代码核心逻辑和 acquire() 差不多,也是判断当前节点的前驱是否为头节点,如果是就尝试获取同步状态。头节点在释放同步状态之后,也会唤醒后续处于等待状态的节点

问题的关键在于如何做到多个线程访问同步状态,因为按照上面所讲的过程,和独占式几乎没有任何区别。独占式与共享式在实现上的差别其实仅仅在于:每次头节点释放同步状态之后,独占式只是把其后继节点设置为头节点,而共享式还多了一个传播的过程(笔者能力有限,这一块没搞明白,就不瞎写了。。)

与独占式一样,共享式获取也需要释放同步状态,通过调用 releaseShared(int arg) 方法可以释放同步状态,并唤醒后续处于等待状态的节点

4. 独占式超时获取同步状态

通过调用同步器的 doAcquireNanos(int arg, long nanosTimeout) 方法可以超时获取同步状态,即在指定的时间段内获取同步状态

在介绍这个方法之前,先介绍一下响应中断的同步状态获取过程。Java5 以后,同步器提供了 acquireInterruptibly(int arg) 方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出 InterruptedException

超时获取同步状态可以视为响应中断获取同步状态的增强版。独占式超时和非独占式获取在流程上非常相似,其主要区别在于未获取到同步状态时的处理逻辑。acquire(int arg) 在未获取到同步状态时,会使当前线程一致处于等待状态,而 doAcquireNanos(int arg, long nanosTimeout) 会使当前线程等待 nanosTimeout 纳秒,如果当前线程在 nanosTimeout 纳秒内没有获取同步状态,将会从等待逻辑中自动返回

自定义同步组件

设计一个同步工具:同一时刻,只能允许至多两个线程同时访问,超过两个线程的访问将被阻塞。显然这是共享式访问,主要设计思路如下:

  • 重写 tryAcquireShared(int args) 方法和 tryReleaseShared(int args) 方法
  • 定义初始状态 status 为 2,当一个线程进行获取,status 减 1,该线程释放,status 加 1,为 0 时再有其他线程进行获取,则阻塞

示例代码如下:

public class TwinsLock implements Lock {

    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {

        Sync(int count) {
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero");
            }
            setState(count);
        }

        @Override
        public int tryAcquireShared(int reduceCount) {
            while (true) {
                int current = getState();
                int newCount = current - reduceCount;
                if (newCount < 0 || compareAndSetState(current, newCount)) {
                    return newCount;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int reduceCount) {
            while (true) {
                int current = getState();
                int newCount = current + reduceCount;
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        Condition newCondition() {
            return new ConditionObject();
        }
    }

    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    @Override
    public boolean tryLock() {
        return sync.tryAcquireShared(1) > 0;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));
    }

    @Override
    public void unlock() {
        sync.releaseShared(1);
    }

    @Override
    public Condition newCondition() {
        return sync.newCondition();
    }
}
复制代码

再编写一个测试来验证 TwinsLock 是否按预期工作

public class TwinsLockTest {


    public static void main(String[] args) {

        final Lock lock = new TwinsLock();

        class Worker extends Thread {

            @Override
            public void run() {
                while (true) {
                    lock.lock();
                    try {
                        SleepUtils.second(1);
                        System.out.println(Thread.currentThread().getName());
                        SleepUtils.second(1);
                    } finally {
                        lock.unlock();
                    }
                }
            }
        }

        for (int i = 0; i < 10; i++) {
            Worker worker = new Worker();
            worker.setDaemon(true);
            worker.start();
        }

        for (int i = 0; i < 10; i++) {
            SleepUtils.second(1);
            System.out.println();
        }
    }
}
复制代码

运行该测试用例,发现线程名称成对输出,说明同一时刻只有两个线程能够获取到锁

免责声明:文章版权归原作者所有,其内容与观点不代表Unitimes立场,亦不构成任何投资意见或建议。

java

0

相关文章推荐

未登录头像

暂无评论