概述
ExecutorService是一种Executor,它提供了管理终止和生成用于跟踪一个或多个异步任务进度的Future的方法。 ExecutorService可以关闭,这会导致它拒绝新任务。提供了两种不同的关闭ExecutorService的方法。shutdown方法将允许以前提交的任务在终止之前执行,而shutdownNow方法阻止等待任务启动并尝试停止当前执行的任务。在终止时,执行器没有积极执行的任务,没有等待执行的任务,也不能提交新任务。未使用的ExecutorService应该关闭以允许释放其资源。 submit方法通过创建并返回一个Future来扩展基本方法Executor.execute(Runnable)。该Future可用于取消执行和/或等待完成。invokeAny和invokeAll方法执行最常用的批量执行形式,执行一组任务然后等待至少一个或全部完成。(可以使用ExecutorCompletionService类编写这些方法的定制变体。
源码解析
void shutdown(); 启动有序关闭,先前提交的任务将被执行,但不会接受新任务。如果已经关闭,调用没有额外的效果。 此方法不等待先前提交的任务完成执行。使用awaitTermination来实现。
List<Runnable> shutdownNow(); 尝试停止所有正在执行的任务,中止等待任务的处理,并返回正在等待执行的任务列表。 此方法不等待正在执行的任务终止。使用awaitTermination来实现。 除了尽最大努力停止处理正在执行的任务之外,没有任何保证。例如,典型的实现将通过Thread.interrupt取消,因此任何未响应中断的任务可能永远不会终止。
boolean isShutdown();如果这个执行器已经关闭,返回true。
boolean isTerminated(); 如果在关闭后所有任务都已完成,则返回true。请注意,除非调用了shutdown或shutdownNow,isTerminated永远不会是true。
boolean awaitTermination(long timeout, TimeUnit unit)在关闭请求后阻塞,直到所有任务完成执行,或超时发生,或当前线程被中断,取决于哪个先发生。
<T> Future<T> submit(Callable<T> task);提交一个返回值任务执行并返回一个代表任务待处理结果的Future。在成功完成任务后,Future的get方法将返回任务的结果。 如果您希望立即阻塞等待任务,可以使用类似于 result = exec.submit(aCallable).get()的结构。 注意: Executors类包括一组可以将其他一些常见的闭包类对象(例如java.security.PrivilegedAction)转换为Callable形式的方法,以便它们可以提交。
<T> Future<T> submit(Runnable task, T result);提交一个可运行的任务执行并返回一个代表该任务的Future。在成功完成后,Future的get方法将返回给定的结果。
Future<?> submit(Runnable task);提交一个可运行的任务执行并返回一个代表该任务的Future。在成功完成后,Future的get方法将返回null。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)执行给定的任务,在所有完成时返回一个保存其状态和结果的Future列表。当返回列表的每个元素完成时,Future.isDone为true。请注意,完成的任务可能正常终止或抛出异常。如果在此操作进行时修改了给定的集合,则此方法的结果是未定义的。
<T> T invokeAny(Collection<? extends Callable<T>> tasks)执行给定的任务,如果有成功完成(即没有抛出异常)的任务,则返回其结果。在正常或异常返回时,取消尚未完成的任务。如果在此操作进行时修改了给定的集合,则此方法的结果是未定义的。
用例
public class Java8ExecutorServiceIntegrationTest {
private Runnable runnableTask;
private Callable<String> callableTask;
private List<Callable<String>> callableTasks;
@Before
public void init() {
runnableTask = () -> {
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
callableTask = () -> {
TimeUnit.MILLISECONDS.sleep(300);
return "Task's execution";
};
callableTasks = new ArrayList<>();
callableTasks.add(callableTask);
callableTasks.add(callableTask);
callableTasks.add(callableTask);
}
@Test
public void creationSubmittingTaskShuttingDown_whenShutDown_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.submit(runnableTask);
executorService.submit(callableTask);
executorService.shutdown();
assertTrue(executorService.isShutdown());
}
@Test
public void creationSubmittingTasksShuttingDownNow_whenShutDownAfterAwating_thenCorrect() {
ExecutorService threadPoolExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
for (int i = 0; i < 100; i++) {
threadPoolExecutor.submit(callableTask);
}
List<Runnable> notExecutedTasks = smartShutdown(threadPoolExecutor);
assertTrue(threadPoolExecutor.isShutdown());
assertFalse(notExecutedTasks.isEmpty());
assertTrue(notExecutedTasks.size() < 98);
}
private List<Runnable> smartShutdown(ExecutorService executorService) {
List<Runnable> notExecutedTasks = new ArrayList<>();
executorService.shutdown();
try {
if (!executorService.awaitTermination(800, TimeUnit.MILLISECONDS)) {
notExecutedTasks = executorService.shutdownNow();
}
} catch (InterruptedException e) {
notExecutedTasks = executorService.shutdownNow();
}
return notExecutedTasks;
}
@Test
public void submittingTasks_whenExecutedOneAndAll_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
String result = null;
List<Future<String>> futures = new ArrayList<>();
try {
result = executorService.invokeAny(callableTasks);
futures = executorService.invokeAll(callableTasks);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
assertEquals("Task's execution", result);
assertTrue(futures.size() == 3);
}
@Test
public void submittingTaskShuttingDown_whenGetExpectedResult_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(callableTask);
String result = null;
try {
result = future.get();
result = future.get(200, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
executorService.shutdown();
assertEquals("Task's execution", result);
}
@Test
public void submittingTask_whenCanceled_thenCorrect() {
ExecutorService executorService = Executors.newFixedThreadPool(10);
Future<String> future = executorService.submit(callableTask);
boolean canceled = future.cancel(true);
boolean isCancelled = future.isCancelled();
executorService.shutdown();
assertTrue(canceled);
assertTrue(isCancelled);
}
@Test
public void submittingTaskScheduling_whenExecuted_thenCorrect() {
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Future<String> resultFuture = executorService.schedule(callableTask, 1, TimeUnit.SECONDS);
String result = null;
try {
result = resultFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
executorService.shutdown();
assertEquals("Task's execution", result);
}
}
分享到: