CountDownLatch内部实现原理, 基于AQS

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。例如,应用程序的主线程希望在负责启动框架服务的线程已经启动所有框架服务之后执行。

CountDownLatch使用示例:

首先我们写一个示例,看看怎么使用CountDownLatch工具类

CountDownLatchTest.java

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

package com.study.thread.juc_thread.base;

 

import java.util.concurrent.CountDownLatch;

 

/**

 * <p>Description: CountDownLatch 在所有任务结束之前,一个或者多线线程可以一直等待(通过计数器来判断) </p>

 * @author duanfeixia

 * @date 2019年8月12日

 */

public class CountDownLatchTest {

 

    static CountDownLatch countDown = new CountDownLatch(10);//10个线程任务

     

    /**

     * <p>Description: 主线程执行任务</p>

     * @author duanfeixia

     * @date 2019年8月12日

     */

    static class BossThread extends Thread{

        public void run(){

            System.out.println("boss已经到达会议室,共有"+countDown.getCount()+"人参加会议...");

            //等待人齐

            try {

                countDown.await();//进入阻塞队列

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            System.out.println("人已经到齐啦,开会啦...");

        }

    }

     

    /**

     * <p>Description:子线程执行任务 </p>

     * @author duanfeixia

     * @date 2019年6月24日

     */

    static class EmployThread extends Thread{

 

        @Override

        public void run() {

            System.out.println(Thread.currentThread().getName()+"到达会议室,还有"+countDown.getCount()+"人未到");

            countDown.countDown();//计数器-1

        }

         

    }

     

     

    /*测试启动*/

    public static void main(String args[]){

        new BossThread().start();

        int len=(int) countDown.getCount();

        for(int i=0;i<len;i++){

            new EmployThread().start();

        }

    }

}

  

测试结果如下:(每次执行的结果都会不一样哦)

CountDownLatch内部实现原理, 基于AQS

 

CountDownLatch原理解析

CountDownLatch内部依赖Sync实现,而Sync继承AQS。

CountDownLatch主要分析以下三点:

1. 构造方法 (创建CountDownLatch对象时指定线程个数)

2. await()方法的实现 (当前线程计数器为0之前一致等待,除非线程被中断)

3. countDown()方法的实现 (每执行一个线程方法就将计数器减一,当计数为0时 启用当前线程)

 

CountDownLatch的静态内部类 Sync (这里我们需要特别注意的一点是  Sync 继承了  AbstractQueuedSynchronizer 类, 重要方法会被重写)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

private static final class Sync extends AbstractQueuedSynchronizer {

       private static final long serialVersionUID = 4982264981922014374L;

 

       Sync(int count) {

           setState(count);

       }

 

       int getCount() {

           return getState();

       }

 

       protected int tryAcquireShared(int acquires) {

           return (getState() == 0) ? 1 : -1;

       }

 

       protected boolean tryReleaseShared(int releases) {

           // Decrement count; signal when transition to zero

           for (;;) {

               int c = getState();

               if (c == 0)

                   return false;

               int nextc = c-1;

               if (compareAndSetState(c, nextc))

                   return nextc == 0;

           }

       }

   }

  

 

>> CountDownLatch只提供了一个有参构造方法: (参数计数器总量)

1

2

3

4

public CountDownLatch(int count) {

      if (count < 0) throw new IllegalArgumentException("count < 0");

      this.sync = new Sync(count);

  }

  

>> await() 方法 [注释: 以下 AQS 均表示 AbstractQueuedSynchronizer ]

初步进入我们会发现调用的是 CountDownLacth 的 await()  -- >>  获取共享锁

1

2

3

public void await() throws InterruptedException {

       sync.acquireSharedInterruptibly(1);

   }

 

acquireSharedInterruptibly()  方法是父类 AQS 中定义的,这里会发现此方法是被final修饰的,无法被重写,但是子类可以重写里面调用的 tyAcquireShared(arg) 方法

1

2

3

4

5

6

7

public final void acquireSharedInterruptibly(int arg)

        throws InterruptedException {

    if (Thread.interrupted())

        throw new InterruptedException();

    if (tryAcquireShared(arg) < 0)

        doAcquireSharedInterruptibly(arg);

}

  

先来看看 AQS 中 默认对 tryAcquireShared方法的默认实现

1

2

3

protected int tryAcquireShared(int arg) {

     throw new UnsupportedOperationException();

 }

  

CountDownLatch中的内部类 Sync对 AQS 的 tryAcquireShared方法进行了复写

当前计数器的值为0的时候 返回 1   #获取锁成功,直接返回 线程可继续操作

当前计数器的值不为0的时候 返回 -1.  #获取锁失败 ,开始进入队列中排队等待。接下来就会继续按照 AQS acquireSharedInterruptibly 方法中的逻辑,执行 doAcquireSharedInterruptibly(int arg)

1

2

3

protected int tryAcquireShared(int acquires) {

     return (getState() == 0) ? 1 : -1;

}

  

AQS 中 doAcquireSharedInterruptibly(int arg) 实现如下 (该方法为一个自旋方法会尝试一直去获取同步状态)

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

/**

    * Acquires in shared interruptible mode.

    * @param arg the acquire argument

    */

   private void doAcquireSharedInterruptibly(int arg)

       throws InterruptedException {

       final Node node = addWaiter(Node.SHARED);

       boolean failed = true;

       try {

           for (;;) {

               final Node p = node.predecessor();

               if (p == head) {

                   int r = tryAcquireShared(arg);

                   if (r >= 0) {

                       setHeadAndPropagate(node, r);

                       p.next = null; // help GC

                       failed = false;

                       return;

                   }

               }

               if (shouldParkAfterFailedAcquire(p, node) &&

                   parkAndCheckInterrupt())

                   throw new InterruptedException();

           }

       } finally {

           if (failed)

               cancelAcquire(node);

       }

   }

 

这里需要进一步说明的是方法 parkAndCheckInterrupt ,调用 LockSupportpark方法,禁用当前线程

LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语。

1

2

3

4

private final boolean parkAndCheckInterrupt() {

      LockSupport.park(this);

      return Thread.interrupted();

  }

  

  

>> countDown()方法的实现

CountDownLatch方法中实现如下,方法递减锁存器的计数,如果计数到达零,则释放所有等待的线程。 -->> 释放共享锁

1

2

3

public void countDown() {

     sync.releaseShared(1);

 }

  

AQS中 releaseShared方法的实现如下:(同样 releaseShared方法被final修饰 不能被重写 但是我们CountDownLatch的内部类 Sync重写了 tryReleaseShared方法)

1

2

3

4

5

6

7

public final boolean releaseShared(int arg) {

       if (tryReleaseShared(arg)) {

           doReleaseShared();

           return true;

       }

       return false;

   }

  

Sync中 tryReleaseShared方法的实现

1

2

3

4

5

6

7

8

9

10

11

protected boolean tryReleaseShared(int releases) {

            // Decrement count; signal when transition to zero

            for (;;) {

                int c = getState();//获取锁状态

                if (c == 0)

                    return false; //计数器为0的时候 说明释放锁成功 直接返回

                int nextc = c-1; //将计数器减一 使用CAS更新计算器的值

                if (compareAndSetState(c, nextc))

                    return nextc == 0;

            }

        }

  

然后执行 AQS 中 releaseShared方法中的 doReleaseShared方法 去释放锁信息

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

private void doReleaseShared() {

       /*

        * Ensure that a release propagates, even if there are other

        * in-progress acquires/releases.  This proceeds in the usual

        * way of trying to unparkSuccessor of head if it needs

        * signal. But if it does not, status is set to PROPAGATE to

        * ensure that upon release, propagation continues.

        * Additionally, we must loop in case a new node is added

        * while we are doing this. Also, unlike other uses of

        * unparkSuccessor, we need to know if CAS to reset status

        * fails, if so rechecking.

        */

       for (;;) {

           Node h = head;

           if (h != null && h != tail) {

               int ws = h.waitStatus;

               if (ws == Node.SIGNAL) {

                   if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

                       continue;            // loop to recheck cases

                   unparkSuccessor(h);

               }

               else if (ws == 0 &&

                        !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

                   continue;                // loop on failed CAS

           }

           if (h == head)                   // loop if head changed

               break;

       }

   }

  

这里需要特别说明的是 unparkSuccessor(h)方法,调用 LockSupportunpark方法 启动当前线程

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

/**

   * Wakes up node's successor, if one exists.

   *

   * @param node the node

   */

  private void unparkSuccessor(Node node) {

      /*

       * If status is negative (i.e., possibly needing signal) try

       * to clear in anticipation of signalling.  It is OK if this

       * fails or if status is changed by waiting thread.

       */

      int ws = node.waitStatus;

      if (ws < 0)

          compareAndSetWaitStatus(node, ws, 0);

 

      /*

       * Thread to unpark is held in successor, which is normally

       * just the next node.  But if cancelled or apparently null,

       * traverse backwards from tail to find the actual

       * non-cancelled successor.

       */

      Node s = node.next;

      if (s == null || s.waitStatus > 0) {

          s = null;

          for (Node t = tail; t != null && t != node; t = t.prev)

              if (t.waitStatus <= 0)

                  s = t;

      }

      if (s != null)

          LockSupport.unpark(s.thread);

  }

  

总结:

CountDownLatch内部通过共享锁实现。在创建CountDownLatch实例时,需要传递一个int型的参数:count,该参数为计数器的初始值,也可以理解为该共享锁可以获取的总次数。

当某个线程调用await()方法,程序首先判断count的值是否为0,如果不会0的话则会一直等待直到为0为止。

当其他线程调用countDown()方法时,则执行释放共享锁状态,使count值 – 1。

当在创建CountDownLatch时初始化的count参数,必须要有count线程调用countDown方法才会使计数器count等于0,锁才会释放,前面等待的线程才会继续运行。注意CountDownLatch不能回滚重置。