【Kotlin】协程的取消中断与异常处理

本文介绍了Kotlin协程的取消机制和异常处理方案
文章源码和介绍来自Kotlin官方网站
协程的取消
在一个长时间运行的应用程序中,你也许需要对你的后台协程进行细粒度的控制。 比如说,一个用户也许关闭了一个启动了协程的界面,那么现在协程的执行结果已经不再被需要了,这时,它应该是可以被取消的。
该 launch 函数返回了一个可以被用来取消运行中的协程的 Job:
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val job = launch {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancel() // 取消该作业
job.join() // 等待作业执行结束
println("main: Now I can quit.")
//sampleEnd
}
程序执行后的输出如下:
job: I’m sleeping 0 … job: I’m sleeping 1 … job: I’m sleeping 2 … main: I’m tired of waiting! main: Now I can quit.
一旦 main 函数调用了 job.cancel,我们在其它的协程中就看不到任何输出,因为它被取消了。 这里也有一个可以使 Job 挂起的函数 cancelAndJoin 它合并了对 cancel 以及 join 的调用。
取消是协作的
协程的取消是 协作 的。一段协程代码必须协作才能被取消。 所有 kotlinx.coroutines 中的挂起函数都是 可被取消的 。它们检查协程的取消, 并在取消时抛出 CancellationException。 然而,如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的,就如如下示例代码所示:
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 5) { // 一个执行计算的循环,只是为了占用 CPU
// 每秒打印消息两次
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消一个作业并且等待它结束
println("main: Now I can quit.")
//sampleEnd
}
运行示例代码,并且我们可以看到它连续打印出了“I’m sleeping”,甚至在调用取消后, 作业仍然执行了五次循环迭代并运行到了它结束为止。
The same problem can be observed by catching a CancellationException and not rethrowing it:
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val job = launch(Dispatchers.Default) {
repeat(5) { i ->
try {
// print a message twice a second
println("job: I'm sleeping $i ...")
delay(500)
} catch (e: Exception) {
// log the exception
println(e)
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
//sampleEnd
}
While catching Exception is an anti-pattern, this issue may surface in more subtle ways, like when using the runCatching function, which does not rethrow CancellationException.
使计算代码可取消
我们有两种方法来使执行计算的代码可以被取消。第一种方法是定期调用挂起函数来检查取消。对于这种目的 yield 是一个好的选择。 另一种方法是显式的检查取消状态。让我们试试第二种方法。
将前一个示例中的 while (i < 5) 替换为 while (isActive) 并重新运行它。
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) { // 可以被取消的计算循环
// 每秒打印消息两次
if (System.currentTimeMillis() >= nextPrintTime) {
println("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // 等待一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该作业并等待它结束
println("main: Now I can quit.")
//sampleEnd
}
你可以看到,现在循环被取消了。isActive 是一个可以被使用在 CoroutineScope 中的扩展属性。
在 finally 中释放资源
我们通常使用如下的方法处理在被取消时抛出 CancellationException 的可被取消的挂起函数。比如说,try {……} finally {……} 表达式以及 Kotlin 的 use 函数一般在协程被取消的时候执行它们的终结动作:
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
println("job: I'm running finally")
}
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该作业并且等待它结束
println("main: Now I can quit.")
//sampleEnd
}
join 和 cancelAndJoin 等待了所有的终结动作执行完毕, 所以运行示例得到了下面的输出:
job: I’m sleeping 0 … job: I’m sleeping 1 … job: I’m sleeping 2 … main: I’m tired of waiting! job: I’m running finally main: Now I can quit.
运行不能取消的代码块
在前一个例子中任何尝试在 finally 块中调用挂起函数的行为都会抛出 CancellationException,因为这里持续运行的代码是可以被取消的。通常,这并不是一个问题,所有良好的关闭操作(关闭一个文件、取消一个作业、或是关闭任何一种通信通道)通常都是非阻塞的,并且不会调用任何挂起函数。然而,在真实的案例中,当你需要挂起一个被取消的协程,你可以将相应的代码包装在 withContext(NonCancellable) {……} 中,并使用 withContext 函数以及 NonCancellable 上下文,见如下示例所示:
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val job = launch {
try {
repeat(1000) { i ->
println("job: I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("job: I'm running finally")
delay(1000L)
println("job: And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // 延迟一段时间
println("main: I'm tired of waiting!")
job.cancelAndJoin() // 取消该作业并等待它结束
println("main: Now I can quit.")
//sampleEnd
}
超时
在实践中绝大多数取消一个协程的理由是它有可能超时。 当你手动追踪一个相关 Job 的引用并启动了一个单独的协程在延迟后取消追踪,这里已经准备好使用 withTimeout 函数来做这件事。 来看看示例代码:
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
withTimeout(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
//sampleEnd
}
运行后得到如下输出:
I’m sleeping 0 … I’m sleeping 1 … I’m sleeping 2 … Exception in thread “main” kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
withTimeout 抛出了 TimeoutCancellationException,它是 CancellationException 的子类。 我们之前没有在控制台上看到堆栈跟踪信息的打印。这是因为在被取消的协程中 CancellationException 被认为是协程执行结束的正常原因。 然而,在这个示例中我们在 main 函数中正确地使用了 withTimeout。
由于取消只是一个例外,所有的资源都使用常用的方法来关闭。 如果你需要做一些各类使用超时的特别的额外操作,可以使用类似 withTimeout 的 withTimeoutOrNull 函数,并把这些会超时的代码包装在 try {…} catch (e: TimeoutCancellationException) {…} 代码块中,而 withTimeoutOrNull 通过返回 null 来进行超时操作,从而替代抛出一个异常:
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
"Done" // 在它运行得到结果之前取消它
}
println("Result is $result")
//sampleEnd
}
运行这段代码时不再抛出异常:
I’m sleeping 0 … I’m sleeping 1 … I’m sleeping 2 … Result is null
异步超时和资源
withTimeout中的超时事件对于在其代码块中运行的代码来说是异步的,可以在任何时间发生,甚至可以在从超时代码块内部返回之前发生。如果您在代码块内打开或获取了某些资源,而这些资源需要在代码块外关闭或释放,请记住这一点。
例如,在这里我们用 Resource 类模仿了一个可关闭的资源,该类只需通过在其关闭函数中递增获取计数器和递减计数器来记录创建次数。现在,让我们创建大量的例行程序,每个例行程序都在 withTimeout 代码块末尾创建一个资源,并在代码块外释放该资源。我们添加了一个小延迟,这样就更有可能在 withTimeout 代码块已经完成时发生超时,从而导致资源泄漏。
import kotlinx.coroutines.*
//sampleStart
var acquired = 0
class Resource {
init { acquired++ } // Acquire the resource
fun close() { acquired-- } // Release the resource
}
fun main() {
runBlocking {
repeat(10_000) { // Launch 10K coroutines
launch {
val resource = withTimeout(60) { // Timeout of 60 ms
delay(50) // Delay for 50 ms
Resource() // Acquire a resource and return it from withTimeout block
}
resource.close() // Release the resource
}
}
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired
}
//sampleEnd
如果运行上述代码,你会发现它并不总是打印零值,不过这可能取决于你的机器的定时。你可能需要调整本示例中的超时时间,才能真正看到非零值。
需要注意的是,在这里通过 10K 例程对获取的计数器进行递增和递减是完全线程安全的,因为它总是在同一个线程(即 runBlocking 所使用的线程)中进行。关于这一点的更多解释,将在 “例程上下文 ”一章中进行。
要解决这个问题,可以在变量中存储对资源的引用,而不是从 withTimeout 代码块中返回。
import kotlinx.coroutines.*
var acquired = 0
class Resource {
init { acquired++ } // Acquire the resource
fun close() { acquired-- } // Release the resource
}
fun main() {
//sampleStart
runBlocking {
repeat(10_000) { // Launch 10K coroutines
launch {
var resource: Resource? = null // Not acquired yet
try {
withTimeout(60) { // Timeout of 60 ms
delay(50) // Delay for 50 ms
resource = Resource() // Store a resource to the variable if acquired
}
// We can do something else with the resource here
} finally {
resource?.close() // Release the resource if it was acquired
}
}
}
}
// Outside of runBlocking all coroutines have completed
println(acquired) // Print the number of resources still acquired
//sampleEnd
}
本例始终打印 0。资源不会泄漏。
协程的异常处理
本节内容涵盖了异常处理与在异常上取消。 我们已经知道被取消的协程会在挂起点抛出 CancellationException 并且它会被协程的机制所忽略。在这里我们会看看在取消过程中抛出异常或同一个协程的多个子协程抛出异常时会发生什么。
异常的传播
协程构建器有两种形式:自动传播异常(launch)或向用户暴露异常(async 与 produce)。 当这些构建器用于创建一个根协程时,即该协程不是另一个协程的子协程, 前者这类构建器将异常视为未捕获异常,类似 Java 的 Thread.uncaughtExceptionHandler, 而后者则依赖用户来最终消费异常,例如通过 await 或 receive(produce 与 receive 的相关内容包含于通道章节)。
可以通过一个使用 GlobalScope 创建根协程的简单示例来进行演示:
GlobalScope 是一种微妙的应用程序接口,可能会产生非同小可的反作用。为整个应用程序创建根例行程序是 GlobalScope 罕见的合法用途之一,因此您必须通过 @OptIn(DelicateCoroutinesApi::class)明确选择使用 GlobalScope。
import kotlinx.coroutines.*
//sampleStart
@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val job = GlobalScope.launch { // launch 根协程
println("Throwing exception from launch")
throw IndexOutOfBoundsException() // 我们将在控制台打印 Thread.defaultUncaughtExceptionHandler
}
job.join()
println("Joined failed job")
val deferred = GlobalScope.async { // async 根协程
println("Throwing exception from async")
throw ArithmeticException() // 没有打印任何东西,依赖用户去调用等待
}
try {
deferred.await()
println("Unreached")
} catch (e: ArithmeticException) {
println("Caught ArithmeticException")
}
}
//sampleEnd
这段代码的输出如下(调试):
Throwing exception from launch Exception in thread “DefaultDispatcher-worker-1 @coroutine#2” java.lang.IndexOutOfBoundsException Joined failed job Throwing exception from async Caught ArithmeticException
CoroutineExceptionHandler将未捕获异常打印到控制台的默认行为是可自定义的。
根协程中的 CoroutineExceptionHandler 上下文元素可以被用于这个根协程通用的 catch 块,及其所有可能自定义了异常处理的子协程。
它类似于 Thread.uncaughtExceptionHandler 。
你无法从 CoroutineExceptionHandler 的异常中恢复。当调用处理者的时候,协程已经完成并带有相应的异常。通常,该处理者用于记录异常,显示某种错误消息,终止和(或)重新启动应用程序。
CoroutineExceptionHandler 仅在未捕获的异常上调用 — 没有以其他任何方式处理的异常。 特别是,所有子协程(在另一个 Job 上下文中创建的协程)委托它们的父协程处理它们的异常,然后它们也委托给其父协程,以此类推直到根协程, 因此永远不会使用在其上下文中设置的 CoroutineExceptionHandler。 除此之外,async 构建器始终会捕获所有异常并将其表示在结果 Deferred 对象中, 因此它的 CoroutineExceptionHandler 也无效。
在监督作用域内运行的协程不会将异常传播到其父协程,并且会从此规则中排除。本文档的另一个小节——监督提供了更多细节。
import kotlinx.coroutines.*
@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
//sampleStart
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
val job = GlobalScope.launch(handler) { // 根协程,运行在 GlobalScope 中
throw AssertionError()
}
val deferred = GlobalScope.async(handler) { // 同样是根协程,但使用 async 代替了 launch
throw ArithmeticException() // 没有打印任何东西,依赖用户去调用 deferred.await()
}
joinAll(job, deferred)
//sampleEnd
}
这段代码的输出如下:
CoroutineExceptionHandler got java.lang.AssertionError
取消与异常
取消与异常紧密相关。协程内部使用 CancellationException 来进行取消,这个异常会被所有的处理者忽略,所以那些可以被 catch 代码块捕获的异常仅仅应该被用来作为额外调试信息的资源。 当一个协程使用 Job.cancel 取消的时候,它会被终止,但是它不会取消它的父协程。
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val job = launch {
val child = launch {
try {
delay(Long.MAX_VALUE)
} finally {
println("Child is cancelled")
}
}
yield()
println("Cancelling child")
child.cancel()
child.join()
yield()
println("Parent is not cancelled")
}
job.join()
//sampleEnd
}
这段代码的输出如下:
Cancelling child Child is cancelled Parent is not cancelled
如果一个协程遇到了 CancellationException 以外的异常,它将使用该异常取消它的父协程。 这个行为无法被覆盖,并且用于为结构化的并发(structured concurrency) 提供稳定的协程层级结构。 CoroutineExceptionHandler 的实现并不是用于子协程。
在这些示例中,CoroutineExceptionHandler 总是被设置在由 GlobalScope 启动的协程中。将异常处理者设置在 runBlocking 主作用域内启动的协程中是没有意义的,尽管子协程已经设置了异常处理者, 但是主协程也总是会被取消的。
当父协程的所有子协程都结束后,原始的异常才会被父协程处理, 见下面这个例子。
import kotlinx.coroutines.*
@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
//sampleStart
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
val job = GlobalScope.launch(handler) {
launch { // 第一个子协程
try {
delay(Long.MAX_VALUE)
} finally {
withContext(NonCancellable) {
println("Children are cancelled, but exception is not handled until all children terminate")
delay(100)
println("The first child finished its non cancellable block")
}
}
}
launch { // 第二个子协程
delay(10)
println("Second child throws an exception")
throw ArithmeticException()
}
}
job.join()
//sampleEnd
}
这段代码的输出如下:
Second child throws an exception Children are cancelled, but exception is not handled until all children terminate The first child finished its non cancellable block CoroutineExceptionHandler got java.lang.ArithmeticException
异常聚合
当协程的多个子协程因异常而失败时, 一般规则是“取第一个异常”,因此将处理第一个异常。 在第一个异常之后发生的所有其他异常都作为被抑制的异常绑定至第一个异常。
import kotlinx.coroutines.*
import java.io.*
@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
}
val job = GlobalScope.launch(handler) {
launch {
try {
delay(Long.MAX_VALUE) // 当另一个同级的协程因 IOException 失败时,它将被取消
} finally {
throw ArithmeticException() // 第二个异常
}
}
launch {
delay(100)
throw IOException() // 首个异常
}
delay(Long.MAX_VALUE)
}
job.join()
}
这段代码的输出如下:
CoroutineExceptionHandler got java.io.IOException with suppressed [java.lang.ArithmeticException]
注意,这个机制当前只能在 Java 1.7 以上的版本中使用。 在 JS 和原生环境下暂时会受到限制,但将来会取消。
取消异常是透明的,默认情况下是未包装的:
import kotlinx.coroutines.*
import java.io.*
@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
//sampleStart
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
val job = GlobalScope.launch(handler) {
val innerJob = launch { // 该栈内的协程都将被取消
launch {
launch {
throw IOException() // 原始异常
}
}
}
try {
innerJob.join()
} catch (e: CancellationException) {
println("Rethrowing CancellationException with original cause")
throw e // 取消异常被重新抛出,但原始 IOException 得到了处理
}
}
job.join()
//sampleEnd
}
这段代码的输出如下:
Rethrowing CancellationException with original cause CoroutineExceptionHandler got java.io.IOException
监督
正如我们之前研究的那样,取消是在协程的整个层次结构中传播的双向关系。让我们看一下需要单向取消的情况。
此类需求的一个良好示例是在其作用域内定义作业的 UI 组件。如果任何一个 UI 的子作业执行失败了,它并不总是有必要取消(有效地杀死)整个 UI 组件, 但是如果 UI 组件被销毁了(并且它的作业也被取消了),由于其结果不再需要了,因此有必要取消所有子作业。
另一个例子是服务进程孵化了一些子作业并且需要 监督 它们的执行,追踪它们的故障并在这些子作业执行失败的时候重启。
SupervisorJob
SupervisorJob 可以用于这些目的。 它类似于常规的 Job,唯一的不同是:SupervisorJob 的取消只会向下传播。这是很容易用以下示例演示:
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
// 启动第一个子作业——这个示例将会忽略它的异常(不要在实践中这么做!)
val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
println("The first child is failing")
throw AssertionError("The first child is cancelled")
}
// 启动第二个子作业
val secondChild = launch {
firstChild.join()
// 取消了第一个子作业且没有传播给第二个子作业
println("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
try {
delay(Long.MAX_VALUE)
} finally {
// 但是取消了监督的传播
println("The second child is cancelled because the supervisor was cancelled")
}
}
// 等待直到第一个子作业失败且执行完成
firstChild.join()
println("Cancelling the supervisor")
supervisor.cancel()
secondChild.join()
}
//sampleEnd
}
这段代码的输出如下:
The first child is failing The first child is cancelled: true, but the second one is still active Cancelling the supervisor The second child is cancelled because the supervisor was cancelled
监督作用域
对于作用域的并发,可以用 supervisorScope 来替代 coroutineScope 来实现相同的目的。它只会单向的传播并且当作业自身执行失败的时候将所有子作业全部取消。作业自身也会在所有的子作业执行结束前等待, 就像 coroutineScope 所做的那样。
import kotlin.coroutines.*
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
try {
supervisorScope {
val child = launch {
try {
println("The child is sleeping")
delay(Long.MAX_VALUE)
} finally {
println("The child is cancelled")
}
}
// 使用 yield 来给我们的子作业一个机会来执行打印
yield()
println("Throwing an exception from the scope")
throw AssertionError()
}
} catch(e: AssertionError) {
println("Caught an assertion error")
}
//sampleEnd
}
这段代码的输出如下:
The child is sleeping Throwing an exception from the scope The child is cancelled Caught an assertion error
监督协程中的异常
常规的作业和监督作业之间的另一个重要区别是异常处理。 监督协程中的每一个子作业应该通过异常处理机制处理自身的异常。 这种差异来自于子作业的执行失败不会传播给它的父作业的事实。 这意味着在 supervisorScope 内部直接启动的协程确实使用了设置在它们作用域内的 CoroutineExceptionHandler,与父协程的方式相同 (参见 CoroutineExceptionHandler 小节以获知更多细节)。
import kotlin.coroutines.*
import kotlinx.coroutines.*
fun main() = runBlocking {
//sampleStart
val handler = CoroutineExceptionHandler { _, exception ->
println("CoroutineExceptionHandler got $exception")
}
supervisorScope {
val child = launch(handler) {
println("The child throws an exception")
throw AssertionError()
}
println("The scope is completing")
}
println("The scope is completed")
//sampleEnd
}
这段代码的输出如下:
The scope is completing The child throws an exception CoroutineExceptionHandler got java.lang.AssertionError The scope is completed