不断的学习,我们才能不断的前进
一个好的程序员是那种过单行线马路都要往两边看的人

Semaphore

Semaphore信号量,用于控制并发访问共享资源的线程数量,是基于AQS模板实现的。主要操作包括acquire() 获取一个许可、release() 释放一个许可。

初始化

Semaphore默认是非公平锁的机制,可以通过传入参数实现公平锁。

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

acquire

acquire 尝试去获取一个许可,直到该信号量获得成功 或者 该线程是可中断的。

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

acquireSharedInterruptibly

以共享模式获取,如果被中断则中止。 通过首先检查中断状态,如果可中断则抛出异常。然后尝试去获取一个许可,获取成功则结束,否则排队进入阻塞队列

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

tryAcquireShared

通过自旋来判断当前信号量是否可获取,不可以获取直接返回remaining(负数),如果可以获取则CAS获取一个许可,失败则自旋保证成功。

    protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }    

公平锁的话会先判断阻塞队列里面是否有节点,有的话直接退出,线程入队;没有的话才回去CAS获取许可。

acquireSharedInterruptibly

如果信号量获取失败,则进入阻塞队列;
如果当前结点的前驱结点是头结点,则尝试获取共享变量。获取成功则头结点出队 并 传播唤醒线程。
否则判断是否可以挂起线程。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED); // 进入阻塞队列,AQS模版方法
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) { // 如果前驱 结点是头结点,则再次尝试获取信号量
                    int r = tryAcquireShared(arg);
                    if (r >= 0) { // 如果获取到,队头的node就可以出队了
                        setHeadAndPropagate(node, r); // 头结点出对,并去唤醒线程
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //不是队头的线程就要考虑挂起是否线程了
                // 前一个结点ws是SIGNAL则挂起线程
                // 前一个结点ws是Cancle则移除前面所有的Cancle结点
                // 设置前一个结点的等待状态为SIGNAL
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
                //  如果线程自旋中因为异常等原因获取锁最终失败,则调用此方法,
                //设置等待状态为cancle状态
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire

判断是否挂起当前线程:获取当前结点的前驱结点的waitStatus:

  • 如果是SIGNAL则返回true,进行线程阻塞,等待其他线程唤醒,阻塞失败则抛出异常,处理finally语句块。
  • 如果是Cancle(ws>0), 则移除前面所有的Cancle结点;否则设置前驱结点的等待状态为SIGNAL,返回false。
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

流程图:
semapha-de-acquire-3

当在阻塞队列里面tryAquireShare竞争到锁时,头结点会出队,会去传播唤醒阻塞队列里面的结点来竞争

release

尝试去释放一个信号量,然后去决定唤醒哪一个阻塞线程。

    public void release() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

tryReleaseShared

通过CAS去释放一个信号量,失败则自旋;

    protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

doReleaseShared

唤醒阻塞队列里面的结点。首先判断阻塞队列里面是否有结点,没有则退出;有的话判断头节点等待状态是否等于SIGNAL,是的话CAS设置头节点等待状态为0,成功则唤醒后继的其他线程;失败则自旋。

    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            // 看下阻塞队列里是否还有阻塞线程
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                //只要head成功得从SIGNAL修改为0,那么head的后继的代表线程肯定会被唤醒了。
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 唤醒线程
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 阻塞队列队头节点变化,说明队头已有被唤醒了
            if (h == head)                   // loop if head changed
                break;
        }
    }

Reference

Semaphore源码分析


目录