liuxiaoshui
发布于 2023-11-26 / 33 阅读
0
0

AQS之BlockingQueue源码跟踪记录

以ArrayBlockingQueue为例,使用如下

int BOUND = 10;//阻塞队列容量
BlockingQueue<Integer> queue = new ArrayBlockingQueue<Integer>(BOUND);

queue.put();//超过队列容量时则put()阻塞
queue.take();//队列容量为0时则take()阻塞

初始化逻辑

	public ArrayBlockingQueue(int capacity) {
	    this(capacity, false);//默认非公平锁
	}

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);//创建一把锁
        notEmpty = lock.newCondition();//使用这把锁,创建条件对象
        notFull =  lock.newCondition();//使用这把锁,创建条件对象
    }

关于这三个属性,注释里很明白

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

/生产者put,消费者take,ArrayBlockingQueue保证同一时刻只有一个线程能操作。

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//以可中断方式获取锁
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//以可中断方式获取锁
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

可以看出,put和take里都是先尝试获取锁,获取锁失败的线程,都会阻塞到parkAndCheckInterrupt()里:

private void doAcquireInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

此时,未获取锁的线程,已经入到CLH同步等待队列中,等待释放锁的线程调用unlock(),去唤醒CLH队首的线程。

put方法跟踪

假设此时某一个生产者线程p1获取了锁,此时执行到notFull.await();

public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();//以可中断方式获取锁
    try {
        while (count == items.length)//判断是否队列已满,
            notFull.await();//满了则阻塞
        enqueue(e);//未满则将操作元素e插入blockingqueue
    } finally {
        lock.unlock();
    }
}

进入notFull.await()方法

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();//入队条件等待队列
    int savedState = fullyRelease(node);//完全释放锁,即使本线程多次冲入,则释放多次,state直接置为0
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {//判断如果不在同步队列clh中,则需阻塞当前线程
        LockSupport.park(this);//生产者线程阻塞在此处
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

此处先不考虑可中断的情况,await()里逻辑包括以下几方面:

1.await作为往队列插入元素前的最后一道检查,需要保证,如果队列已满,则需阻塞当前线程,且 需要释放锁,供其他消费者线程获取后,执行take(),从而降低阻塞队列的容量。

2.不管put还是take(),操作队列中元素的前提都是获取锁,所以,未获取锁的线程,不管是是生产者还是消费者,都会进入clh中,而前面p1,在await里因为blockingqueue容量满,释放锁时,唤醒clh队首的线程,此时可能是生产者,也可能有消费者。

3.假设队首线程又是生产者,则此时该线程会拿到锁,又会进入到notFull.await(),然后判断已满,继续进入条件等待队列,继续释放锁,唤醒下一个clh队首线程,

如此重复,直到遇到下一个消费者线程。

4.注意一点,只有在clh队首的线程才能获取锁,条件等待队列中的线程是没法获取锁的,因为unlock里release方法里取要unpark的线程时,就是从clh队首head取的。

take方法跟踪

下面再看消费者take方法做了什么,notEmpty.await(),

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)//判断队列是否为空,即长度为0
                notEmpty.await();//
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

此处逻辑完全跟put里的一样,只不过,此时移到名为notEmpy的条件等待队列当中,

接上面p1线程释放锁,经过若干生产者线程继续移到上面的notFull的条件等待队列中,终于换醒了第一个消费者线程c1,

此时c1判断count=10,即执行dequeue(),

		private E dequeue() {
		    // assert lock.getHoldCount() == 1;
		    // assert items[takeIndex] != null;
		    final Object[] items = this.items;
		    @SuppressWarnings("unchecked")
		    E x = (E) items[takeIndex];
		    items[takeIndex] = null;
		    if (++takeIndex == items.length)
		        takeIndex = 0;
		    count--;
		    if (itrs != null)
		        itrs.elementDequeued();
		    notFull.signal();//notFull发出通知,然后其他生产者可以继续往队列插入元素了
		    return x;
		}


        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;//取notFull条件等待队列首第一个
            if (first != null)
                doSignal(first);
        }
		
		private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&//移到sync queue
                     (first = firstWaiter) != null);
        }

	    final boolean transferForSignal(Node node) {
	        /*
	         * If cannot change waitStatus, the node has been cancelled.
	         */
	        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
	            return false;
	
	        /*
	         * Splice onto queue and try to set waitStatus of predecessor to
	         * indicate that thread is (probably) waiting. If cancelled or
	         * attempt to set waitStatus fails, wake up to resync (in which
	         * case the waitStatus can be transiently and harmlessly wrong).
	         */
	        Node p = enq(node);//移动到同步队列
	        int ws = p.waitStatus;
	        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
	            LockSupport.unpark(node.thread);//唤醒生产者线程
	        return true;
	    }		

唤醒生产者线程后,线程从await()里的park处恢复,

然后继续先去抢锁,

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//抢到锁才继续,抢不到继续阻塞到此处
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

如果此时唤醒的p2线程,未在同步队列首位,则抢锁失败,又会阻塞在此。

如果抢锁成功,则往后执行,enqueue(e),插入元素。


评论