Java异步编程模型 - Future

Future与CompletableFuture简化异步编程,后者增强前者异步处理的适用场景。

Future

Java提供Future执行异步任务,future能查看和操纵异步任务,获取异步执行结果。

//线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
//任务
Callable a = () -> {
   // to do
   return 1;
};
//句柄/handle
Future f = executor.submit(a);

Future API

f.isDone();

询问任务是否完成。

f.get();

阻塞主线程,等待任务执行完成。

f.cancel(true);

取消已提交的任务,如果任务在执行,参数为true表示强行中断任务。

f.isCancelled();

查询任务是否被取消。

executor.shutdown();

ExecutorService中的线程在等待新任务,导致JVM继续运行,根据实际情况决定是否需要关闭。

返回值

Callable接口,有返回值。

Callable c = () -> {
   return 1;
};
Future<?> obj = executor.submit(c);
obj.get();

Runnable接口,无返回值。

Runnable r = () -> {
   return;
};
Future<?> v = executor.submit(r);

多任务

Callable a = () -> {
   return 1;
};
List task = new ArrayList(); 
task.add(a);
task.add(a);
List<Future> invokeAll = executor.invokeAll(task);

如果,希望多个任务中其中一个执行完成就返回,使用如下方法提交任务:

executor.invokeAny(list);

其中一个任务完成时,其他的任务会被取消,即使已在运行中。

结果合并

List<Future> invokeAll = executor.invokeAll(task);
Integer count = 0;
for (Future f : invokeAll) {
 Integer res = (Integer) f.get();
 count += res;
}

返回结果无顺序要求,可用这种方式执行合并。

CompletableFuture

CompletableFuture<String> completableFuture = new CompletableFuture<String>();
completableFuture.get();

一个无参的CompletableFuture,所有线程都可以调用get方法,然后都将被阻塞,根本没有任何需要执行的任务。

completableFuture.complete("Future's Result");

所有被阻塞的线程都将收到同一个结果。

异步执行

无返回结果的异步任务:

CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> {
   System.out.println(1);
});
completableFuture.get();

返回结果的异步任务:

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
   return "Hi!";
});
String msg = completableFuture.get();

runAsync和supplyAsync支持线程池,在提交任务时传递一个线程池。

ExecutorService executor = Executors.newFixedThreadPool(2);

CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
   return "Hi!";
}, executor);

有序调用

CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
    return "Hi~ ";
}, executor);
    
Function cp2 = (msg) -> {
    return msg + " Tom!";
};
    
CompletableFuture cps = cp1.thenApply(cp2);
String string = (String) cps.get();

可无限追加任务。

cps.thenAccept((e) -> {
  System.out.println(e);
  System.out.println("happy end");
});

thenAccept方法提交的任务更像一个消费者,无需返回结果。

合并任务结果

CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
   return "Hi~ ";
}, executor);
CompletableFuture<String> cp2 = CompletableFuture.supplyAsync(() -> {
   return "tom ~ ";
}, executor);


CompletableFuture cps = cp1.thenCombine(cp2, (c1, c2) -> {
   return c1 + c2;
});
Object object = cps.get();

将多个CompletableFutures组合在一起

CompletableFuture<String> cp1 = CompletableFuture.supplyAsync(() -> {
   return "Hi~ ";
}, executor);
CompletableFuture<String> cp2 = CompletableFuture.supplyAsync(() -> {
   return "tom ~ ";
}, executor);

CompletableFuture[] task = new CompletableFuture[] {cp1, cp2};

CompletableFuture<Void> all = CompletableFuture.allOf(task);
all.get();

如果,需要提取任务返回结果,将all.get()替换成如下代码:

List<CompletableFuture> taskList = Arrays.asList(task);
CompletableFuture<List<Object>> thenApply = all.thenApply(value -> {
  return taskList.stream().map(t -> t.join()).collect(Collectors.toList());
});
  
 thenApply.get().forEach(e -> {
 System.out.println(e);
});

异常处理

CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
   throw new RuntimeException();
}, executor);

  task.exceptionally(ex -> {
  System.out.println("Oops! We have an exception - " + ex.getMessage());
  return "Unknown!";
});
System.out.println(task.get());

异常处理单元或使用总会执行的handle方法:

task.handle((res, ex) -> {
	if (ex == null) {
  	  System.out.println("it's work");
  	  return res;
  	}
  	System.out.println("Oops! We have an exception - " + ex.getMessage());
  	return "Unknown!";
});