概述
CyclicBarrier 是一种同步辅助工具,允许一组线程相互等待,直到它们到达公共的屏障点。CyclicBarrier 在涉及必须偶尔等待彼此的固定大小的线程组的程序中很有用。该屏障被称为周期性的,因为在释放等待的线程之后,它可以重新使用。 CyclicBarrier 支持一个可选的 Runnable 命令,该命令在每个屏障点运行一次,在最后一个线程到达之后,但在任何线程被释放之前。这个屏障动作对于在任何一方继续之前更新共享状态很有用。
CyclicBarrier 使用一种全部或无的破坏模型来处理同步尝试失败的情况:如果一个线程由于中断、失败或超时而过早离开屏障点,那么所有在该屏障点等待的其他线程也会异常地离开,通过 BrokenBarrierException(如果它们在同一时间也被中断,则会使用 InterruptedException)。 内存一致性效果:在调用 await() 之前在线程中执行的操作会发生在屏障操作的一部分之前,而这又会在其他线程中从相应的 await() 成功返回之后发生。
CyclicBarrier和CountDownLatch的区别
闭锁用于所有线程等待一个外部事件的发生,,可以认为countDown()达到设定的阈值这个事件,一种情况是等待所有子线程都执行完这个事件发生后,主线程继续执行;栅栏则是所有线程相互等待,直到所有线程都到达某一点时才打开栅栏,然后线程可以继续执行。栅栏可以重复使用
原理
class Generation
屏障的每次使用都表示为一个 generation 实例。当屏障触发或重置时,generation 会改变。使用屏障的线程可能有许多 generation,因为锁可能以非确定性的方式分配给等待的线程,但一次只能有一个是活动的(即适用于 count 的那个),所有其他的都是断开的或触发的。如果有断开,但没有后续的重置,则不需要活动的 generation。
/** 用于保护屏障入口的锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 等待触发的条件 */
private final Condition trip = lock.newCondition();
/** 参与者数量 */
private final int parties;
/** 触发时要运行的命令。 */
private final Runnable barrierCommand;
/** 当前代 */
private Generation generation = new Generation();
/**
仍在等待的参与者数量。在每个 generation 上从 parties 倒数到 0。它在每个新 generation 或在断开时重置为 parties。
*/
private int count;
主要的阻塞方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//减去当前的参与者
int index = --count;
//等于0时所有的参与者都完成了,执行构造时传的Runnable
if (index == 0) { // tripped
Runnable command = barrierCommand;
if (command != null) {
try {
command.run();
} catch (Throwable ex) {
breakBarrier();
throw ex;
}
}
//用完了,生成新的一代
nextGeneration();
return 0;
}
// 循环,直到触发、断开、中断或超时。
for (;;) {
try {
//继续等待
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
用例
public class Solver {
final int N;
final double[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
double[] doubles = processRow(myRow);
data[myRow]=doubles;
try {
System.out.println("线程"+Thread.currentThread().getName()+"完成");
barrier.await();
System.out.println("线程"+Thread.currentThread().getName()+"继续执行任务");
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
private boolean done() {
return barrier.isBroken();
}
private double[] processRow(int myRow) {
double[] datum = data[myRow];
return Arrays.stream(datum).map(d->d*2).toArray();
}
}
public Solver(double[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction = this::mergeRows;
barrier = new CyclicBarrier(N, barrierAction);
List<Thread> threads = new ArrayList<>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
}
private void mergeRows() {
System.out.println("处理后数据================================================================");
printMatrix(data);
}
public static void printMatrix(double [][] data) {
for (double[] d : data) {
System.out.println(Arrays.toString(d));
}
}
public static void main(String[] args) {
int n =5;
double[][] rmdata = new double[n][n];
for (int i = 0; i < n; i++) {
for (int j = 0; j < n; j++) {
rmdata[i][j] = RandomUtils.nextInt(1, 10 * n);
}
}
System.out.println("处理前数据================================================================");
printMatrix(rmdata);
Solver solver = new Solver(rmdata);
}
}
分享到: