Future和CompletableFuture异步编程模型

Future和CompletableFuture接口简化了异步编程,后者做为前者的扩展,增强了异步处理的适用场景。

Future

使用线程池异步处理任务,很多场景都需要知道任务执行情况,Java提供Future做为异步任务的代理人,透过Future查看和操纵异步任务,获取异步执行结果。

//线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
//任务
Callable a = () -> {
   // to do
   return 1;
};
//句柄/handle
Future f = executor.submit(a);

拿到Future后可做的事:

f.isDone();

询问任务有没有完成。

f.get();

阻塞主线程,等待任务执行完成。

f.cancel(true);

取消提交的任务,如果,任务在执行,参数为true表示强行中断任务。

f.isCancelled();

查询任务是否被取消。

executor.shutdown();

ExecutorService中的线程在等待新任务,将导致JVM继续运行,根据实际情况决定是否需要关闭。

有返回值与无返回值的Future

Callable接口,有返回值。

Callable c = () -> {
   return 1;
};
Future<?> obj = executor.submit(c);
obj.get();

Runnable接口,无返回值。

Runnable r = () -> {
   return;
};
Future<?> v = executor.submit(r);

多个Future

Callable a = () -> {
   return 1;
};
List task = new ArrayList(); 
task.add(a);
task.add(a);
List<Future> invokeAll = executor.invokeAll(task);

如果,期望任务有一个执行完成就返回,使用以下方法提交任务:

executor.invokeAny(list);

当其中一个任务完成时,其他的任务会被取消,即使已在运行中。

Future结果合并

List<Future> invokeAll = executor.invokeAll(task);
Integer count = 0;
for (Future f : invokeAll) {
 Integer res = (Integer) f.get();
 count += res;
}

如果对返回结果无顺序要求,可以用这种方式合并,虽不及CompletableFuture优雅。

CompletableFuture

CompletableFuture<String> completableFuture = new CompletableFuture<String>();
completableFuture.get();

一个无参的CompletableFuture,所有线程都可以调用get方法,然而,都将被阻塞,因为,根本没有任何需要执行的任务。

completableFuture.complete("Future's Result");

所有被阻塞的线程都将收到同一个结果,很像wait和notifyAll操作。

异步执行

无需返回结果的异步任务:

CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
   System.out.println(1);
});
completableFuture.get();

需要返回结果的异步任务:

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
   return "Hi!";
});
String msg = completableFuture.get();

区别在于使用了不同的方法提交任务,runAsync和supplyAsync。runAsync和supplyAsync是支持线程池的,只需要在提交任务时传递一个线程池。

ExecutorService executor = Executors.newFixedThreadPool(2);

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
   return "Hi!";
}, executor);

有序调用异步执行

CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
    return "Hi~ ";
}, executor);
    
Function cp2 = (msg) -> {
    return msg + " Tom!";
};
    
CompletableFuture cps = cp1.thenApply(cp2);
String string = (String) cps.get();

可以无限追加任务。

cps.thenAccept((e) -> {
  System.out.println(e);
  System.out.println("happy end");
});

thenAccept方法提交的任务更像一个消费者,不需要提供任务返回值。

合并两个异步任务的结果

CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
   return "Hi~ ";
}, executor);
CompletableFuture<String> cp2 = CompletableFuture.supplyAsync(() -> {
   return "tom ~ ";
}, executor);


CompletableFuture cps = cp1.thenCombine(cp2, (c1, c2) -> {
   return c1 + c2;
});
Object object = cps.get();

将多个CompletableFutures组合在一起

CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
   return "Hi~ ";
}, executor);
CompletableFuture<String> cp2 = CompletableFuture.supplyAsync(() -> {
   return "tom ~ ";
}, executor);

CompletableFuture[] task = new CompletableFuture[] {cp1, cp2};

CompletableFuture<Void> all = CompletableFuture.allOf(task);
all.get();

如果,需要提取任务的返回结果,把all.get()替换成如下代码。

List<CompletableFuture> taskList = Arrays.asList(task);
CompletableFuture<List<Object>> thenApply = all.thenApply(value -> {
  return taskList.stream().map(t -> t.join()).collect(Collectors.toList());
});
  
 thenApply.get().forEach(e -> {
 System.out.println(e);
});

CompletableFuture异常处理

CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
   throw new RuntimeException();
}, executor);

  task.exceptionally(ex -> {
  System.out.println("Oops! We have an exception - " + ex.getMessage());
  return "Unknown!";
});
System.out.println(task.get());

一个会出现异常的任务,和一个能够处理异常的执行单元。或者使用总会被执行的handle方法:

task.handle((res, ex) -> {
	if (ex == null) {
  	  System.out.println("it's work");
  	  return res;
  	}
  	System.out.println("Oops! We have an exception - " + ex.getMessage());
  	return "Unknown!";
});