Java8-CompletableFuture组合式异步编程

CompletableFuture进行组合式异步编程,包括: 创建异步计算,并获取计算结果;使用非阻塞操作提升吞吐量;设计和实现异步API;如何以异步的方式使用同步的API;如何对两个或多个异步操作进行流水线和合并操作;如何处理异步操作的完成状态


CompletableFuture进行组合式异步编程,包括: 创建异步计算,并获取计算结果;使用非阻塞操作提升吞吐量;设计和实现异步API;如何以异步的方式使用同步的API;如何对两个或多个异步操作进行流水线和合并操作;如何处理异步操作的完成状态

Future接口

  • Future接口在Java5中被引入,设计初衷是对将来某个时刻会发生的结果进行建模。它建模了一种异步计算,返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。在Future中触发那些潜在耗时的操作,从而把调用线程解放出来,让它能继续执行其他有价值的工作.
  • Future比更底层的Thread更易用。要使用Future,通常你只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService.
  • 可以在ExecutorService以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他的任务。接着,如果你已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成,返回相应的结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 创建ExecutorService,通过它你可以向线程池提交任务
ExecutorService executor = Executors.newCachedThreadPool();
// 向ExecutorService提交一个Callable对象
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() {
// 以异步方式在新的线程中执异步操作进行的同时, 行耗时的操作
return doSomeLongComputation();
}});
// 异步操作进行的同时,行耗时的操作你可以做其他的事情
doSomethingElse();
try {
// 获取异步操作的结果,如果最终被阻塞, 无法得到结果,那么在最多等待1秒钟之后退出
Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
// 计算抛出一个异常
} catch (InterruptedException ie) {
// 当前线程在等待过程中被中断
} catch (TimeoutException te) {
// 在Future对象完成之前超过已过期
}

java8-future-1.png-38.9kB

Future 接口不能做什么

  • Future接口提供了方法来检测异步计算是否已经结束(使用isDone方法),等待异步操作结束,以及获取计算的结果。
  • Future不易实现的需求(可以利用CompletableFuture(implements Future)结合Java8来完成)
    • 将两个异步计算合并为一个 – 这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
    • 等待Future集合中的所有任务都完成。
    • 仅等待Future集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
    • 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式)。
    • 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。
  • Stream和CompletableFuture的设计都遵循了类似的模式:它们都使用了Lambda表达式以及流水线的思想。所以说,CompletableFuture和Future的关系就跟Stream和Collection的关系一样。

使用 CompletableFuture 构建异步应用

需求:创建一个名为“最佳价格查询器”(best-price-finder)的应用,它会查询多个在线商店,依据给定的产品或服务找出最低的价格。

  • 同步API与异步API
    • 同步API:调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。即使调用方和被调用方在不同的线程中运行,调用方还是需要等待被调用方结束运行,这就是阻塞式调用这个名词的由来。
    • 异步API会直接返回,或者至少在被调用方计算完成之前,将它剩余的计算任务交给另一个线程去做,该线程和调用方是异步的 – 这就是非阻塞式调用的由来。执行剩余计算任务的线程会将它的计算结果返回给调用方。返回的方式要么是通过回调函数,要么是由调用方再次执行一个“等待,直到计算完成”的方法调用(基本上就是一个阻塞式调用了)。

实现异步 API

  • CompletableFuture.complete方法,结束completableFuture对象的运行,并设置变量的值。
  • delay()引入延时,同步阻塞式调用
1
2
3
4
5
6
7
8
9
// getPrice的调用者每次调用都会被阻塞,因为delay()
public double getPrice(String product) {
return calculatePrice(product);
}

private double calculatePrice(String product) {
delay();
return random.nextDouble() * product.charAt(0) + product.charAt(1);
}
  • 异步API
1
2
3
4
5
6
7
8
9
10
11
12
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>(); // 包含计算结果.
// 新开一个线程处理耗时调用.
new Thread( () -> {
double price = calculatePrice(product);
// 耗时任务结束,设置Future的返回值
futurePrice.complete(price);
}).start();

// 调用者线程直接返回
return futurePrice;
}
  • 测试异步API
    • 如果所有有意义的工作都已经完成(程序员根据业务逻辑确定),客户所有要执行的工作都依赖于商品价格时,再调用Future的get方法。执行了这个操作后,客户要么获得Future中封装的值(如果异步任务已经完成),要么发生阻塞(插入到最合适的位置调用,之后的计算都依赖于此值,那么即使它花费的时间再长,此时也是不可避免的) ,直到该异步任务完成,期望的值能够访问。
    • Future执行完毕可以发送一个通知,仅在计算结果可用时执行一个由Lambda表达式或者方法引用定义的回调函数。(这里没有)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class ShopMain {

public static void main(String[] args) {
Shop shop = new Shop("BestShop");
long start = System.nanoTime();
// 调用立刻返回了一个Future对象,通过该对象客户可以在将来的某个时刻取得商品的价格
Future<Double> futurePrice = shop.getPriceAsync("my favorite product");
long invocationTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Invocation returned after " + invocationTime
+ " msecs");
// 执行更多任务,比如查询其他商店
doSomethingElse();
try {
// 从Future对象中读取价格,如果价格未知,会发生阻塞
double price = futurePrice.get();
System.out.printf("Price is %.2f%n", price);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
long retrievalTime = ((System.nanoTime() - start) / 1_000_000);
System.out.println("Price returned after " + retrievalTime + " msecs");
}

private static void doSomethingElse() {
System.out.println("Doing something else...");
}
}
// 执行结果
Invocation returned after 87 msecs // 调用getPriceAsync()立即返回.
Doing something else...
Price is 123.26
Price returned after 1140 msecs

错误处理

  • 如上如果价格计算过程中产生了错误,那么该异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞。(实际抛出一个RuntimeException好像可以直接捕获到)
  • 为了让客户端能了解商店无法提供请求商品价格的原因,你需要使用CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。

使用工厂方法supplyAsync创建CompletableFuture

  • supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。

  • 重构getPriceAsync()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public Future<Double> getPrice(String product) {
/*
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread( () -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception ex) {
futurePrice.completeExceptionally(ex);
}
}).start();
return futurePrice;
*/
// 重构如上的步骤.
// CompletableFuture包含calculatePrice()时返回的值
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}

避免阻塞

需求:给定一个商店列表List,同时实现一个方法,它接受产品名作为参数,返回一个字符串列表,
这个字符串列表中包括商店的名称、该商店中指定商品的价格.

顺序查询

  • 总花费大概是4秒钟,因为对这4个商店的查询是顺序进行的,并且一个查询操作会阻塞另一个,每一个操作都要花费大约1秒左右的时间计算请求商品的价格.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private final List<Shop> shops = Arrays.asList(
new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("BuyItAll")/*,
new Shop("ShopEasy")*/);

public List<String> findPricesSequential(String product) {
return shops.stream()
.map(shop -> shop.getName() + " price is " + shop.getPrice(product))
.collect(Collectors.toList());
}
// 执行结果
[BestPrice price is 123.25651664705744, LetsSaveBig price is 169.4653393606115, MyFavoriteShop price is 214.12914480588853, BuyItAll price is 184.74384995303313]
sequential done in 4045 msecs

并行流对请求进行并行操作

  • 对四个不同商店的查询实现了并行,所以完成所有操作的总耗时只有1秒多一点儿。
1
2
3
4
5
6
7
8
public List<String> findPricesParallel(String product) {
return shops.parallelStream() // 只是这里改成了并行流
.map(shop -> shop.getName() + " price is " + shop.getPrice(product))
.collect(Collectors.toList());
}
// 执行结果
[BestPrice price is 197.15388829450728, LetsSaveBig price is 167.59404755738808, MyFavoriteShop price is 192.48730292081552, BuyItAll price is 199.67823140124116]
parallel done in 1006 msecs

使用 CompletableFuture 发起异步请求

  • CompletableFuture的工厂方法supplyAsync创建CompletableFuture对象,发起异步请求进行异步处理.
  • CompletableFuture类中的join方法和Future接口中的get有相同的含义,并且也声明在Future接口中,它们唯一的不同是join不会抛出任何检测到的异常。使用它你不再需要使用try/catch语句块让你传递给第二个map方法的Lambda表达式变得过于臃肿。
  • 实现中使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个地放置两个map操作.这是要考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定商家的动作、通知join方法返回计算结果。
  • CompletableFuture版本的程序跟并行流版本的程序时间复杂度相差不多。究其原因都一样:它们内部采用的是同样的通用线程池(如果异步请求没有提供executor的话),默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。然而,CompletableFuture具有一定的优势,因为它允许你对执行器(Executor)进行配置,尤其是线程池的大小,让它以更适合应用需求的方式进行配置,满足程序的要求,而这是并行流API无法提供的。
  • 关于自定义执行器线程设置为守护线程(Daemon):Java程序无法终止或者退出一个正在运行中的线程,所以最后剩下的那个线程会由于一直等待无法发生的事件而引发问题。与此相反,如果将线程标记为守护进程,意味着程序退出时该线程也会被回收。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 使用定制的执行器
private final Executor executor = Executors.newFixedThreadPool(shops.size(), new ThreadFactory() { // 配置线程的数目.
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
// 守护线程构成的线程池
t.setDaemon(true);
return t;
}
});

// 组合两条流水线
public List<String> findPricesFuture(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is "
+ shop.getPrice(product), executor)) // 指定线程池执行生产者方法,如果不配置此选项,
.collect(Collectors.toList()); // 得到一个List<CompletableFuture<String>>,列表中的每个CompletableFuture对象在计算完成后都包含商店的String类型的名称。

// 要得到List<String>,需要等待所有的future执行完毕,将其包含的值抽取出来,填充到列表中才能返回。使用join方法获取CompletableFuture包含的值.
List<String> prices = priceFutures.stream()
.map(CompletableFuture::join) // 等待所有异步操作结束
.collect(Collectors.toList());
return prices;
}

// 单条流水线
public List<String> findPricesFutureOnePipeline(String product) {
List<String> result = shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is "
+ shop.getPrice(product), executor))
.map(CompletableFuture::join)
.collect(Collectors.toList());
return result;
}

// 组合两条流水线执行结果
[BestPrice price is 171.10524235618578, LetsSaveBig price is 168.59369176671822, MyFavoriteShop price is 174.79155890558252, BuyItAll price is 154.82955565763797]
// 书上显示是2000ms左右,这里貌似跟直接并行流差不多.
composed CompletableFuture done in 1006 msecs

// 单条流水线执行结果.
[BestPrice price is 227.53480147033423, LetsSaveBig price is 200.89398407500244, MyFavoriteShop price is 161.14747297059597, BuyItAll price is 155.9041805933185]
one pipeline composed CompletableFuture done in 4004 msecs
  • 执行示意图
    • 单一流水线中,执行的流程(以虚线标识)是顺序的。事实上,新的CompletableFuture对象只有在前一个操作完全结束之后,才能创建。
    • 双流水线中,则将CompletableFutures对象聚集到一个列表中(即图中以椭圆表示的部分),让对象们可以在等待其他对象完成操作之前就能启动。

java8-completabefuture-lazy-1.png-130.4kB

使用流还是CompletableFutures

  • 如果你进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)
  • 反之,如果你并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,你可以设定需要使用的线程数。这种情况不使用并行流的另一个原因是,处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

对多个异步任务进行流水线操作

需求:商店都同意使用一个集中式的折扣服务,利用此折扣服务计算报价.

  1. getPrice()现在以ShopName:price:DiscountCode的格式返回一个String类型的值,计算价格也是耗时的.
  2. 对商店返回字符串的解析操作封装到了Quote类之中
  3. 每个折扣代码的实际折扣比率有可能发生变化,所以你每次都需要查询折扣服务,查询折扣也是耗时的.

Quote.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Quote {

private final String shopName;
private final double price;
private final Discount.Code discountCode;

public Quote(String shopName, double price, Discount.Code discountCode) {
this.shopName = shopName;
this.price = price;
this.discountCode = discountCode;
}

public static Quote parse(String s) {
String[] split = s.split(":");
String shopName = split[0];
double price = Double.parseDouble(split[1]);
Discount.Code discountCode = Discount.Code.valueOf(split[2]);
return new Quote(shopName, price, discountCode);
}

public String getShopName() {
return shopName;
}

public double getPrice() {
return price;
}

public Discount.Code getDiscountCode() {
return discountCode;
}
}

Discount.java

  • 折扣服务是远程服务,查询有耗时.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Discount {
public enum Code {
NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

private final int percentage;

Code(int percentage) {
this.percentage = percentage;
}
}
public static String applyDiscount(Quote quote) {
return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
}
private static double apply(double price, Code code) {
// 延时.
delay();
// 利用一个非线程安全的formatter进行格式化,所以访问此formatter的时候需要进行加锁互斥访问.
return format(price * (100 - code.percentage) / 100);
}
}

顺序同步方法

  • 第一个map操作将每个shop对象转换成了一个字符串,该字符串包含了该shop中指定商品的价格和折扣代码。
  • 第二个map操作对这些字符串进行了解析,在Quote对象中对它们进行转换。
  • 第三个map会操作联系远程的Discount服务,计算出最终的折扣价格,并返回该价格及提供该价格商品的shop。
1
2
3
4
5
6
7
8
9
10
11
12
// 计算价格和请求折扣都需要1s.
public List<String> findPricesSequential(String product) {
return shops.stream()
.map(shop -> shop.getPrice(product)) // name + ":" + price + ":" + code;
.map(Quote::parse) // return new Quote(shopName, price, discountCode);
.map(Discount::applyDiscount) // 应用价格和折扣.
.collect(Collectors.toList());
}

// 结果耗时
[BestPrice price is 110.93, LetsSaveBig price is 135.58, MyFavoriteShop price is 192.72, BuyItAll price is 184.74, ShopEasy price is 167.28]
sequential done in 10070 msecs

CompletableFuture异步调用方法

  • 步骤解析
    • 只需要将Lambda表达式作为参数传递给supplyAsync工厂方法就可以以异步方式对shop进行查询。第一个map转换的结果是一个Stream<CompletableFuture<String>>,一旦运行结束,每个CompletableFuture对象中都会包含对应shop返回的字符串。同时传入了一个定制的执行器executor.
    • 对第一步中生成的CompletableFuture对象调用它的thenApply进行parse解析.parse解析不涉及任何远程服务,也不会进行任何I/O操作,它几乎可以在第一时间进行,所以能够采用同步操作,不会带来太多的延迟。
    • 第三个map操作涉及联系远程的Discount服务,为从商店中得到的原始价格申请折扣率。因为这一转换需要远程执行,可能耗时较大,所以也是异步的方式执行.同样,将这一操作以Lambda表达式的方式传递给了supplyAsync工厂方法,该方法最终会返回另一个CompletableFuture对象。
  • 以上的价格查询和查询远程折扣服务都是异步任务执行,这样就相当于有两个异步任务了.
  • CompletableFuture.thenCompose()方法允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。换句话说,你可以创建两个CompletableFutures对象,对第一个CompletableFuture 对象调用thenCompose,并向其传递一个函数。当第一个CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回值作为输入计算出的第二个CompletableFuture对象。使用这种方式,即使Future在向不同的商店收集报价,主线程还是能继续执行其他重要的操作,比如响应UI事件。
  • 三次map操作的返回的Stream元素收集到一个列表,就得到了一个List<CompletableFuture<String>>,等这些CompletableFuture对象最终执行完毕,就可以利用join取得它们的返回值。
  • thenCompose vs thenComposeAsync
    • 名称中不带Async的方法和它的前一个任务一样,在同一个线程中运行
    • 名称以Async结尾的方法会将后续的任务提交到一个线程池,所以每个任务是由不同的线程处理的。(本例中,前后两个CompletabelFuture存在数据的依赖关系,所以放在一个线程中执行就好)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public List<String> findPricesFuture(String product) {
List<CompletableFuture<String>> priceFutures = findPricesStream(product)
.collect(Collectors.<CompletableFuture<String>>toList());

return priceFutures.stream()
// 等待流中的所有Future执行完毕,并提取各自的返回值
.map(CompletableFuture::join)
.collect(Collectors.toList());
}

public Stream<CompletableFuture<String>> findPricesStream(String product) {

return shops.stream()
// 以异步方式取得每个shop中指定产品的原始价格, 返回值 Stream<CompletableFuture<String>>
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
// 不需要异步, 因为只是price+折扣字符串->Quota对象,返回 Stream<CompletableFuture<Quote>>
.map(future -> future.thenApply(Quote::parse)) // 取future中的string进行解析.
// 使用另一个异步任务构造期望的Future,申请折扣
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}

// 执行时间
[BestPrice price is 204.78, LetsSaveBig price is 190.85, MyFavoriteShop price is 128.92, BuyItAll price is 140.31, ShopEasy price is 166.1]
composed CompletableFuture done in 2009 msecs
  • 执行流程

java8-completableFuture-2.png-47.8kB

整合两个无依赖的 CompletableFuture 对象

需求:一家商店提供的价格是以欧元(EUR)计价的,但是希望以美元的方式提供给的客户。可以用异步的方式向商店查询指定商品的价格,同时从远程的汇率服务那里查到欧元和美元之间的汇率。当二者都结束时,再将这两个结果结合起来,用返回的商品价格乘以当时的汇率,得到以美元计价的商品价格。用这种方式,你需要使用第三个CompletableFuture 对象 , 当前两个CompletableFuture 计算出结果 ,并由BiFunction方法完成合并后,由它来最终结束这一任务

  • 上一个例子中,对一个CompletableFuture对象调用了thenCompose方法,并向其传递了第二个CompletableFuture ,而第二个CompletableFuture又需要使用第一个CompletableFuture的执行结果作为输入。
  • 另一种比较常见的情况是,需要将两个完全不相干的CompletableFuture对象的结果整合起来,而且没有必要等到第一个任务完全结束才开始第二项任务。这种情况下应该使用thenCombine方法,它接收名为BiFunction的第二个参数:定义了当两个CompletableFuture对象完成计算后,结果如何合并
  • thenCombine同样有一个异步版本thenCombineAsync,使用thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// 实现1
public List<String> findPricesInUSD(String product) {
List<CompletableFuture<Double>> priceFutures = new ArrayList<>();
for (Shop shop : shops) {
// Start of Listing 10.20.
// Only the type of futurePriceInUSD has been changed to
// CompletableFuture so that it is compatible with the
// CompletableFuture::join operation below.
CompletableFuture<Double> futurePriceInUSD =
// 创建第一个任务查询商店取得商品的价格
CompletableFuture.supplyAsync(() -> shop.getPrice(product))
// 没有使用异步版本的thenCombineAsync, 因为组合计算直接可以利用第一个获取完价格的线程来执行.
.thenCombine(
// 因为不依赖于上一个CompletableFuture的值,所以不需要一个额外的Lambda,参见上一个例子,
// 创建第二个独立任务,查询美元和欧元之间的转换汇率
CompletableFuture.supplyAsync(
() -> ExchangeService.getRate(ExchangeService.Money.EUR, ExchangeService.Money.USD)),
(price, rate) -> price * rate // 组合计算.
);
priceFutures.add(futurePriceInUSD);
}
// Drawback: The shop is not accessible anymore outside the loop,
// so the getName() call below has been commented out.
List<String> prices = priceFutures
.stream()
.map(CompletableFuture::join)
.map(price -> /*shop.getName() +*/ " price is " + price)
.collect(Collectors.toList());
return prices;
}

// 实现2
public List<String> findPricesInUSD2(String product) {
List<CompletableFuture<String>> priceFutures = new ArrayList<>();
for (Shop shop : shops) {
// Here, an extra operation has been added so that the shop name
// is retrieved within the loop. As a result, we now deal with
// CompletableFuture<String> instances.
CompletableFuture<String> futurePriceInUSD =
CompletableFuture.supplyAsync(() -> shop.getPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(
() -> ExchangeService.getRate(ExchangeService.Money.EUR, ExchangeService.Money.USD)),
(price, rate) -> price * rate
).thenApply(price -> shop.getName() + " price is " + price);
priceFutures.add(futurePriceInUSD);
}
List<String> prices = priceFutures
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
return prices;
}
// 实现3
public List<String> findPricesInUSD3(String product) {
// Here, the for loop has been replaced by a mapping function...
Stream<CompletableFuture<String>> priceFuturesStream = shops
.stream()
.map(shop -> CompletableFuture
.supplyAsync(() -> shop.getPrice(product))
.thenCombine(
CompletableFuture.supplyAsync(() -> ExchangeService.getRate(ExchangeService.Money.EUR, ExchangeService.Money.USD)),
(price, rate) -> price * rate)
.thenApply(price -> shop.getName() + " price is " + price));
// However, we should gather the CompletableFutures into a List so that the asynchronous
// operations are triggered before being "joined."
List<CompletableFuture<String>> priceFutures = priceFuturesStream.collect(Collectors.toList());
List<String> prices = priceFutures
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
return prices;
}
  • 执行流程

java8-completablefuture-async-2.png-74.3kB

简要回顾Future 和 CompletableFuture

  • CompletableFuture利用Lambda表达式以声明式的API提供了一种机制,能够用最有效的方式,非常容易地将多个以同步或异步方式执行复杂操作的任务结合到一起。
  • Java 7中提供的特性实现带汇率的价格计算,以下实现同上一个实现效果相同,但是明显CompletableFuture更加简洁.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public List<String> findPricesInUSDJava7(String product) {
// 创建一个ExecutorService将任务提交到线程池
ExecutorService executor = Executors.newCachedThreadPool();
List<Future<Double>> priceFutures = new ArrayList<>();
for (Shop shop : shops) {
// 创建一个查询欧元到美元转换汇率的Future
final Future<Double> futureRate = executor.submit(new Callable<Double>() {
public Double call() {
return ExchangeService.getRate(ExchangeService.Money.EUR, ExchangeService.Money.USD);
}
});

// 在第二个Future中查询指定商店中特定商品的价格
Future<Double> futurePriceInUSD = executor.submit(new Callable<Double>() {
public Double call() {
try {
double priceInEUR = shop.getPrice(product);
return priceInEUR * futureRate.get(); // 在同一个线程中进行计算.
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
});
priceFutures.add(futurePriceInUSD);
}
List<String> prices = new ArrayList<>();
for (Future<Double> priceFuture : priceFutures) {
try {
// 获取异步计算的结果.
prices.add(/*shop.getName() +*/ " price is " + priceFuture.get());
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
return prices;
}

响应 CompletableFuture 的 completion 事件

注:以上有示例代码都是通过在响应之前添加1秒钟的等待延迟(delay)模拟方法的远程调用。现在模拟一下随机时延.
需求:之前实现的findPrices方法只有在取得所有商店的返回值时才显示商品的价格。而现在希望的效果是,只要有商店返回商品价格就在第一时间显示返回值,不再等待那些还未返回的商店(有些甚至会发生超时)。

对最佳价格查询器应用的优化

  • 要完成以上的目标,要避免等待一个包含了所有价格的List创建完成。应该做的是直接处理CompletableFuture流,这样每个CompletableFuture都在为某个商店执行必要的操作。
  • 第4个map利用thenAccept操作在每个CompletableFuture(第三个map操作后stream中的值)上注册一个操作,该操作会在这些CompletableFuture完成执行后使用它的返回值(这里是一个String) 。
  • thenAccept vs thenAcceptAsync。异步版本的方法会对处理结果的消费者进行调度,从线程池中选择一个新的线程继续执行,不再由同一个线程完成CompletableFuture的所有任务。
  • thenAccept方法定义了如何处理CompletableFuture返回的结果,一旦CompletableFuture计算得到结果,它就返回一个CompletableFuture<Void>。所以,最后一个map操作返回的是一个Stream<CompletableFuture<Void>>。对这个<CompletableFuture<Void>>对象,你能做的事非常有限,只能等待其运行结束,不过这也是你所期望的。你还希望能给最慢的商店一些机会,让它有机会打印输出返回的价格。为了实现这一目的,你可以把构成Stream的所有CompletableFuture对象放到一个数组中,等待所有的任务执行完成
  • allOf工厂方法接收一个由CompletableFuture构成的数组,数组中的所有CompletableFuture对象执行完成之后,它返回一个CompletableFuture<Void>对象。如果你需要等待最初Stream中的所有CompletableFuture对象执行完毕,需要对allOf方法返回的CompletableFuture执行join操作。
  • 如果需要CompletableFuture对象数组中有任何一个执行完毕就不再等待,在这种情况下,你可以使用一个类似的工厂方法anyOf。该方法接收一个CompletableFuture对象构成的数组,返回由第一个执行完毕的CompletableFuture对象的返回值构成的CompletableFuture<Object>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public Stream<CompletableFuture<String>> findPricesStream(String product) {

return shops.stream()
// 以异步方式取得每个shop中指定产品的原始价格, 返回值 Stream<CompletableFuture<String>>
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
// 不需要异步, 因为只是price+折扣字符串->Quota对象,返回 Stream<CompletableFuture<Quote>>
.map(future -> future.thenApply(Quote::parse)) // 取future中的string进行解析.
// 使用另一个异步任务构造期望的Future,申请折扣
// Discount.apply的实现: return quote.getShopName() + " price is " + Discount.apply(quote.getPrice(), quote.getDiscountCode());
.map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)));
}

// 对返回的Stream<CompletableFuture<String>>进行新增一个map操作,在每个CompletableFuture上注册一个操作.
public void printPricesStream(String product) {
long start = System.nanoTime();
CompletableFuture[] futures = findPricesStream(product)
// 在每个CompletableFuture上注册一个操作,该操作会在CompletableFuture完成执行后使用它的返回值 。
// thenAccept方法接收CompletableFuture执行完毕后的返回值做参数。即接受的是一个字符串.
.map(f -> f.thenAccept(s -> System.out.println(s + " (done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
.toArray(size -> new CompletableFuture[size]);
CompletableFuture.allOf(futures).join();
System.out.println("All shops have now responded in " + ((System.nanoTime() - start) / 1_000_000) + " msecs");
}

// 执行结果示意
BestPrice price is 127.88 (done in 2024 msecs)
LetsSaveBig price is 147.21 (done in 2025 msecs)
MyFavoriteShop price is 119.11 (done in 2025 msecs)
ShopEasy price is 224.23 (done in 2026 msecs)
BuyItAll price is 111.53 (done in 2026 msecs)
All shops have now responded in 2026 msecs

小结

  • 执行比较耗时的操作时,尤其是那些依赖一个或多个远程服务的操作,使用异步任务可以改善程序的性能,加快程序的响应速度。
  • 使用CompletableFuture类提供的特性尽可能地为客户提供异步API。
  • CompletableFuture类提供了异常管理的机制,可以抛出/管理异步任务执行中发生的异常。
  • 将同步API的调用封装到一个CompletableFuture中,你能够以异步的方式使用其结果
  • 如果异步任务之间相互独立,或者它们之间某一些的结果是另一些的输入,你可以将这些异步任务构造或者合并成一个。
  • 你可以为CompletableFuture注册(accept)一个回调函数,在Future执行完毕或者它们计算的结果可用时,针对性地执行一些程序。
  • 你可以决定在什么时候结束程序的运行,是等待由CompletableFuture对象构成的列表中所有的对象都执行完毕,还是只要其中任何一个首先完成就中止程序的运行。