CyclicBarrier

CyclicBarrier

了解CyclicBarrier

CyclicBarrier是什么

  • CyclickBarrier是一个同步工具类,其允许一组线程互相等待,直到大家共同到达某个公共屏障点。
  • 在涉及一组固定数量的线程的程序中,这些线程必须不时地互相等待,在这种情况下CyclicBarrier就显得非常有用。为什么称为cyclic呢? 因为他在等待的线程释放之后可以重复循环使用。
  • CyclicBarrier支持一个可选的Runnable命令,这个Runnable命令在一组线程的最后一个线程执行完到达屏障点之后,但是在任何线程释放之前,仅在每个屏障点运行一次。又称之为屏障操作。这个屏障操作比较有用。
  • 对于失败的同步尝试,CyclicBarrier使用了一种要么全部失败,要么全部成功的breakage model。如果一个线程因为中断,执行失败或者超时而导致其早早地离开,那么该CyclicBarrier将被破坏,而执行完等在屏障点的其他线程也会因为CyclicBarrier被破坏而抛出BrokenBarrierException异常,从而导致所有线程都执行失败。如果所有线程在同一时间被中断,将会抛出InterruptedException异常。
  • 同时CyclicBarrier中有内存一致性这个属性:
    • 调用await()之前的行为 优先于 任何屏障操作
    • 屏障操作成功返回这一行为 优先于 所有其他等待线程await()的行为

源码浅析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
public class CyclicBarrier {

private static class Generation {
boolean broken = false;
}

// 守护屏障实例的锁
private final ReentrantLock lock = new ReentrantLock();

private final Condition trip = lock.newCondition();

// 线程的数量
private final int parties;

// barrier action
private final Runnable barrierCommand;

// 当前循环使用次数
private Generation generation = new Generation();

// 仍旧在等待的数量,等待什么呢?我个人认为等待的是进入屏障的线程数量。在屏障这里的概念是等待进入,因为这是在屏障类内。而在我们的理解是正在执行,但仍然没执行到barrier point的线程。因为这样理解才能解释几点:
// 为什么await()方法返回的当前线程的到达索引中,getParties()-1返回的是第一个到达线程, 而0是最后一个到达线程?
// 答:await()方法中调用的是doAwait()方法,而doAwait()中返回的就是减1之后的count值。即只有调用了await()方法之后,才表示进入到了barrier中,count要减1,表示有线程进入屏障了!因此等待进入屏障的数量减1.
// 为什么getNumberWaiting()这个返回在barrier等待的线程数量的方法里用的是 parties - count?
// 答:总的参与者 - 还没进入屏障的参与者 = 已经进入屏障的参与者。
private int count;


// 进入下一个循环,更新屏障状态到初始时候,同时叫醒所有线程。
// 这个方法只有在持有锁的时候才能被调用
private void nextGeneration() {...}


// 设置当前屏障为被破坏
// 只有在持有锁的时候才能被调用
private void breakBarrier() {...}


// 最主要的屏障代码。
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {...}


// 创建一个新的CyclicBarrier,其将在给定的参与者(即线程)数量都处于等待执行状态时启动,并在该barrier启动时执行一个指定的barrierAction,该执行的动作由最后一个进入barrier的线程触发
// parties: 参与者数量(线程数量)
// barrierAction: 当该barrier触发,这个barrierAction开始执行。如果没有action的话,设置为null
public CyclicBarrier(int parties, Runnable barrierAction) {...}


// 同上
public CyclicBarrier(int parties) {...}


// 返回一个需要执行到barrier的参与者数量(即线程数量)
public int getParties() {
return parties;
}


// 在所有参与者都已经在此barrier调用await()方法之前,将一直等待
// 如果当前线程不是最后到达的线程(即已经执行完,在barrier等待,可能是第一个等待,可能是第二个等待,总之就不是最后一个到达的),那么出于线程调度目的,其将变得不可用,同时处于休眠状态,直到....
// 1. 最后一个线程到达
// 2. 其他某个线程中断了当前线程
// 3. 其他某个线程中断另一个等待线程
// 4. 其他某个线程在等待barrier时超时
// 5. 其他某个线程在此barrier上调用reset()
//
// 如果当前线程
// 1. 在进入此方法时已经设置了该线程的中断状态
// 2. 在等待时中断
// 就抛出 InterruptedException 异常
//
// 如果当前线程
// 1. 处于等待状态时barrier被reset()
// 2. 调用await()时barrier被破坏
// 3. 任意一个线程正处于等待状态
// 则抛出 BrokenBarrierException 异常
//
// 如果在执行barrierAction的过程中,出现了异常,异常将传播到当前线程中
//
// 返回:当前线程的到达索引。getParties() - 1:表示第一个到达,0:表示最后一个到达。
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}


// 同上
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {...}


// 查询此屏障是否处于损坏状态
// 返回:如果自从构造barrier或者最新的reset()方法以来,因为中断或者超时而导致1个或者多个参与者退出这个barrier,就返回true;其他情况返回false。
public boolean isBroken() {...}


// 将屏障重置为初始状态。
// 如果所有的参与者当前都在barrier等待,那么这些参与者将返回,并抛出一个BrokenBarrierException。
// 注意:在由于其他原因造成损坏 之后,实行重置可能会变得很复杂;此时需要使用其他方式重新同步线程,并选择其中一个线程来执行重置。与为后续使用创建一个新 barrier 相比,这种方法可能更好一些。
public void reset() {...}


// 返回:当前在barrier等待的参与者的数量。这个方法在调试和断言的比较有用。
public int getNumberWaiting() {...}
}

个人感觉

在看了CyclicBarrier里面的代码实现之后,发现Doug Lea真的很厉害(因为我看了好长时间才看懂)。里面有两个概念:parties和count。

count

count文档

parties

parties文档

然后一开始我是不太明白count跟parties的区别,然后看了下CyclicBarrier的源码实现,结合CyclicBarrier的惯用方式,推断出count是等待进入屏障的线程数量,而parties是参与者,也就是线程总数量。

emmmm其实理解过程中还是要回到CyclicBarrier这个类本身。count在类里面就是相对于类来说的,在类内部就是相对于类来说,就是等待进入屏障的线程数量。而在我们角度来说,count就是在执行而尚没有执行到await()方法进入barrier的线程的数量。

await()

1
2
3
4
5
6
7
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

await()方法里调用了一个内部私有方法 dowait()

  • dowait()

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
    TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock(); // 使用了lock锁住。
    try {
    final Generation g = generation;

    if (g.broken) // 判断是否被破坏了
    throw new BrokenBarrierException();

    if (Thread.interrupted()) { // 判断是否被中断了
    breakBarrier();
    throw new InterruptedException();
    }

    int index = --count; // 看这里~
    if (index == 0) { // tripped
    boolean ranAction = false;
    try {
    final Runnable command = barrierCommand;
    if (command != null)
    command.run(); // 执行屏障操作 barrierAction
    ranAction = true;
    nextGeneration();
    return 0; // 返回0
    } finally {
    if (!ranAction)
    breakBarrier();
    }
    }

    // loop until tripped, broken, interrupted, or timed out
    for (;;) {
    try {
    if (!timed)
    trip.await();
    else if (nanos > 0L)
    nanos = trip.awaitNanos(nanos);
    } catch (InterruptedException ie) {
    if (g == generation && ! g.broken) {
    breakBarrier();
    throw ie;
    } else {
    // We're about to finish waiting even if we had not
    // been interrupted, so this interrupt is deemed to
    // "belong" to subsequent execution.
    Thread.currentThread().interrupt();
    }
    }

    if (g.broken)
    throw new BrokenBarrierException();

    if (g != generation)
    return index; // 返回减了1 的count

    if (timed && nanos <= 0L) {
    breakBarrier();
    throw new TimeoutException();
    }
    }
    } finally {
    lock.unlock();
    }
    }

    上面可以看见dowait()是CyclicBarrier的核心代码。里面中首先进行一些状态检查操作。然后确认有线程进入到了屏障内,count减1,赋值给index变量,表示到达的索引顺序。如果index为0,表示最后一个线程到达屏障,开始执行屏障操作。整个过程都用lock进行锁住。

为什么用lock,而不用synchronized呢?

个人觉得有一部分原因是dowait()代码较长,而synchronized适用于代码较短的语句块。在《java编程思想》中有提到两者的性能分析。在代码较长的情况下,lock比synchronized展现出更好的性能。

​ 为什么用ReentrantLock,而不是别的?

getNumberWaiting()

1
2
3
4
5
6
7
8
9
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}

上面同样使用了ReentrantLock。返回的是parties与count的差值。这里说明count就是等待进入屏障的线程数量,也就是我们理解的还没执行到屏障点的线程数量。

演示正常实例

综合上面的源码的简短分析。可以看出,只有所有的线程都执行到了屏障点时,才能继续往下执行。因此在生活中,就好像一个旅游巴士,等到人齐了才发车。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.util.concurrent.*;

public class TestCyclicBarrier {

private static final int SIZE = 5;

private static long start = System.currentTimeMillis();

static final CyclicBarrier cyclicBarrier = new CyclicBarrier(SIZE, new Runnable() {
@Override
public void run() {
long end = System.currentTimeMillis();
System.out.println("发车啦");
System.out.println("历时: " + (end - start) + "ms");
}
});

private static class WaitPeople implements Runnable {
private final String name;

public WaitPeople(String name) {
this.name = name;
}


@Override
public void run() {
System.out.println(name + " 上车了");
try {
System.out.println(name + " 等待");
TimeUnit.MILLISECONDS.sleep(2000);
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
String[] strings = {"张大妈", "李大妈", "王大妈", "赵大妈", "钱大妈", "李大伯"};
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < SIZE; i++) {
executorService.execute(new WaitPeople(strings[i]));
}
executorService.shutdown();
}
}
  • 执行结果

CyclicBarrier的实例结果

  • 结论分析:

我们在示例中看到在类加载的时候就开始记录开始时间,然后运行到barrier action触发的时候记录结束时间。可以看到每个线程都是独立运行。最后总的等待时间就是2s多一点点。这一点点就是实际上不算上等待时间的真实运行时间。、

await()对线程进行拦截,直到最后一个线程运行到barrier的时候,count = 0,就运行barrier action,就发车。

演示复用实例

CyclicBarrier是适用于复用的,其与CountDownLatch最大的区别就是,其能够复用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import java.util.concurrent.*;

public class TestCyclicBarrier {

private static final int SIZE = 5;

private static long start = System.currentTimeMillis();

static final CyclicBarrier cyclicBarrier = new CyclicBarrier(SIZE, new Runnable() {
@Override
public void run() {
long end = System.currentTimeMillis();
System.out.println("发车啦");
System.out.println("历时: " + (end - start) + "ms");
}
});

private static class WaitPeople implements Runnable {
private final String name;

public WaitPeople(String name) {
this.name = name;
}


@Override
public void run() {
System.out.println(name + " 上车了");
try {
System.out.println(name + " 等待");
System.out.println("剩余count: " + (cyclicBarrier.getParties() - cyclicBarrier.getNumberWaiting())); // 证明count 有减到,同时在复用下,证明count有被重置
// TimeUnit.MILLISECONDS.sleep(2000);
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) {
String[] strings = {"张大妈", "李大妈", "王大妈", "赵大妈", "钱大妈", "李大伯"};
String[] strings1 = {"张大妈2", "李大妈2", "王大妈2", "赵大妈2", "钱大妈2", "李大伯2"};
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < SIZE; i++) {
executorService.execute(new WaitPeople(strings[i]));
}

try {
TimeUnit.MILLISECONDS.sleep(3000);
System.out.println("戳得码喋");
System.out.println("还是没赶上,不过幸好又来了一辆车");
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 0; i < SIZE; i++) {
executorService.execute(new WaitPeople(strings1[i]));
}
executorService.shutdown();
}
}
  • 执行结果

CyclicBarrier复用结果

  • 结论分析

可以看到上面复用的实例跟之前正常的实例并没有太大的变化,只是重新启动了一组线程而已。同时可以看到上面中加多了一个打印CyclicBarrier中count的值,可以看到count在各自的CyclicBarrier中是有减少,同时经过复用之后,count是有重置为原来的parties的。

小结

CyclicBarrier是可以复用的屏障。保持一种人齐出车的思想。其中count表示等待进入屏障的线程数量。parties表示参与者,即指定将要到达屏障的线程数量。

参考资料

CyclicBarrier工作原理及实例: https://blog.csdn.net/carson0408/article/details/79471490

-------------本文结束感谢您的阅读-------------