CyclicBarrier
了解CyclicBarrier
CyclicBarrier是什么
- CyclickBarrier是一个同步工具类,其允许一组线程互相等待,直到大家共同到达某个公共屏障点。
- 在涉及一组固定数量的线程的程序中,这些线程必须不时地互相等待,在这种情况下CyclicBarrier就显得非常有用。为什么称为cyclic呢? 因为他在等待的线程释放之后可以重复循环使用。
- CyclicBarrier支持一个可选的Runnable命令,这个Runnable命令在一组线程的最后一个线程执行完到达屏障点之后,但是在任何线程释放之前,仅在每个屏障点运行一次。又称之为屏障操作。这个屏障操作比较有用。
- 对于失败的同步尝试,CyclicBarrier使用了一种要么全部失败,要么全部成功的breakage model。如果一个线程因为中断,执行失败或者超时而导致其早早地离开,那么该CyclicBarrier将被破坏,而执行完等在屏障点的其他线程也会因为CyclicBarrier被破坏而抛出BrokenBarrierException异常,从而导致所有线程都执行失败。如果所有线程在同一时间被中断,将会抛出InterruptedException异常。
- 同时CyclicBarrier中有内存一致性这个属性:
- 调用await()之前的行为 优先于 任何屏障操作
- 屏障操作成功返回这一行为 优先于 所有其他等待线程await()的行为
源码浅析
1 | public class CyclicBarrier { |
个人感觉
在看了CyclicBarrier里面的代码实现之后,发现Doug Lea真的很厉害(因为我看了好长时间才看懂)。里面有两个概念:parties和count。
count:
parties:
然后一开始我是不太明白count跟parties的区别,然后看了下CyclicBarrier的源码实现,结合CyclicBarrier的惯用方式,推断出count是等待进入屏障的线程数量,而parties是参与者,也就是线程总数量。
emmmm其实理解过程中还是要回到CyclicBarrier这个类本身。count在类里面就是相对于类来说的,在类内部就是相对于类来说,就是等待进入屏障的线程数量。而在我们角度来说,count就是在执行而尚没有执行到await()方法进入barrier的线程的数量。
await()
1 | public int await() throws InterruptedException, BrokenBarrierException { |
await()方法里调用了一个内部私有方法 dowait()
dowait()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock(); // 使用了lock锁住。
try {
final Generation g = generation;
if (g.broken) // 判断是否被破坏了
throw new BrokenBarrierException();
if (Thread.interrupted()) { // 判断是否被中断了
breakBarrier();
throw new InterruptedException();
}
int index = --count; // 看这里~
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run(); // 执行屏障操作 barrierAction
ranAction = true;
nextGeneration();
return 0; // 返回0
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index; // 返回减了1 的count
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}上面可以看见dowait()是CyclicBarrier的核心代码。里面中首先进行一些状态检查操作。然后确认有线程进入到了屏障内,count减1,赋值给index变量,表示到达的索引顺序。如果index为0,表示最后一个线程到达屏障,开始执行屏障操作。整个过程都用lock进行锁住。
为什么用lock,而不用synchronized呢?
个人觉得有一部分原因是dowait()代码较长,而synchronized适用于代码较短的语句块。在《java编程思想》中有提到两者的性能分析。在代码较长的情况下,lock比synchronized展现出更好的性能。
为什么用ReentrantLock,而不是别的?
getNumberWaiting()
1 | public int getNumberWaiting() { |
上面同样使用了ReentrantLock。返回的是parties与count的差值。这里说明count就是等待进入屏障的线程数量,也就是我们理解的还没执行到屏障点的线程数量。
演示正常实例
综合上面的源码的简短分析。可以看出,只有所有的线程都执行到了屏障点时,才能继续往下执行。因此在生活中,就好像一个旅游巴士,等到人齐了才发车。
1 | import java.util.concurrent.*; |
- 执行结果
- 结论分析:
我们在示例中看到在类加载的时候就开始记录开始时间,然后运行到barrier action触发的时候记录结束时间。可以看到每个线程都是独立运行。最后总的等待时间就是2s多一点点。这一点点就是实际上不算上等待时间的真实运行时间。、
await()对线程进行拦截,直到最后一个线程运行到barrier的时候,count = 0,就运行barrier action,就发车。
演示复用实例
CyclicBarrier是适用于复用的,其与CountDownLatch最大的区别就是,其能够复用。
1 | import java.util.concurrent.*; |
- 执行结果
- 结论分析
可以看到上面复用的实例跟之前正常的实例并没有太大的变化,只是重新启动了一组线程而已。同时可以看到上面中加多了一个打印CyclicBarrier中count的值,可以看到count在各自的CyclicBarrier中是有减少,同时经过复用之后,count是有重置为原来的parties的。
小结
CyclicBarrier是可以复用的屏障。保持一种人齐出车的思想。其中count表示等待进入屏障的线程数量。parties表示参与者,即指定将要到达屏障的线程数量。
参考资料
CyclicBarrier工作原理及实例: https://blog.csdn.net/carson0408/article/details/79471490