• 0

  • 点赞

  • 收藏

JAVA并发包解析(三)——常用同步器分析2

1个月前

前言

这篇文章继续分析concurrent包下各种同步器的实现原理。

正文

ReadWriteLock

ReadWriteLock顾名思义为读写锁,读请求是不涉及到资源的改变所以不需要加锁,而锁的出现是因为一边读一边写会造成不一致的情况。面对读请求多于写的情况下可以使用读写锁,在加读锁的情况下所有读请求均可正常请求,而写请求被阻塞;在加写锁的情况下,所有请求都被阻塞直到写锁释放。

public interface ReadWriteLock {
    Lock readLock();

    Lock writeLock();
}

复制代码

接口方法是分别获取读锁和写锁

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private final ReentrantReadWriteLock.ReadLock readerLock;
    private final ReentrantReadWriteLock.WriteLock writerLock;
    final Sync sync;
复制代码

按照AQS的管理,Sync继承了AQS作为锁的实现类,先看以下Sync的源码

	//状态以16位来分割
 	static final int SHARED_SHIFT   = 16;
	//读状态的1位
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
	//读锁的最大次数
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
	//计算写锁次数
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

      	//右移16位就是读锁的次数
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
	//将前16位变成0,获取后16位的数就是写锁
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
复制代码

image.png

借用网上的图片,读写锁的状态是一个32的位的int类型,前32位为读状态,后32位为写状态,使用位来控制当前读线程重入的次数

static final class HoldCounter {
    int count = 0;
    final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
    extends ThreadLocal<HoldCounter> {
    public HoldCounter initialValue() {
        return new HoldCounter();
    }
}
private transient ThreadLocalHoldCounter readHolds;
//最后一个线程的缓存数据
private transient HoldCounter cachedHoldCounter;
//第一个获取读锁的线程以及次数
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
复制代码

使用ThreadLocal来记录当前线程获取读锁的次数

        abstract boolean readerShouldBlock();
        abstract boolean writerShouldBlock();
复制代码

获取读写锁的try代码是通用的,定义了两个抽象方法交给子类实现,这两个方法主要是公平和非公平的判断

在读写锁的实现看,读锁对应了共享状态,写锁则是独享状态,所以读锁的实现是Shared结尾方法。

protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }
复制代码

释放写锁的代码和可重入锁一样,因为是独占的,所以只需要减去状态即可,最后判断是否完全释放掉了锁

protected final boolean tryAcquire(int acquires) {
            
            Thread current = Thread.currentThread();
            int c = getState();
            int w = exclusiveCount(c);
            if (c != 0) {
             
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                setState(c + acquires);
                return true;
            }
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }
复制代码

尝试获取写锁,还是得判断锁是否已经加上了,是则判断条件是否满足重入,否则尝试获取锁,逻辑很简单。

protected final int tryAcquireShared(int unused) {    
            Thread current = Thread.currentThread();
            int c = getState();
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }
复制代码

尝试获取读锁有三个步骤:

  1. 判断其他线程是否持有写锁,有了写锁则不能获取读锁
  2. 第二阶段则是尝试增加读状态,并且根据是否为第一个获取读锁线程来记录
  3. CAS失败后到最终阶段fullTryAcquireShared
final int fullTryAcquireShared(Thread current) {
            HoldCounter rh = null;
            for (;;) {
                int c = getState();
		//同上步骤1
                if (exclusiveCount(c) != 0) {
                    if (getExclusiveOwnerThread() != current)
                        return -1;
                } else if (readerShouldBlock()) {
                    if (firstReader == current) {
                    } else {
                        if (rh == null) {
                            rh = cachedHoldCounter;
                            if (rh == null || rh.tid != getThreadId(current)) {
                                rh = readHolds.get();
                                if (rh.count == 0)
                                    readHolds.remove();
                            }
                        }
                        if (rh.count == 0)
                            return -1;
                    }
                }
                if (sharedCount(c) == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (sharedCount(c) == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        if (rh == null)
                            rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                        cachedHoldCounter = rh;
                    }
                    return 1;
                }
            }
        }
复制代码

这部分进去自旋操作,和tryAcquireShared有一部重复了,也没有什么值得注意的点

protected final boolean tryReleaseShared(int unused) {
            Thread current = Thread.currentThread();
            if (firstReader == current) {
                if (firstReaderHoldCount == 1)
                    firstReader = null;
                else
                    firstReaderHoldCount--;
            } else {
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
                --rh.count;
            }
            for (;;) {
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

复制代码

释放读锁的主要逻辑在后面自旋的部分,将高16位减少1,再判断是否减少到0而触发共享通知

ReentrantReadWriteLock先介绍到这里,还有一些细节的部分建议自行阅读源码。

Semaphore

信号量的作用是提供一个当前线程访问的许可,当持有许可的时候线程才会执行任务,如果没有持有许可则阻塞,当线程执行完毕后回归许可。根据这个描述可以感觉到Semaphore实现非常简单,主要使用共享获取方式,当许可被归还的时候就可以共享通知线程了。

  Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

复制代码

state表示许可的数量

通用释放方法

  protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current)
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }
复制代码

这个方法永远返回true,当CAS归还许可成功后触发共享通知

非公平获取许可

 final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

复制代码

非公平直接尝试获取,当状态为负数或者CAS失败表示获取不到,进入自旋后阻塞

公平获取许可

 protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
复制代码

只是增加了一段判断是否有其他线程排队的逻辑

Semaphore实现机制简单,判断state的数量即可

总结

本期介绍了ReadWriteLock和Semaphore,再次感叹AQS的设计精妙。。。。

免责声明:文章版权归原作者所有,其内容与观点不代表Unitimes立场,亦不构成任何投资意见或建议。

java

0

相关文章推荐

未登录头像

暂无评论