Future
# Callable&Future&FutureTask介绍
直接继承Thread或者实现Runnable接口都可以创建线程,但是这两种方法都有一个问题就是:没有返回值,也就是不能获取执行完的结果。因此java1.5就提供了Callable接口来实现这一场景,而Future和FutureTask就可以和Callable接口配合起来使用。
# Callable和Runnable的区别
Runnable 的缺陷:
- 不能返回一个返回值
- 不能抛出 checked Exception
Callable的call方法可以有返回值,可以声明抛出异常。和 Callable 配合的有一个 Future 类,通过 Future 可以了解任务执行情况,或者取消任务的执行,还可获取任务执行的结果,这些功能都是 Runnable 做不到的,Callable 的功能要比 Runnable 强大。
# Future
# 主要功能
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
# 获得异常
Future.get 里面3个异常中有一个ExecutionException就是业务异常
# Future 注意事项
- 当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制
- Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来
使用Callable 和Future 是不会产生新的线程了吗,底层就是Thread.start()
# 利用 FutureTask 创建 Future
Future实际采用FutureTask实现,该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。
FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。
FutureTask的用法及两种常用的使用场景 (opens new window)
# CompletionService
# 大致原理
内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果
# 应用场景:
比如: Dubbo 中有一种叫做 Forking 的集群模式,这种集群模式下,支持并行地调用多个服务实例,只要有一个成功就返回结果。
- 当需要批量提交异步任务的时候建议你使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一起,能够让批量异步任务的管理更简单。
- CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如Forking Cluster这样的需求。
- 线程池隔离。CompletionService支持自己创建线程池,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。
# CompletableFuture
# 简介
类似于Javascript 的Promise 的功能,用来弥补Future的不足,CompletableFuture是Future接口的扩展和增强。
简单的任务,用Future获取结果还好,但我们并行提交的多个异步任务,往往并不是独立的,**很多时候业务逻辑处理存在串行[依赖]、并行、聚合的关系。**如果要我们手动用 Fueture 实现,是非常麻烦的。
CompletableFuture是Future接口的扩展和增强。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。
# Future的主要缺点如下:
(1)不支持手动完成
这个意思指的是,我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果,通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成。
就是需求,一个Future完成后,要执行其他的任务。
用get就阻塞了主线程??? low吧 那么定义2个Future,再一个执行完后再执行??? 麻烦吧,就想烧水泡茶的代码一样?? 也麻烦吧。 那么能不能做一个labm来做回调呢??? 这就是CompletableFuture
(2)不支持进一步的非阻塞调用
这个指的是我们通过Future的get方法会一直阻塞到任务完成,但是我还想在获取任务之后,执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能。
(3)不支持链式调用
这个指的是对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。
(4)不支持多个Future合并
比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。
(5)不支持异常处理
Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。
Future的get() 也是可以处理的。
# CompletionStage
执行某一个阶段,可向下执行后续阶段。异步执行,默认线程池是ForkJoinPool.commonPool()
# 应用场景
描述依赖关系:
- thenApply() 把前面异步任务的结果,交给后面的Function
- thenCompose()用来连接两个有依赖关系的任务,结果由第二个任务返回
描述and聚合关系:
- thenCombine:任务合并,有返回值
- thenAccepetBoth:两个任务执行完成后,将结果交给thenAccepetBoth消耗,无返回值。
- runAfterBoth:两个任务都执行完成后,执行下一步操作(Runnable)。
描述or聚合关系:
- applyToEither:两个任务谁执行的快,就使用那一个结果,有返回值。
- acceptEither: 两个任务谁执行的快,就消耗那一个结果,无返回值。
- runAfterEither: 任意一个任务执行完成,进行下一步操作(Runnable)。
并行执行:
CompletableFuture类自己也提供了anyOf()和allOf()用于支持多个CompletableFuture并行执行
# 参考资料
Java并发编程Future超详细教程 (opens new window)
理解Java8里面CompletableFuture异步编程 (opens new window)
Callable抛出异常与future.get (opens new window)
future.get 获得异常.