今天是:
带着程序的旅程,每一行代码都是你前进的一步,每个错误都是你成长的机会,最终,你将抵达你的目的地。
title

ExecutorService

概述

ExecutorService是一种Executor,它提供了管理终止和生成用于跟踪一个或多个异步任务进度的Future的方法。 ExecutorService可以关闭,这会导致它拒绝新任务。提供了两种不同的关闭ExecutorService的方法。shutdown方法将允许以前提交的任务在终止之前执行,而shutdownNow方法阻止等待任务启动并尝试停止当前执行的任务。在终止时,执行器没有积极执行的任务,没有等待执行的任务,也不能提交新任务。未使用的ExecutorService应该关闭以允许释放其资源。 submit方法通过创建并返回一个Future来扩展基本方法Executor.execute(Runnable)。该Future可用于取消执行和/或等待完成。invokeAny和invokeAll方法执行最常用的批量执行形式,执行一组任务然后等待至少一个或全部完成。(可以使用ExecutorCompletionService类编写这些方法的定制变体。

源码解析

void shutdown(); 启动有序关闭,先前提交的任务将被执行,但不会接受新任务。如果已经关闭,调用没有额外的效果。 此方法不等待先前提交的任务完成执行。使用awaitTermination来实现。

List<Runnable> shutdownNow(); 尝试停止所有正在执行的任务,中止等待任务的处理,并返回正在等待执行的任务列表。 此方法不等待正在执行的任务终止。使用awaitTermination来实现。 除了尽最大努力停止处理正在执行的任务之外,没有任何保证。例如,典型的实现将通过Thread.interrupt取消,因此任何未响应中断的任务可能永远不会终止。

boolean isShutdown();如果这个执行器已经关闭,返回true。

boolean isTerminated(); 如果在关闭后所有任务都已完成,则返回true。请注意,除非调用了shutdown或shutdownNow,isTerminated永远不会是true。

boolean awaitTermination(long timeout, TimeUnit unit)在关闭请求后阻塞,直到所有任务完成执行,或超时发生,或当前线程被中断,取决于哪个先发生。

<T> Future<T> submit(Callable<T> task);提交一个返回值任务执行并返回一个代表任务待处理结果的Future。在成功完成任务后,Future的get方法将返回任务的结果。 如果您希望立即阻塞等待任务,可以使用类似于 result = exec.submit(aCallable).get()的结构。 注意: Executors类包括一组可以将其他一些常见的闭包类对象(例如java.security.PrivilegedAction)转换为Callable形式的方法,以便它们可以提交。

<T> Future<T> submit(Runnable task, T result);提交一个可运行的任务执行并返回一个代表该任务的Future。在成功完成后,Future的get方法将返回给定的结果。

Future<?> submit(Runnable task);提交一个可运行的任务执行并返回一个代表该任务的Future。在成功完成后,Future的get方法将返回null。

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)执行给定的任务,在所有完成时返回一个保存其状态和结果的Future列表。当返回列表的每个元素完成时,Future.isDone为true。请注意,完成的任务可能正常终止或抛出异常。如果在此操作进行时修改了给定的集合,则此方法的结果是未定义的。

<T> T invokeAny(Collection<? extends Callable<T>> tasks)执行给定的任务,如果有成功完成(即没有抛出异常)的任务,则返回其结果。在正常或异常返回时,取消尚未完成的任务。如果在此操作进行时修改了给定的集合,则此方法的结果是未定义的。

用例


public class Java8ExecutorServiceIntegrationTest {

    private Runnable runnableTask;
    private Callable<String> callableTask;
    private List<Callable<String>> callableTasks;

    @Before
    public void init() {

        runnableTask = () -> {
            try {
                TimeUnit.MILLISECONDS.sleep(300);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };

        callableTask = () -> {
            TimeUnit.MILLISECONDS.sleep(300);
            return "Task's execution";
        };

        callableTasks = new ArrayList<>();
        callableTasks.add(callableTask);
        callableTasks.add(callableTask);
        callableTasks.add(callableTask);
    }

    @Test
    public void creationSubmittingTaskShuttingDown_whenShutDown_thenCorrect() {

        ExecutorService executorService = Executors.newFixedThreadPool(10);
        executorService.submit(runnableTask);
        executorService.submit(callableTask);
        executorService.shutdown();

        assertTrue(executorService.isShutdown());
    }

    @Test
    public void creationSubmittingTasksShuttingDownNow_whenShutDownAfterAwating_thenCorrect() {

        ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());

        for (int i = 0; i < 100; i++) {
            threadPoolExecutor.submit(callableTask);
        }

        List<Runnable> notExecutedTasks = smartShutdown(threadPoolExecutor);

        assertTrue(threadPoolExecutor.isShutdown());
        assertFalse(notExecutedTasks.isEmpty());
        assertTrue(notExecutedTasks.size() < 98);
    }

    private List<Runnable> smartShutdown(ExecutorService executorService) {

        List<Runnable> notExecutedTasks = new ArrayList<>();
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
                notExecutedTasks = executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            notExecutedTasks = executorService.shutdownNow();
        }
        return notExecutedTasks;
    }

    @Test
    public void submittingTasks_whenExecutedOneAndAll_thenCorrect() {

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        String result = null;
        List<Future<String>> futures = new ArrayList<>();
        try {
            result = executorService.invokeAny(callableTasks);
            futures = executorService.invokeAll(callableTasks);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        assertEquals("Task's execution", result);
        assertTrue(futures.size() == 3);
    }

    @Test
    public void submittingTaskShuttingDown_whenGetExpectedResult_thenCorrect() {

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        Future<String> future = executorService.submit(callableTask);
        String result = null;
        try {
            result = future.get();
            result = future.get(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            e.printStackTrace();
        }

        executorService.shutdown();

        assertEquals("Task's execution", result);
    }

    @Test
    public void submittingTask_whenCanceled_thenCorrect() {

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        Future<String> future = executorService.submit(callableTask);

        boolean canceled = future.cancel(true);
        boolean isCancelled = future.isCancelled();

        executorService.shutdown();

        assertTrue(canceled);
        assertTrue(isCancelled);
    }

    @Test
    public void submittingTaskScheduling_whenExecuted_thenCorrect() {

        ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();

        Future<String> resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
        String result = null;
        try {
            result = resultFuture.get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        executorService.shutdown();

        assertEquals("Task's execution", result);
    }
}

 

分享到:

专栏

类型标签

网站访问总量