今天是:
带着程序的旅程,每一行代码都是你前进的一步,每个错误都是你成长的机会,最终,你将抵达你的目的地。
title

CyclicBarrier

概述

CyclicBarrier 是一种同步辅助工具,允许一组线程相互等待,直到它们到达公共的屏障点。CyclicBarrier 在涉及必须偶尔等待彼此的固定大小的线程组的程序中很有用。该屏障被称为周期性的,因为在释放等待的线程之后,它可以重新使用。 CyclicBarrier 支持一个可选的 Runnable 命令,该命令在每个屏障点运行一次,在最后一个线程到达之后,但在任何线程被释放之前。这个屏障动作对于在任何一方继续之前更新共享状态很有用。

CyclicBarrier 使用一种全部或无的破坏模型来处理同步尝试失败的情况:如果一个线程由于中断、失败或超时而过早离开屏障点,那么所有在该屏障点等待的其他线程也会异常地离开,通过 BrokenBarrierException(如果它们在同一时间也被中断,则会使用 InterruptedException)。 内存一致性效果:在调用 await() 之前在线程中执行的操作会发生在屏障操作的一部分之前,而这又会在其他线程中从相应的 await() 成功返回之后发生。

CyclicBarrier和CountDownLatch的区别

闭锁用于所有线程等待一个外部事件的发生,,可以认为countDown()达到设定的阈值这个事件,一种情况是等待所有子线程都执行完这个事件发生后,主线程继续执行;栅栏则是所有线程相互等待,直到所有线程都到达某一点时才打开栅栏,然后线程可以继续执行。栅栏可以重复使用

原理

class Generation  

屏障的每次使用都表示为一个 generation 实例。当屏障触发或重置时,generation 会改变。使用屏障的线程可能有许多 generation,因为锁可能以非确定性的方式分配给等待的线程,但一次只能有一个是活动的(即适用于 count 的那个),所有其他的都是断开的或触发的。如果有断开,但没有后续的重置,则不需要活动的 generation。

/** 用于保护屏障入口的锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 等待触发的条件 */
private final Condition trip = lock.newCondition();
/** 参与者数量 */
private final int parties;
/** 触发时要运行的命令。 */
private final Runnable barrierCommand;
/** 当前代 */
private Generation generation = new Generation();

/**
仍在等待的参与者数量。在每个 generation 上从 parties 倒数到 0。它在每个新 generation 或在断开时重置为 parties。
*/
private int count;

 

主要的阻塞方法

    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            //减去当前的参与者
            int index = --count;
            //等于0时所有的参与者都完成了,执行构造时传的Runnable
            if (index == 0) {  // tripped
                Runnable command = barrierCommand;
                if (command != null) {
                    try {
                        command.run();
                    } catch (Throwable ex) {
                        breakBarrier();
                        throw ex;
                    }
                }
                //用完了,生成新的一代
                nextGeneration();
                return 0;
            }

            // 循环,直到触发、断开、中断或超时。
            for (;;) {
                try {
                    //继续等待
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();
                
                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

用例

public class Solver {
        final int N;
        final double[][] data;
        final CyclicBarrier barrier;

        class Worker implements Runnable {
            int myRow;
            Worker(int row) { myRow = row; }
            public void run() {
                    double[] doubles = processRow(myRow);
                    data[myRow]=doubles;
                    try {
                        System.out.println("线程"+Thread.currentThread().getName()+"完成");
                        barrier.await();
                        System.out.println("线程"+Thread.currentThread().getName()+"继续执行任务");

                    } catch (InterruptedException ex) {
                        return;
                    } catch (BrokenBarrierException ex) {
                        return;
                    }
            }

            private boolean done() {
                return barrier.isBroken();
            }

            private double[] processRow(int myRow) {
                double[] datum = data[myRow];
                return Arrays.stream(datum).map(d->d*2).toArray();
            }
        }
    public Solver(double[][] matrix) {
        data = matrix;
        N = matrix.length;
        Runnable barrierAction = this::mergeRows;
        barrier = new CyclicBarrier(N, barrierAction);

        List<Thread> threads = new ArrayList<>(N);
        for (int i = 0; i < N; i++) {
            Thread thread = new Thread(new Worker(i));
            threads.add(thread);
            thread.start();
        }
    }

    private void mergeRows() {
        System.out.println("处理后数据================================================================");
        printMatrix(data);
    }

    public static void printMatrix(double [][] data) {
        for (double[] d : data) {
            System.out.println(Arrays.toString(d));
        }
    }

    public static void main(String[] args) {
            int n =5;
            double[][] rmdata = new double[n][n];
            for (int i = 0; i < n; i++) {
                for (int j = 0; j < n; j++) {
                    rmdata[i][j] = RandomUtils.nextInt(1, 10 * n);
                }
            }
        System.out.println("处理前数据================================================================");
        printMatrix(rmdata);
        Solver solver = new Solver(rmdata);
    }
}

 

分享到:

专栏

类型标签

网站访问总量