kotlin 的 Coroutine 是【结构化并发】,与结构化并发对应的方式是【fire-and-forget 】姑且称之为【离散性并发】吧,可能不太准确。一个例子解释下离散性并发,java 里我们开启一个线程之后,是不具备跟踪管理这个线程的能力的。如下
public void javaThreadFun() {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
//do some work
}
});
thread.setName("child-thread");
thread.start();
}
这个例子中,调用javaThreadFun()方法所在的线程,创建并启动child-thread线程之后两个线程没有明确的父子关系,javaThreadFun()方法所在的线程不能天然的感知在自己线程里启动的"子线程",子线程发生异常之后也不会影响到自己。如果父线程要取消中止在自己线程里启动的那些线程也没有现成的方式去供使用。总之,层级关系管理上很离散。
kotlin的协程天然的具备父协程管理取消子协程、子协程的异常失败影响父协程或者父协程感知子协程错误和失败的能力。如下示例
GlobalScope.launch {
val parentJob = launch {
val childJob = launch {
delay(1_000)//子任务做一些事情
throw NullPointerException() //会导致父协程任务和兄弟协程任务都会被取消
}
delay(5_000)
}
}
与传统的相比
这一章节我们展开聊下Kotlin协程的取消机制,上一节我们提到,父协程/作用域的取消也会取消其子协程我们看个例子。
GlobalScope.launch {
val mParentJOb: Job = this.launch {
val child1Job: Job = this.launch {
this.launch {
delay(300)
}.invokeOnCompletion { throwable ->
println("child1Job 执行完毕,收到了${throwable}")
}
val child2Job = this.launch {
delay(500)
}.invokeOnCompletion { throwable ->
println("child2Job 执行完毕,收到了${throwable}")
}
}
delay(100)
}
mParentJOb.invokeOnCompletion { throwable ->
println("mParentJOb 执行完毕,收到了${throwable}")
}
println("发起取消 mParentJOb")
mParentJOb.cancel()
}.join()
运行结果:
发起取消 mParentJOb
child1Job 执行完毕,收到了kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@100b06de
child2Job 执行完毕,收到了kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelling}@100b06de
mParentJOb 执行完毕,收到了kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutine{Cancelled}@100b06de
private suspend fun brotherCoroutine() {
coroutineScope {
launch {
delay(500)
println("is running")
}
launch {
delay(100)
cancel()
}.invokeOnCompletion {
println("job2 is canceled")
}
}
}
这似乎没有什么可解释的,某个协程的取消并不会影响到其兄弟协程。
协程的取消是协作式的体现在,对取消的通知需要主动的感知然后做出处理。举个例子
private suspend fun coroutineCanceling() {
coroutineScope {
val job = launch {
var i = 0
while (true) {//1
println(" is running ${i++}")
}
}
job.invokeOnCompletion {
println("job is completion ${it}")
}
delay(50)
job.cancel()
}
}
会发现上面这个段代码并不能被取消,原因就是协程并没有感知到自己已经被取消了。这一点跟java thead interrupt机制类似,需要我们感知取消。感知取消的方式有
协程取消后我们可能会做一些诸如回收资源的动作,但在一个已经处于取消状态的协程里再调用suspend方法就抛出CancellationException异常。此时我们要使用 withContext(NonCancellable) 做取消后的工作
private suspend fun handleCanceling() {
coroutineScope {
val job = launch {
try {
delay(100)//do Something
} finally {
withContext(NonCancellable) {
delay(100)
}
}
}
job.invokeOnCompletion {
println("job is completion ${it}")
}
delay(50)
job.cancel()
}
}
另外,还有特别注意的一点是,被取消的协程会向外抛出异常如果使用try-catch捕获但不抛出异常CancellationException,会影响到异常的传播,也就破坏了协程的异常传播机制,具体下一节异常传播机制展开。
看下面这段代码,思考一个问题,2处字符串会被打印出出来吗,为什么?
private suspend fun parentChildStructTest() {
coroutineScope {
val job1 = launch {
val job2 = launch(Job()) {//1
delay(500)
println("job2 is finish")//2
}
delay(100)
this.cancel()
}
}
}
会打印,不知道你有没有答对。
不是说好的,取消父协程的时候会取消掉其子协程吗?而且子协程里还调用了delay()方式,也会响应取消。问题的关键点在于,job1和job2的父子结构被破坏了。示例代码里1处传入了一个Job对象,此时job2的父层级已经变成了传入的job对象。我们稍加改造下,这里只是为了理解,不建议这么用,会发现job2可以被取消了。
private suspend fun parentChildStructTest() {
coroutineScope {
val job1 = launch {
val j = Job()
val job2 = launch(j) {
delay(500)
println("job2 is finish")
}.invokeOnCompletion {
println("job2 OnCompletion $it")
}
delay(100)
j.cancel() //1
}
}
}
新协程的context的组成有两个公式
parentContext = scopeContext + AddionalContext(launch方法传入的context)
childContext = parentConxtext + job(新建)
(图来自[Roman Elizarov])
当我们使用coroutineScope.launch(Job()){}传入了一个job实例的时候,其实子协程的job和传入的job实例建立了父子结构,破坏了原本的父子结构。
private suspend fun destroyCoroutineScope() {
coroutineScope {
launch {
launch {
delay(500)
throw NullPointerException()
}.invokeOnCompletion {
println("job-1 invokeOnCompletion $it")
}
launch {
delay(800)
}.invokeOnCompletion {
println("job-2 invokeOnCompletion $it")
}
}.invokeOnCompletion {
println("job-parent completion $it")
}
}
}
基本表现:使用supervisorScope启动的子协程发生异常时,不影响父协程和兄弟协程。
private suspend fun supervisorJobTest() {
supervisorScope {
launch {
delay(100)
throw NullPointerException()
}
launch {
delay(800)
println("job 2 is running")
}
}
}
如上代码,supervisor范围内第一个job抛出异常后,并不会影响第二个job;把错误异常控制在范围内。
但其他的结构化并发特性仍然存在
简单的讲,监督协程具备单向传播的特性,即子协程的异常和取消不影响父协程,父协程的异常和取消会影响子协程
两种方式:
注意:
监督协程中的每一个子作业应该通过异常处理机制处理自身的异常。如果不处理异常会被吞掉。
用于捕获协程执行过程中未捕获的异常,被用来定义一个全局的异常处理器。
举个例子
suspend fun coroutineExceptionHandlerTest() {
supervisorScope {
val handler = CoroutineExceptionHandler { _, _ -> println("handleException in coroutineExceptionHandler") }
launch(handler) {
delay(100)
throw NullPointerException()
}
}
}
主从作用域和协作作用域的表现区别上文已经讲到了,通常我们构建一个协程作用域两种方式
val scope = CoroutineScope(Job())
val supervisorJob = CoroutineScope(SupervisorJob())
supervisorScope { scope -> xx }
coroutineScope { scope ->xx }
private class SupervisorCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
override fun childCancelled(cause: Throwable): Boolean = false
}
private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
override fun childCancelled(cause: Throwable): Boolean = false
}
两种作用域在代码上的区别是 fun childCancelled(cause: Throwable) 方法的实现不同,监督作用域直接返回fasle表示不处理子协程的错误异常。让其自己处理
//JobSupport
private fun cancelParent(cause: Throwable): Boolean {
...
return parent.childCancelled(cause) || isCancellation //1
}
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
...
val handled = cancelParent(finalException) || handleJobException(finalException)//2
if (handled) (finalState as CompletedExceptionally).makeHandled()
...
}
源代码中的核心逻辑,
代码很多细节不展开有兴趣的自行研究。
private suspend fun childChildSupervisorJob() {
supervisorScope { // SupervisorCouroutine
launch { // ScopeCoroutine
val job1 = launch {
delay(100)
throw NullPointerException()
}
val job2 = launch {
delay(800)
println("job 2 is running")
}.invokeOnCompletion {
println("job2 is completion $it")
}
}
}
}
private suspend fun textSupervisorJob() {
supervisorScope {
launch(SupervisorJob()) {//1
launch {
delay(100)
throw NullPointerException()
}
launch {
delay(800)
println("job 2 is running")
}.invokeOnCompletion {
println("job2 is completion $it")
}
}
}
}
通常构建一个协程除了使用CoroutineScope.launch{}还会使用CoroutineScope.async{}。
经常看到这种说法,async方式启动的协程返回一个Deferred对象,当调用deffered的await()方法的时候才会抛出异常
private suspend fun asyncSample() {
val h = CoroutineScope(CoroutineExceptionHandler { _, _ -> println("发生了异常") })
val d = h.launch {
async {
delay(100)
throw NullPointerException()
}
launch { //job2
delay(500)
println("job 2 is finish")
}
}.join()
}
这个例子没有调用await(),实际发现也会立马抛出异常,导致jo2都没执行完。跟我们认为的不一样。
实际情况是这样的:当async被用作构建根协程(由协程作用域直接管理的协程)或者监督作用域直接管理协程时,异常不会主动抛出,而是在调用.await()时抛出。其他情况不等待await就会抛出异常。
本文梳理了Kotlin的协程的取消和异常传播处理机制。机制的设置总的来说是服务于结构化并发的。本文应该能让我们了解掌握以下问题才算合格