AQS同步器以及各类lock锁的使用

标签:

本文出自jvm123.com-java技术分享站:http://jvm123.com/2020/04/aqs-lock-cas.html

AQS是 AbstractQueuedSynchronizer 的简称,即抽象队列同步器,就是使用队列的方式实现线程同步的框架。从类结构图看如下:

AQS同步器以及各类lock锁的使用插图

Node类,thread表示要执行的线程,pre和next表示前后节点的引用,用于实现等待执行线程的双向队列。nextWaiter,是condititon单向队列的指针,用于存放等待被唤醒的线程,这些线程被唤醒后,会被加入到等待执行的双向队列中。

AbstractQueuedSynchronizer中有内部类Node,AQS持有以head为头tail结尾的双向链表,每个线程Thread都保存在Node节点中。即,AQS中有一个双向链表,每个Node中都包含了需要执行的线程。unsafe用来执行cas操作,compare and set。

AQS同步器以及各类lock锁的使用插图(1)

AQS是一个抽象类,提供了使用队列进行同步线程的思想。具体使用时,需要子类实现tryAcquire(int)tryRelease(int)等方法。在AQS中这两个方法如下,不能直接使用:

    protected boolean tryAcquire(int arg) {
       throw new UnsupportedOperationException();
  }
   protected boolean tryRelease(int arg) {
       throw new UnsupportedOperationException();
  }

Semaphore 信号量

举例使用场景:29个clerk使用两台打印机,他们的打印请求被保存在队列中等待,直到有空闲的打印机,队列中的线程才会依次执行。

package com.yawn.juc;

import java.util.concurrent.Semaphore;

/**
* Semaphore 信号量,有限资源下线程等待执行案例
* @author yawn http://jvm123.com
* 2020/4/26 10:06
*/
public class SemaphoreTest {

   public static void main(String[] args) {
       Semaphore semaphore = new Semaphore(2);
       Clerk clerk = new Clerk(semaphore);
       for (int i = 0; i < 30; i++) {
           new Thread(clerk, "客户 " + i).start();
      }

  }
}

class Clerk implements Runnable {

   private Semaphore semaphore;

   public Clerk(Semaphore semaphore) {
       this.semaphore = semaphore;
  }

   @Override
   public void run() {
       try {
           semaphore.acquire();
           System.out.println(System.currentTimeMillis() + " -------" + Thread.currentThread().getName() + " is running---------");
           Thread.sleep(1003);
           System.out.println(System.currentTimeMillis() + " -------" + Thread.currentThread().getName() + " has completed------");
           semaphore.release();
      } catch (InterruptedException e) {
           e.printStackTrace();
      }
  }
}

执行结果表明,每一时刻,最多只有两个线程在运行,其他的都在队列中等待。如下:

 1587888915215 -------客户 0 is running---------
 1587888915216 -------客户 1 is running---------
 1587888916219 -------客户 0 has completed------
 1587888916219 -------客户 2 is running---------
 1587888916220 -------客户 1 has completed------
 1587888916220 -------客户 4 is running---------
 1587888917222 -------客户 2 has completed------
 1587888917222 -------客户 3 is running---------
 1587888917224 -------客户 4 has completed------
 1587888917224 -------客户 6 is running---------
 1587888918226 -------客户 3 has completed------
...

CyclicBarrier

CyclicBarrier 意思为可循环使用的栅栏锁(类似于闭锁)。

举例使用场景:33个乘客Passenger去乘车,每辆车满5人才会发车,所以先上车的乘客线程等待,知道等待线程数满5时,这五个线程才会继续执行。最终,只有30个线程会继续执行,最后的3个不会继续执行,程序也无法退出。

package com.yawn.juc;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
* CyclicBarrier 人齐开车案例
* @author yawn http://jvm123.com
* 2020/4/26 10:06
*/
public class CyclicBarrierTest {

   public static void main(String[] args) throws InterruptedException {
       CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
       Passenger passenger = new Passenger(cyclicBarrier);
       for (int i = 0; i < 33; i++) {
           Thread.sleep(100);
           new Thread(passenger, "乘客 " + i).start();
      }
  }

}

class Passenger implements Runnable {

   private CyclicBarrier cyclicBarrier;

   public Passenger(CyclicBarrier cyclicBarrier) {
       this.cyclicBarrier = cyclicBarrier;
  }

   @Override
   public void run() {
       try {
           Thread.sleep(1003);
           System.out.println(System.currentTimeMillis() + " -------" + Thread.currentThread().getName() + " is ready------");
           // 线程各自执行到这里就会等待,直到等待的线程等于5时,将会继续执行。
           cyclicBarrier.await();
           System.out.println(System.currentTimeMillis() + " -------" + Thread.currentThread().getName() + " continue------");
      } catch (InterruptedException | BrokenBarrierException e) {
           e.printStackTrace();
      }
  }
}

执行结果表明,每次凑够五个线程,才会开始执行。如下:

 1587889002937 -------乘客 0 is ready------
 1587889003038 -------乘客 1 is ready------
 1587889003139 -------乘客 2 is ready------
 1587889003239 -------乘客 3 is ready------
 1587889003340 -------乘客 4 is ready------
 1587889003340 -------乘客 4 continue------
 1587889003340 -------乘客 0 continue------
 1587889003340 -------乘客 3 continue------
 1587889003340 -------乘客 2 continue------
 1587889003340 -------乘客 1 continue------
 1587889003441 -------乘客 5 is ready------
 1587889003542 -------乘客 6 is ready------
 1587889003642 -------乘客 7 is ready------
...

CountDownLatch 闭锁

CountDownLatch 闭锁或者倒数计数器锁。

举例使用场景:五个朋友聚餐,等人到齐了才开始上菜,CountDownLatch就是一个倒数计数器,当计数器减为0 时,才会继续执行。

package com.yawn.juc;

import java.util.concurrent.CountDownLatch;

/**
* CountDownLatch 主线程等多个子线程执行完,才会继续执行
* @author yawn http://jvm123.com
* 2020/4/26 10:06
*/
public class CountDownLatchTest {

   public static void main(String[] args) throws InterruptedException {
       CountDownLatch countDownLatch = new CountDownLatch(5);
       Person person = new Person(countDownLatch);
       for (int i = 0; i < 5; i++) {
           Thread.sleep(100);
           new Thread(person, "朋友 " + i).start();
      }
       countDownLatch.await();
       System.out.println("人已经到齐了,开始上菜---");
  }

}

class Person implements Runnable {

   private CountDownLatch countDownLatch;
   
   public Person(CountDownLatch countDownLatch) {
       this.countDownLatch = countDownLatch;
  }

   @Override
   public void run() {
       try {
           Thread.sleep(1003);
           System.out.println(System.currentTimeMillis() + " -------" + Thread.currentThread().getName() + " is ready------");
      } catch (InterruptedException e) {
           e.printStackTrace();
      } finally {
           // 线程执行完,就会将计数器减一
           countDownLatch.countDown();
      }
  }
}

执行结果表明,只有计数器减为0时,才会继续执行后面的任务。如下:

 1587889109116 -------朋友 0 is ready------
 1587889109216 -------朋友 1 is ready------
 1587889109317 -------朋友 2 is ready------
 1587889109418 -------朋友 3 is ready------
 1587889109520 -------朋友 4 is ready------
 人已经到齐了,开始上菜---

ReentrantLock 可重入锁

ReentrantLock 可重入锁,也就是统一线程可以多次进入的锁。

举例使用场景:货物goods余量为10000,10000个线程各去购买一个,如果不使用lock,将会发生错乱。

package com.yawn.juc;

import java.util.concurrent.locks.ReentrantLock;

/**
* ReentrantLock 可重入锁,货物goods余量为10000,10000个线程各去购买一个,如果不使用lock,将会发生错乱
* @author yawn http://jvm123.com
* 2020/4/26 14:58
*/
public class ReentrantLockTest {


   public static void main(String[] args) throws InterruptedException {

       ReentrantLock lock = new ReentrantLock();
       Goods goods = new Goods(lock);
       for (int i = 0; i < 10000; i++) {
           new Thread(goods).start();
      }

       // 等待五秒钟,所有线程执行完毕后看结果
       Thread.sleep(5000);
       System.out.println(goods.count);

  }

}

class Goods implements Runnable {

   private ReentrantLock lock;
   public Integer count = 10000;

   public Goods(ReentrantLock lock) {
       this.lock = lock;
  }

   @Override
   public void run() {
       lock.lock();
       try {
           count--;
      } finally {
           lock.unlock();
      }
  }
}

执行结果为0,如果去掉lock()\unlock()之后,执行结果将会很随机,但一般都比0大。

Condition 线程唤醒

Condition 条件,指的是线程唤醒的条件。

package com.yawn.juc;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Condition 使用Aqs中的单向队列实现一个等待队列,按条件唤醒后,加会重新加入aqs双向队列中
 * @author yawn http://jvm123.com
 * 2020/4/26 15:37
 */
public class ConditionTest {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();

        new Thread(() -> {
            lock.lock();
            System.out.println("1 thread 1 is running");
            try {
                System.out.println("2 thread 1 is await");
                condition.await(); // 线程将会被移动到condition单向队列当中
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("6 thread 1 is running again");
            lock.unlock();
        }).start();

        new Thread(() -> {
            lock.lock();
            System.out.println("3 thread 2 is running");
            condition.signalAll(); // 在condition单项队列当中的线程将会被加入到aqs双向队列当中
            System.out.println("4 thread 2 signaled all");
            lock.unlock();
        }).start();

        new Thread(() -> {
            lock.lock();
            System.out.println("5 thread 3 is running and completed");
            lock.unlock();
        }).start();
    }
}

执行结果表明,线程1进入await队列后将会等待,线程2唤醒线程1之后,线程1会加入等待队列的尾端,并等待机会执行。如下:

 1 thread 1 is running
 2 thread 1 is await
 3 thread 2 is running
 4 thread 2 signaled all
 5 thread 3 is running and completed
 6 thread 1 is running again

发表评论