【Kotlin】协程的取消中断与异常处理

【Kotlin】协程的取消中断与异常处理

本文介绍了Kotlin协程的取消机制和异常处理方案

文章源码和介绍来自Kotlin官方网站

协程的取消

在一个长时间运行的应用程序中,你也许需要对你的后台协程进行细粒度的控制。 比如说,一个用户也许关闭了一个启动了协程的界面,那么现在协程的执行结果已经不再被需要了,这时,它应该是可以被取消的。

launch 函数返回了一个可以被用来取消运行中的协程的 Job:

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val job = launch {
        repeat(1000) { i ->
            println("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // 延迟一段时间
    println("main: I'm tired of waiting!")
    job.cancel() // 取消该作业
    job.join() // 等待作业执行结束
    println("main: Now I can quit.")
//sampleEnd
}

程序执行后的输出如下:

job: I’m sleeping 0 … job: I’m sleeping 1 … job: I’m sleeping 2 … main: I’m tired of waiting! main: Now I can quit.

一旦 main 函数调用了 job.cancel,我们在其它的协程中就看不到任何输出,因为它被取消了。 这里也有一个可以使 Job 挂起的函数 cancelAndJoin 它合并了对 cancel 以及 join 的调用。

取消是协作的

协程的取消是 协作 的。一段协程代码必须协作才能被取消。 所有 kotlinx.coroutines 中的挂起函数都是 可被取消的 。它们检查协程的取消, 并在取消时抛出 CancellationException。 然而,如果协程正在执行计算任务,并且没有检查取消的话,那么它是不能被取消的,就如如下示例代码所示:

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) { // 一个执行计算的循环,只是为了占用 CPU
            // 每秒打印消息两次
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // 等待一段时间
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消一个作业并且等待它结束
    println("main: Now I can quit.")
//sampleEnd
}

运行示例代码,并且我们可以看到它连续打印出了“I’m sleeping”,甚至在调用取消后, 作业仍然执行了五次循环迭代并运行到了它结束为止。

The same problem can be observed by catching a CancellationException and not rethrowing it:

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val job = launch(Dispatchers.Default) {
        repeat(5) { i ->
            try {
                // print a message twice a second
                println("job: I'm sleeping $i ...")
                delay(500)
            } catch (e: Exception) {
                // log the exception
                println(e)
            }
        }
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    println("main: Now I can quit.")
//sampleEnd    
}

While catching Exception is an anti-pattern, this issue may surface in more subtle ways, like when using the runCatching function, which does not rethrow CancellationException.

使计算代码可取消

我们有两种方法来使执行计算的代码可以被取消。第一种方法是定期调用挂起函数来检查取消。对于这种目的 yield 是一个好的选择。 另一种方法是显式的检查取消状态。让我们试试第二种方法。

将前一个示例中的 while (i < 5) 替换为 while (isActive) 并重新运行它。

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // 可以被取消的计算循环
            // 每秒打印消息两次
            if (System.currentTimeMillis() >= nextPrintTime) {
                println("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // 等待一段时间
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消该作业并等待它结束
    println("main: Now I can quit.")
//sampleEnd
}

你可以看到,现在循环被取消了。isActive 是一个可以被使用在 CoroutineScope 中的扩展属性。

在 finally 中释放资源

我们通常使用如下的方法处理在被取消时抛出 CancellationException 的可被取消的挂起函数。比如说,try {……} finally {……} 表达式以及 Kotlin 的 use 函数一般在协程被取消的时候执行它们的终结动作:

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            println("job: I'm running finally")
        }
    }
    delay(1300L) // 延迟一段时间
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消该作业并且等待它结束
    println("main: Now I can quit.")
//sampleEnd
}

join 和 cancelAndJoin 等待了所有的终结动作执行完毕, 所以运行示例得到了下面的输出:

job: I’m sleeping 0 … job: I’m sleeping 1 … job: I’m sleeping 2 … main: I’m tired of waiting! job: I’m running finally main: Now I can quit.

运行不能取消的代码块

在前一个例子中任何尝试在 finally 块中调用挂起函数的行为都会抛出 CancellationException,因为这里持续运行的代码是可以被取消的。通常,这并不是一个问题,所有良好的关闭操作(关闭一个文件、取消一个作业、或是关闭任何一种通信通道)通常都是非阻塞的,并且不会调用任何挂起函数。然而,在真实的案例中,当你需要挂起一个被取消的协程,你可以将相应的代码包装在 withContext(NonCancellable) {……} 中,并使用 withContext 函数以及 NonCancellable 上下文,见如下示例所示:

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val job = launch {
        try {
            repeat(1000) { i ->
                println("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                println("job: I'm running finally")
                delay(1000L)
                println("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // 延迟一段时间
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() // 取消该作业并等待它结束
    println("main: Now I can quit.")
//sampleEnd
}

超时

在实践中绝大多数取消一个协程的理由是它有可能超时。 当你手动追踪一个相关 Job 的引用并启动了一个单独的协程在延迟后取消追踪,这里已经准备好使用 withTimeout 函数来做这件事。 来看看示例代码:

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    withTimeout(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
    }
//sampleEnd
}

运行后得到如下输出:

I’m sleeping 0 … I’m sleeping 1 … I’m sleeping 2 … Exception in thread “main” kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms

withTimeout 抛出了 TimeoutCancellationException,它是 CancellationException 的子类。 我们之前没有在控制台上看到堆栈跟踪信息的打印。这是因为在被取消的协程中 CancellationException 被认为是协程执行结束的正常原因。 然而,在这个示例中我们在 main 函数中正确地使用了 withTimeout。

由于取消只是一个例外,所有的资源都使用常用的方法来关闭。 如果你需要做一些各类使用超时的特别的额外操作,可以使用类似 withTimeout 的 withTimeoutOrNull 函数,并把这些会超时的代码包装在 try {…} catch (e: TimeoutCancellationException) {…} 代码块中,而 withTimeoutOrNull 通过返回 null 来进行超时操作,从而替代抛出一个异常:

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            println("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // 在它运行得到结果之前取消它
    }
    println("Result is $result")
//sampleEnd
}

运行这段代码时不再抛出异常:

I’m sleeping 0 … I’m sleeping 1 … I’m sleeping 2 … Result is null

异步超时和资源

withTimeout中的超时事件对于在其代码块中运行的代码来说是异步的,可以在任何时间发生,甚至可以在从超时代码块内部返回之前发生。如果您在代码块内打开或获取了某些资源,而这些资源需要在代码块外关闭或释放,请记住这一点。

例如,在这里我们用 Resource 类模仿了一个可关闭的资源,该类只需通过在其关闭函数中递增获取计数器和递减计数器来记录创建次数。现在,让我们创建大量的例行程序,每个例行程序都在 withTimeout 代码块末尾创建一个资源,并在代码块外释放该资源。我们添加了一个小延迟,这样就更有可能在 withTimeout 代码块已经完成时发生超时,从而导致资源泄漏。

import kotlinx.coroutines.*

//sampleStart
var acquired = 0

class Resource {
    init { acquired++ } // Acquire the resource
    fun close() { acquired-- } // Release the resource
}

fun main() {
    runBlocking {
        repeat(10_000) { // Launch 10K coroutines
            launch { 
                val resource = withTimeout(60) { // Timeout of 60 ms
                    delay(50) // Delay for 50 ms
                    Resource() // Acquire a resource and return it from withTimeout block     
                }
                resource.close() // Release the resource
            }
        }
    }
    // Outside of runBlocking all coroutines have completed
    println(acquired) // Print the number of resources still acquired
}
//sampleEnd

如果运行上述代码,你会发现它并不总是打印零值,不过这可能取决于你的机器的定时。你可能需要调整本示例中的超时时间,才能真正看到非零值。

需要注意的是,在这里通过 10K 例程对获取的计数器进行递增和递减是完全线程安全的,因为它总是在同一个线程(即 runBlocking 所使用的线程)中进行。关于这一点的更多解释,将在 “例程上下文 ”一章中进行。

要解决这个问题,可以在变量中存储对资源的引用,而不是从 withTimeout 代码块中返回。

import kotlinx.coroutines.*

var acquired = 0

class Resource {
    init { acquired++ } // Acquire the resource
    fun close() { acquired-- } // Release the resource
}

fun main() {
//sampleStart
    runBlocking {
        repeat(10_000) { // Launch 10K coroutines
            launch { 
                var resource: Resource? = null // Not acquired yet
                try {
                    withTimeout(60) { // Timeout of 60 ms
                        delay(50) // Delay for 50 ms
                        resource = Resource() // Store a resource to the variable if acquired      
                    }
                    // We can do something else with the resource here
                } finally {  
                    resource?.close() // Release the resource if it was acquired
                }
            }
        }
    }
    // Outside of runBlocking all coroutines have completed
    println(acquired) // Print the number of resources still acquired
//sampleEnd
}

本例始终打印 0。资源不会泄漏。

协程的异常处理

本节内容涵盖了异常处理与在异常上取消。 我们已经知道被取消的协程会在挂起点抛出 CancellationException 并且它会被协程的机制所忽略。在这里我们会看看在取消过程中抛出异常或同一个协程的多个子协程抛出异常时会发生什么。

异常的传播

协程构建器有两种形式:自动传播异常(launch)或向用户暴露异常(async 与 produce)。 当这些构建器用于创建一个根协程时,即该协程不是另一个协程的子协程, 前者这类构建器将异常视为未捕获异常,类似 Java 的 Thread.uncaughtExceptionHandler, 而后者则依赖用户来最终消费异常,例如通过 await 或 receive(produce 与 receive 的相关内容包含于通道章节)。

可以通过一个使用 GlobalScope 创建根协程的简单示例来进行演示:

GlobalScope 是一种微妙的应用程序接口,可能会产生非同小可的反作用。为整个应用程序创建根例行程序是 GlobalScope 罕见的合法用途之一,因此您必须通过 @OptIn(DelicateCoroutinesApi::class)明确选择使用 GlobalScope。

import kotlinx.coroutines.*

//sampleStart
@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
    val job = GlobalScope.launch { // launch 根协程
        println("Throwing exception from launch")
        throw IndexOutOfBoundsException() // 我们将在控制台打印 Thread.defaultUncaughtExceptionHandler
    }
    job.join()
    println("Joined failed job")
    val deferred = GlobalScope.async { // async 根协程
        println("Throwing exception from async")
        throw ArithmeticException() // 没有打印任何东西,依赖用户去调用等待
    }
    try {
        deferred.await()
        println("Unreached")
    } catch (e: ArithmeticException) {
        println("Caught ArithmeticException")
    }
}
//sampleEnd

这段代码的输出如下(调试):

Throwing exception from launch Exception in thread “DefaultDispatcher-worker-1 @coroutine#2” java.lang.IndexOutOfBoundsException Joined failed job Throwing exception from async Caught ArithmeticException

CoroutineExceptionHandler将未捕获异常打印到控制台的默认行为是可自定义的。

根协程中的 CoroutineExceptionHandler 上下文元素可以被用于这个根协程通用的 catch 块,及其所有可能自定义了异常处理的子协程。

它类似于 Thread.uncaughtExceptionHandler 。

你无法从 CoroutineExceptionHandler 的异常中恢复。当调用处理者的时候,协程已经完成并带有相应的异常。通常,该处理者用于记录异常,显示某种错误消息,终止和(或)重新启动应用程序。

CoroutineExceptionHandler 仅在未捕获的异常上调用 — 没有以其他任何方式处理的异常。 特别是,所有子协程(在另一个 Job 上下文中创建的协程)委托它们的父协程处理它们的异常,然后它们也委托给其父协程,以此类推直到根协程, 因此永远不会使用在其上下文中设置的 CoroutineExceptionHandler。 除此之外,async 构建器始终会捕获所有异常并将其表示在结果 Deferred 对象中, 因此它的 CoroutineExceptionHandler 也无效。

在监督作用域内运行的协程不会将异常传播到其父协程,并且会从此规则中排除。本文档的另一个小节——监督提供了更多细节。

import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
//sampleStart
    val handler = CoroutineExceptionHandler { _, exception -> 
        println("CoroutineExceptionHandler got $exception") 
    }
    val job = GlobalScope.launch(handler) { // 根协程,运行在 GlobalScope 中
        throw AssertionError()
    }
    val deferred = GlobalScope.async(handler) { // 同样是根协程,但使用 async 代替了 launch
        throw ArithmeticException() // 没有打印任何东西,依赖用户去调用 deferred.await()
    }
    joinAll(job, deferred)
//sampleEnd    
}

这段代码的输出如下:

CoroutineExceptionHandler got java.lang.AssertionError

取消与异常

取消与异常紧密相关。协程内部使用 CancellationException 来进行取消,这个异常会被所有的处理者忽略,所以那些可以被 catch 代码块捕获的异常仅仅应该被用来作为额外调试信息的资源。 当一个协程使用 Job.cancel 取消的时候,它会被终止,但是它不会取消它的父协程。

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val job = launch {
        val child = launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                println("Child is cancelled")
            }
        }
        yield()
        println("Cancelling child")
        child.cancel()
        child.join()
        yield()
        println("Parent is not cancelled")
    }
    job.join()
//sampleEnd    
}

这段代码的输出如下:

Cancelling child Child is cancelled Parent is not cancelled

如果一个协程遇到了 CancellationException 以外的异常,它将使用该异常取消它的父协程。 这个行为无法被覆盖,并且用于为结构化的并发(structured concurrency) 提供稳定的协程层级结构。 CoroutineExceptionHandler 的实现并不是用于子协程。

在这些示例中,CoroutineExceptionHandler 总是被设置在由 GlobalScope 启动的协程中。将异常处理者设置在 runBlocking 主作用域内启动的协程中是没有意义的,尽管子协程已经设置了异常处理者, 但是主协程也总是会被取消的。

当父协程的所有子协程都结束后,原始的异常才会被父协程处理, 见下面这个例子。

import kotlinx.coroutines.*

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
//sampleStart
    val handler = CoroutineExceptionHandler { _, exception -> 
        println("CoroutineExceptionHandler got $exception") 
    }
    val job = GlobalScope.launch(handler) {
        launch { // 第一个子协程
            try {
                delay(Long.MAX_VALUE)
            } finally {
                withContext(NonCancellable) {
                    println("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    println("The first child finished its non cancellable block")
                }
            }
        }
        launch { // 第二个子协程
            delay(10)
            println("Second child throws an exception")
            throw ArithmeticException()
        }
    }
    job.join()
//sampleEnd 
}

这段代码的输出如下:

Second child throws an exception Children are cancelled, but exception is not handled until all children terminate The first child finished its non cancellable block CoroutineExceptionHandler got java.lang.ArithmeticException

异常聚合

当协程的多个子协程因异常而失败时, 一般规则是“取第一个异常”,因此将处理第一个异常。 在第一个异常之后发生的所有其他异常都作为被抑制的异常绑定至第一个异常。

import kotlinx.coroutines.*
import java.io.*

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE) // 当另一个同级的协程因 IOException  失败时,它将被取消
            } finally {
                throw ArithmeticException() // 第二个异常
            }
        }
        launch {
            delay(100)
            throw IOException() // 首个异常
        }
        delay(Long.MAX_VALUE)
    }
    job.join()  
}

这段代码的输出如下:

CoroutineExceptionHandler got java.io.IOException with suppressed [java.lang.ArithmeticException]

注意,这个机制当前只能在 Java 1.7 以上的版本中使用。 在 JS 和原生环境下暂时会受到限制,但将来会取消。

取消异常是透明的,默认情况下是未包装的:

import kotlinx.coroutines.*
import java.io.*

@OptIn(DelicateCoroutinesApi::class)
fun main() = runBlocking {
//sampleStart
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception")
    }
    val job = GlobalScope.launch(handler) {
        val innerJob = launch { // 该栈内的协程都将被取消
            launch {
                launch {
                    throw IOException() // 原始异常
                }
            }
        }
        try {
            innerJob.join()
        } catch (e: CancellationException) {
            println("Rethrowing CancellationException with original cause")
            throw e // 取消异常被重新抛出,但原始 IOException 得到了处理
        }
    }
    job.join()
//sampleEnd    
}

这段代码的输出如下:

Rethrowing CancellationException with original cause CoroutineExceptionHandler got java.io.IOException

监督

正如我们之前研究的那样,取消是在协程的整个层次结构中传播的双向关系。让我们看一下需要单向取消的情况。

此类需求的一个良好示例是在其作用域内定义作业的 UI 组件。如果任何一个 UI 的子作业执行失败了,它并不总是有必要取消(有效地杀死)整个 UI 组件, 但是如果 UI 组件被销毁了(并且它的作业也被取消了),由于其结果不再需要了,因此有必要取消所有子作业。

另一个例子是服务进程孵化了一些子作业并且需要 监督 它们的执行,追踪它们的故障并在这些子作业执行失败的时候重启。

SupervisorJob

SupervisorJob 可以用于这些目的。 它类似于常规的 Job,唯一的不同是:SupervisorJob 的取消只会向下传播。这是很容易用以下示例演示:

import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        // 启动第一个子作业——这个示例将会忽略它的异常(不要在实践中这么做!)
        val firstChild = launch(CoroutineExceptionHandler { _, _ ->  }) {
            println("The first child is failing")
            throw AssertionError("The first child is cancelled")
        }
        // 启动第二个子作业
        val secondChild = launch {
            firstChild.join()
            // 取消了第一个子作业且没有传播给第二个子作业
            println("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                // 但是取消了监督的传播
                println("The second child is cancelled because the supervisor was cancelled")
            }
        }
        // 等待直到第一个子作业失败且执行完成
        firstChild.join()
        println("Cancelling the supervisor")
        supervisor.cancel()
        secondChild.join()
    }
//sampleEnd
}

这段代码的输出如下:

The first child is failing The first child is cancelled: true, but the second one is still active Cancelling the supervisor The second child is cancelled because the supervisor was cancelled

监督作用域

对于作用域的并发,可以用 supervisorScope 来替代 coroutineScope 来实现相同的目的。它只会单向的传播并且当作业自身执行失败的时候将所有子作业全部取消。作业自身也会在所有的子作业执行结束前等待, 就像 coroutineScope 所做的那样。

import kotlin.coroutines.*
import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    try {
        supervisorScope {
            val child = launch {
                try {
                    println("The child is sleeping")
                    delay(Long.MAX_VALUE)
                } finally {
                    println("The child is cancelled")
                }
            }
            // 使用 yield 来给我们的子作业一个机会来执行打印
            yield()
            println("Throwing an exception from the scope")
            throw AssertionError()
        }
    } catch(e: AssertionError) {
        println("Caught an assertion error")
    }
//sampleEnd
}

这段代码的输出如下:

The child is sleeping Throwing an exception from the scope The child is cancelled Caught an assertion error

监督协程中的异常

常规的作业和监督作业之间的另一个重要区别是异常处理。 监督协程中的每一个子作业应该通过异常处理机制处理自身的异常。 这种差异来自于子作业的执行失败不会传播给它的父作业的事实。 这意味着在 supervisorScope 内部直接启动的协程确实使用了设置在它们作用域内的 CoroutineExceptionHandler,与父协程的方式相同 (参见 CoroutineExceptionHandler 小节以获知更多细节)。

import kotlin.coroutines.*
import kotlinx.coroutines.*

fun main() = runBlocking {
//sampleStart
    val handler = CoroutineExceptionHandler { _, exception -> 
        println("CoroutineExceptionHandler got $exception") 
    }
    supervisorScope {
        val child = launch(handler) {
            println("The child throws an exception")
            throw AssertionError()
        }
        println("The scope is completing")
    }
    println("The scope is completed")
//sampleEnd
}

这段代码的输出如下:

The scope is completing The child throws an exception CoroutineExceptionHandler got java.lang.AssertionError The scope is completed

【Kotlin】inline&crossinline&noinline关键字

【Kotlin】inline&crossinline&noinline关键字

本文介绍了Compose的重组流程,主要是最小重组范围的界定和优化

来自扔物线朱凯大佬的博客学习笔记

JVM常量编译时优化

Kotlin中,使用了 const val 关键字修饰的变量,在编译时会被视为常量,并且在编译时进行了优化。直接将其值复制到调用处,而不是像普通变量一样在运行时进行变量访问。这可以提高代码的执行效率,因为避免了变量调用的开销。

const val CONST_VAL = 10

fun main() {
    println(CONST_VAL)
}

// 编译后
fun main() {
    println(10)
}

inline 内联函数

编译时同样被提前处理的还有内联函数,即使用了 inline 关键字修饰的函数。

JVM在编译时,会将inline函数内的代码直接复制到调用处,而不是像普通函数一样在运行时进行函数调用。听起来可能会对性能有优化,实际上少一层函数调用栈的优化是非常微小的。

而同时, 函数内联 不同于 常量内联 的地方在于,函数体通常比常量复杂多了,而函数内联会导致函数体被拷贝到每个调用处,如果函数体比较大而被调用处又比较多,就会导致编译出的字节码变大很多。

lambda参数实现方式

在Kotlin中,lambda参数的实现方式是使用了 匿名内部类 ,而不是使用了 函数指针

在编译之后,可以看到lambda参数调用的地方,实际上是Kotlin帮我们生成了一个匿名内部类,然后在调用处调用这个匿名内部类的方法。

class LambdaTest {
    fun testInline(lambdaParams:()->Unit) {
        lambdaParams()
    }
}

经过反编译成Java代码之后:

public final class LambdaTest {
   @NotNull
   public final LambdaTest testInline(@NotNull Function0 lambdaParams) {
      Intrinsics.checkNotNullParameter(lambdaParams, "lambdaParams");
      lambdaParams.invoke();
      return this;
   }
}

可以看到,lambdaParams的类型是 Function0 ,这是一个接口。在运行过程中,就会生成一个匿名内部类,然后在调用处调用这个匿名内部类的方法。

inline对lambda的优化

如果上述的testinline方法,在外部被高频循环调用。

fun main() {
    val lambdaTest = LambdaTest()
    for (i in 0..100000) {
        lambdaTest.testInline {
            println("hello world")
        }
    }
}

内存占用会蹭的一下涨上来。

如果使用了这个接收lambda参数的方法使用了 inline 关键字修饰,就不会生成匿名内部类,而是直接将lambda的代码块里面的代码复制到调用处。

inline 关键字不止可以内联自己的内部代码,还可以内联自己内部的内部的代码,意思是什么呢,就是你的函数在被加了 inline 关键字之后,编译器在编译时不仅会把函数内联过来,而且会把它内部的函数类型的参数——那就是那些 Lambda 表达式——也内联过来。换句话说,这个函数被编译器贴过来的时候是完全展开铺平的:

kotlin源代码:

class LambdaTest {
    inline fun testInline(lambdaParams:()->Unit) {
        lambdaParams()
    }
}

fun main() {
    val lambdaTest = LambdaTest()
    for (i in 0..100000) {
        lambdaTest.testInline {
            println("hello world")
        }
    }
}

反编译之后:

public final class LambdaTest {
   public final void testInline(@NotNull Function0 lambdaParams) {
      Intrinsics.checkNotNullParameter(lambdaParams, "lambdaParams");
      lambdaParams.invoke();
   }
}

public final class MainKt {
   public static final void main() {
      LambdaTest lambdaTest = new LambdaTest();
      int $i$iv = 0;
      int var3;
      for(var3 = 100000; $i$iv <= var3; ++$i$iv) {
         System.out.println("hello world");
      }
   }
}

高阶函数(Higher-order Functions)有它们天然的性能缺陷,我们通过 inline 关键字让函数用内联的方式进行编译,来减少参数对象的创建,从而避免出现性能问题。

inline另类用法

在kotlin的 UMath.kt 工具类中,有一个max方法:

@SinceKotlin("1.5")
@WasExperimental(ExperimentalUnsignedTypes::class)
@kotlin.internal.InlineOnly
public inline fun max(a: UInt, b: UInt): UInt {
    return maxOf(a, b)
}

这个maxOf方法,来自于另一个工具类 UComparisonsKt

@SinceKotlin("1.5")
@WasExperimental(ExperimentalUnsignedTypes::class)
public fun maxOf(a: UInt, b: UInt): UInt {
    return if (a >= b) a else b
}

这里就通过内联的方式,将maxOf方法的代码块内联到了调用处。

可以直接通过方便的顶层函数的方式,来使用工具类,不需要创建实例或者带外部类名。

noinline

inline 是内联,而 noinline 就是不内联。不过它不是作用于函数的,而是作用于函数的参数:对于一个标记了 inline 的内联函数,你可以对它的任何一个或多个函数类型的参数添加 noinline 关键字。添加了之后,这个参数就不会参与内联。

函数类型的参数,它本质上是个对象。我们可以把这个对象当做函数来调用,这也是最常见的用法。但同时我们也可以把它当做对象来用。比如把它当做返回值:

inline fun testInline(lambdaParams:()->Unit) {
    lambdaParams()
    return lambdaParams
}

但当我们把函数进行内联的时候,它内部的这些参数就不再是对象了,因为他们会被编译器拿到调用处去展开。

当一个函数被内联之后,它内部的那些函数类型的参数就不再是对象了,因为它们的壳被脱掉了。换句话说,对于编译之后的字节码来说,这个对象根本就不存在。一个不存在的对象,你怎么使用?

所以当你要把一个这样的参数当做对象使用的时候,Android Studio 会报错,告诉你这没法编译

noinline 就是用来局部地、指向性地关掉函数的内联优化的。既然是优化,为什么要关掉?因为这种优化会导致函数中的函数类型的参数无法被当做对象使用,也就是说,这种优化会对 Kotlin 的功能做出一定程度的收窄。而当你需要这个功能的时候,就要手动关闭优化了。这也是 inline 默认是关闭、需要手动开启的另一个原因:它会收窄 Kotlin 的功能。

crossinline

inline 函数将 Lambda 参数传递给另一个执行上下文(如另一个函数、另一个线程、协程或其他作用域)时,为了防止非局部返回,必须使用 crossinline

保持 Lambda 的内联优化,但禁止在 Lambda 内部使用裸奔的 return 关键字(即非局部返回)。它确保 Lambda 只能使用标签返回 (return@label)隐式返回。使用 crossinline 确保内联函数的行为符合预期,避免 Lambda 内部的 return 意外地跳出外部的非内联函数。

看这样一个情景:

一个内联函数,接受一个 lambda 参数。

inline fun lambdaReturnTest(insertAction: () -> Unit) {
    insertAction()
}

如果在调用处,lambda参数里带一个return:

override fun onCreate() {
    super.onCreate()

    Log.i("sdvgsrhbTAG", "before erftgyujhf")
    lambdaReturnTest {
        println("Hello World")
        return
    }
    Log.i("sdvgsrhbTAG", "after erftgyujhf")
}

这时候结束的不是这个lambdaReturnTest方法,而是onCreate方法。因为lambdaReturnTest方法被内联了,会直接铺平展开到调用处,连带里面的return。

这样的话,我们每次在lambda里面使用return还需要确认这个函数是否是内联函数,才可以确认这个return结束的是哪一个函数。为此Kotlin规定 不允许在lambda参数中使用return,除非这个使用lambda参数的函数是内联函数

那这样的话规则就简单了:

  • Lambda 里的 return,结束的不是直接的外层函数,而是外层再外层的函数;
  • 但只有内联函数的 Lambda 参数可以使用 return。

目前的Kotlin版本其实也可以在return后面使用\@来指明返回的哪一级的函数。

示例:异步或嵌套执行

假设您有一个 safeRun 函数,它在一个内部(非内联)的 Runnable 中执行您的 Lambda。

// 内部非内联函数,它接受一个普通 Lambda/Runnable
fun executeInExecutor(block: () -> Unit) {
    // 实际的 Android/Java 场景可能是:Executor.execute(Runnable { ... })
    println("任务被包装并排队...")
    block() // 模拟执行
}

// 场景:创建一个安全的执行块,但其中的任务会被传递到另一个函数中执行
inline fun safeRun(crossinline block: () -> Unit) {
    println("--- 准备执行 ---")
    // 如果这里没有 crossinline,编译器无法保证 block() 不会被非局部返回跳出 safeRun 之外
    executeInExecutor {
        // block 的代码在这里被执行
        block() 
    }
    println("--- 执行完毕 ---")
}

fun main() {
    fun callSafeRun() {
        safeRun {
            println("开始任务")
            // return // ❌ 编译错误:禁止非局部返回
            return @ safeRun // ✅ 允许:只能使用标签返回,只跳出 safeRun 
        }
        println("callSafeRun 结束")
    }
    
    callSafeRun() 
}

/* 输出:
--- 准备执行 ---
任务被包装并排队...
开始任务
--- 执行完毕 ---
callSafeRun 结束
*/

如果没有 crossinline,Lambda { return } 理论上可以执行非局部返回,直接跳出 callSafeRun 函数。但由于 Lambda 实际是在非内联的 executeInExecutor 内部执行的,这种行为是不允许的,因此 crossinline 强制阻止了非局部返回,以保证程序的控制流是清晰且安全的。

双层嵌套的lambda场景

inline fun lambdaReturnTest(insertAction: () -> Unit) {
    doubleLambda { insertAction() }
}

fun doubleLambda(insertAction: () -> Unit) {
    insertAction()
}

doubleLambda方法是一个普通函数,非内联函数,它的参数是一个函数类型的参数。

如果像这样带两层lambda调用,那么其中使用return就又会无法判断结束的到底是哪一层函数。 这里Kotlin是直接禁止了这种写法。

如果确实要有这种间接调用需求,那么可以使用crossinline来解决。当你给一个需要被间接调用的参数加上 crossinline,就对它进行了局部加强内联,相当于insertAction还是会被展开铺平到调用处,解除了这个限制,从而就可以对它进行双层间接调用了。

但是又会有return结束层级不确定性,所以Kotlin规定了使用了crossinline的函数,不能在lambda参数中使用return。

只能二选一了。

总结

结论就是:

  • inline 可以让你用内联——也就是函数内容直插到调用处——的方式来优化代码结构,从而减少函数类型的对象的创建;
  • noinline 是局部关掉这个优化,来摆脱 inline 带来的「不能把函数类型的参数当对象使用」的限制;
  • crossinline 是局部加强这个优化,让内联函数里的函数类型的参数可以被当做对象使用。

【Kotlin】协程的基础使用

【Kotlin】协程的基础使用

本文介绍Kotlin协程挂起和恢复的原理

文章后半部分源码和介绍来自Kotlin官方网站

协程简介

协程是一种并发设计模式,您可以在 Android 平台上使用它来简化异步执行的代码。协程 是在 1.3 版中添加到 Kotlin 的,基于既定的从其他语言转换成的概念。

Android 上,协程有助于管理长时间运行的任务,如果管理不当,这些任务可能会阻塞主线程并导致应用无响应。使用协程的专业开发者中有超过 50% 的人反映使用协程提高了工作效率。本主题介绍如何使用 Kotlin 协程解决以下问题,从而让您能够编写出更清晰、更简洁的应用代码。

协程和线程

线程

  • 线程是操作系统级别的概念
  • 我们开发者通过编程语言(Thread.java)创建的线程,本质还是操作系统内核线程的映射
  • JVM 中的线程与内核线程的存在映射关系,有“一对一”,“一对多”,“M对N”。* JVM 在不同操作系统中的具体实现会有差别,“一对一”是主流
  • 一般情况下,我们说的线程,都是内核线程,线程之间的切换,调度,都由操作系统负责
  • 线程也会消耗操作系统资源,但比进程轻量得多
  • 线程,是抢占式的,它们之间能共享内存资源,进程不行
  • 线程共享资源导致了多线程同步问题
  • 有的编程语言会自己实现一套线程库,从而能在一个内核线程中实现多线程效果,早期 JVM 的“绿色线程” 就是这么做的,这种线程被称为“用户线程”

协程

  • 协程不是操作系统级别的概念,无需操作系统支持
  • 协程有点像上面提到的“绿色线程”,一个线程上可以运行成千上万个协程
  • 协程是用户态的(userlevel),内核对协程无感知
  • 协程是协作式的,由开发者管理,不需要操作系统进行调度和切换,也没有抢占式的消耗,因此它更加高效
  • 协程它底层基于状态机实现,多协程之间共用一个实例,资源开销极小,因此它更加轻量
  • 协程本质还是运行于线程之上,它通过协程调度器,可以运行到不同的线程上

项目使用实例

最常见的使用方式,在 ViewModel 或者 Controller 里写业务逻辑,在 Activity 里调用,这样就可以在IO线程执行网络请求,拿到结果后自动切换到主线程更新UI。

// viewModel或者controller里获取数据逻辑
// 使用suspend限制在协程里使用;withContext切换调度器,指定在IO线程执行下面的任务
suspend fun getUserName() = withContext(Dispatchers.IO) {
    debugLog("thread name: ${Thread.currentThread().name}")
    ServiceCreator.createService<UserService>()
        .getUserName("2cd1e3c5ee3cda5a")
        .execute()
        .body()
}

// Activity调用处
override fun onCreate(savedInstanceState: Bundle?){
    // 最直接的声明方法,在主线程执行下面的逻辑
    lifeCycleScope.launch {
        // 相当于get这一半是在IO线程执行
        //拿到结果后的变量赋值这一半操作由调度器自动切换到主线程来执行了
        val userName = mViewModel.getUserName()
        infoLog("userName: $userName")
        binding.tvUserName.text = userName
    }
}

API介绍

四个基础概念

  • suspend function。即挂起函数,delay() 就是协程库提供的一个用于实现非阻塞式延时的挂起函数
  • CoroutineScope。即协程作用域,GlobalScope 是 CoroutineScope 的一个实现类,用于指定协程的作用范围,可用于管理多个协程的生命周期,所有协程都需要通过 CoroutineScope 来启动
  • CoroutineContext。即协程上下文,包含多种类型的配置参数。Dispatchers.IO 就是 CoroutineContext 这个抽象概念的一种实现,用于指定协程的运行载体,即用于指定协程要运行在哪类线程上
  • CoroutineBuilder。即协程构建器,协程在 CoroutineScope 的上下文中通过 launch、async 等协程构建器来进行声明并启动。launch、async 均被声明为 CoroutineScope 的扩展方法

Kotlin 协程(Coroutines)提供了一套丰富的 API 方法,用于简化异步编程。以下是一些常用的 API 方法及其简要说明:

启动

launch方法签名:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy){
        LazyStandaloneCoroutine(newContext, block) 
    }else{
        StandaloneCoroutine(newContext, active = true)
    }
    coroutine.start(start, coroutine, block)
    return coroutine
}

start参数代表启动方式:

CoroutineStart.DEFAULT:协程创建后,立即开始调度,但 有可能在执行前被取消。在调度前如果协程被取消,其将直接进入取消响应的状态。 CoroutineStart.LAZY:只要协程被需要时(主动调用该协程的 start、 join、 await等函数时 ), 才会开始调度,如果调度前就被取消,协程将直接进入异常结束状态。 CoroutineStart.ATOMIC:协程创建后,立即开始调度, 协程执行到第一个挂起点之前不响应取消。其将调度和执行两个步骤合二为一,就像它的名字一样,其保证调度和执行是原子操作,因此协程也 一定会执行。 CoroutineStart.UNDISPATCHED:协程创建后,立即在当前线程中执行,直到遇到第一个真正挂起的点。是立即执行,因此协程 一定会执行。

context上下文参数:

Job:工作空间。用于启动or取消协程。

Dispatchers为调度器。用于指定协程的执行线程。 Default:默认调度器 ,适合处理后台计算,其是一个 CPU 密集型任务调度器。 IO:IO 调度器,适合执行 IO 相关操作,其是 IO 密集型任务调度器。 Main:UI 调度器,根据平台不同会被初始化为对应的 UI 线程的调度器, 在Android 平台上它会将协程调度到 UI 事件循环中执行,即通常在 主线程上执行。 Unconfined:“无所谓”调度器,不要求协程执行在特定线程上。 CoroutineExceptionHandler:全局异常捕获(只能在根协程配置)。

CoroutineName:协程名称。

协程上下文就是CoroutineContext,其中可以用加和函数plus()来连接使用,比如:

    override val coroutineContext: CoroutineContext
        get() = Dispatchers.Main + job + handler

这里的+就是加和函数,如上所写就是让CoroutineContext具备主线程+工作空间job,和CoroutineExceptionHandler的能力。

作用域

  • 顶级作用域:GlobalScope–> 全局范围,不会自动结束执行,无法取消。
  • 协同作用域:coroutineScope –> 抛出异常会取消父协程
  • 主从作用域:supervisorScope –> 抛出异常,不会取消父协程

三种作用域真正常用的其实只有主从作用域,谁也不想让自己写的协程挂了导致app崩溃吧。但实际使用过程中,由于没有作用域的概念,往往会用到顶级作用域和协同作用域,协程挂了导致app崩溃,然后再去解决异常。

常用的主从作用域有下面这些:

  • MainScope :主线程的作用域,全局范围,可以取消。
  • lifecycleScope : 生命周期范围,用于activity等有生命周期的组件,在Desroyed的时候会自动结束。
  • viewModelScope :ViewModel范围,用于ViewModel中,在ViewModel被回收时会自动结束。

主从作用域启动的协程,崩溃后不会影响其他协程执行。

以MainScope为例,在构建上下文时,加入了SupervisorJob(),SupervisorJob()是一个工作空间,它会在子协程抛出异常时,会将异常控制在子协程内部,不往上传递,不会影响父协程的执行。

线程切换

还是以launch方法签名为入口:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy){
        LazyStandaloneCoroutine(newContext, block) 
    }else{
        StandaloneCoroutine(newContext, active = true)
    }
    coroutine.start(start, coroutine, block)
    return coroutine
}

追进start方法:

    /**
     * Starts this coroutine with the given code [block] and [start] strategy.
     * This function shall be invoked at most once on this coroutine.
     * 
     * - [DEFAULT] uses [startCoroutineCancellable].
     * - [ATOMIC] uses [startCoroutine].
     * - [UNDISPATCHED] uses [startCoroutineUndispatched].
     * - [LAZY] does nothing.
     */
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }

可以看到模式启动模式下,使用的是 startCoroutineCancellable ,最终会调用到 resumeCancellableWith 方法,在 resumeCancellableWith 方法中,会判断当前上下文是否需要重新分发,如果需要就将上下文中提取新的Dispathers赋给dispatcher,否则就在当前线程直接执行。

inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
        // 判断当前上下文是否需要重新分发,如果需要就将上下文中提取新的Dispathers赋给dispatcher,否则就在当前线程直接执行
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

在不同的JVM平台上,Dispatcher.Main 调度器的执行位置取决于 具体的UI框架 。以下是主要情况:

  • Android平台上会调度到Android的主线程(UI线程)执行,这是通过Handler(Looper.getMainLooper())实现的
  • JavaFX平台会调度到JavaFX的Application线程执行,这是通过Platform.runLater()实现的
  • Swing平台会调度到Swing的Event Dispatch Thread (EDT)执行,这是通过SwingUtilities.invokeLater()实现的
  • 其他情况则会回退到单线程执行器

除了主调度器之外,其他几个切换也类似此流程,比如Dispatchers.Default是 创建了一个默认的线程池 ,而Dispatchers.IO也是沿用的线程池,只是对线程数量做了限制罢了。

IOS平台

在iOS平台上,Kotlin协程的线程切换主要通过以下方式实现:

  1. Main Dispatcher(主线程调度器): 使用DispatchQueue.main来调度到主线程执行 这是通过Kotlin/Native与iOS的GCD(Grand Central Dispatch)集成实现的 协程会被调度到主队列(Main Queue)执行,确保UI操作在主线程进行
  2. Default Dispatcher(默认调度器): 使用后台线程池执行任务 在iOS上,这通常是通过GCD的全局队列(Global Queue)实现的 使用DispatchQueue.global()来获取后台队列
  3. IO Dispatcher(IO调度器): 专门用于IO密集型操作 同样基于GCD实现,但使用不同的队列优先级 使用DispatchQueue.global(qos: .utility)或DispatchQueue.global(qos: .background)来执行IO操作

简化api表达:

// Main Dispatcher实现
internal class MainDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DispatchQueue.main.async {
            block.run()
        }
    }
}

// Default Dispatcher实现
internal class DefaultDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        DispatchQueue.global().async {
            block.run()
        }
    }
}

流程图如下:

常用api

协程构建器

用于启动协程的主要方法。

  • launch
    启动一个不会返回结果的协程(Job 类型)。
    GlobalScope.launch {
        // 协程代码
    }
    
  • async
    启动一个会返回结果的协程(Deferred 类型),结果可以通过 await() 获取。
    val deferred = GlobalScope.async {
        // 协程代码
        "Result"
    }
    val result = deferred.await()
    
  • runBlocking
    阻塞当前线程,直到协程执行完毕。通常用于测试或主函数中。
    runBlocking {
        // 协程代码
    }
    

协程上下文与调度器

用于控制协程的执行线程或上下文。

  • Dispatchers.Default
    用于 CPU 密集型任务的默认线程池。
    launch(Dispatchers.Default) {
        // 在后台线程执行
    }
    
  • Dispatchers.IO
    用于 IO 密集型任务的线程池。
    launch(Dispatchers.IO) {
        // 执行 IO 操作
    }
    
  • Dispatchers.Main
    用于在主线程(如 Android 的 UI 线程)执行任务。
    launch(Dispatchers.Main) {
        // 更新 UI
    }
    
  • Dispatchers.Unconfined
    不限制协程的执行线程,根据调用点决定。
    launch(Dispatchers.Unconfined) {
        // 不限制线程
    }
    
  • withContext
    切换协程的上下文。
    withContext(Dispatchers.IO) {
        // 在 IO 线程执行
    }
    

协程作用域

用于管理协程的生命周期。

  • GlobalScope
    全局作用域,协程的生命周期与应用程序一致。
    GlobalScope.launch {
        // 全局协程
    }
    
  • CoroutineScope
    自定义作用域,通常与 lifecycleScopeviewModelScope 结合使用。
    val scope = CoroutineScope(Dispatchers.Main)
    scope.launch {
        // 协程代码
    }
    
  • lifecycleScope(Android)
    Lifecycle 绑定的作用域,协程在 Lifecycle 销毁时自动取消。
    lifecycleScope.launch {
        // 协程代码
    }
    
  • viewModelScope(Android)
    ViewModel 绑定的作用域,协程在 ViewModel 销毁时自动取消。
    viewModelScope.launch {
        // 协程代码
    }
    

协程取消与超时

用于控制协程的执行时间或取消协程。

  • cancel()
    取消协程。
    val job = launch {
        // 协程代码
    }
    job.cancel()
    
  • isActive
    检查协程是否仍处于活动状态。
    if (isActive) {
        // 协程仍在运行
    }
    
  • withTimeout
    设置协程的超时时间,超时后抛出 TimeoutCancellationException
    withTimeout(1000) {
        // 协程代码
    }
    
  • withTimeoutOrNull
    设置协程的超时时间,超时后返回 null 而不是抛出异常。
    val result = withTimeoutOrNull(1000) {
        // 协程代码
    }
    

协程挂起函数

用于在协程中挂起执行。

  • delay
    挂起协程一段时间。
    delay(1000) // 挂起 1 秒
    
  • yield
    挂起当前协程,让出执行权给其他协程。
    yield()
    

协程异常处理

用于处理协程中的异常。

  • try-catch
    捕获协程中的异常。
    try {
        // 协程代码
    } catch (e: Exception) {
        // 处理异常
    }
    
  • CoroutineExceptionHandler
    全局异常处理器。
    val handler = CoroutineExceptionHandler { _, exception ->
        // 处理异常
    }
    launch(handler) {
        // 协程代码
    }
    

协程组合与并发

用于处理多个协程的组合与并发。

  • awaitAll
    等待多个 Deferred 完成并返回结果列表。
    val deferred1 = async { 1 }
    val deferred2 = async { 2 }
    val results = awaitAll(deferred1, deferred2)
    
  • supervisorScope
    创建一个子作用域,子协程的失败不会影响其他子协程。
    supervisorScope {
        launch {
            // 子协程 1
        }
        launch {
            // 子协程 2
        }
    }
    
  • coroutineScope
    创建一个子作用域,子协程的失败会传播到父协程。
    coroutineScope {
        launch {
            // 子协程 1
        }
        launch {
            // 子协程 2
        }
    }
    

协程间的通信Channel

编写具有共享可变状态的代码非常困难且容易出错(例如在使用回调的解决方案中)。更简单的方法是通过通信而不是使用公共可变状态来共享信息。协程可以通过通道相互通信。

通道是允许数据在协程之间传递的通信原语。 一个协程可以向通道发送一些信息,而另一个协程可以从该通道接收该信息

使用方法

发送(生产)信息的协程通常称为生产者,接收(消费)信息的协程称为消费者。一个或多个协程可以向同一个通道发送信息,一个或多个协程也可以从该通道接收数据。

当多个协程从同一个通道接收信息时,每个元素仅由其中一个消费者处理一次。 一旦元素被处理,它将立即从通道中移除。

可以将通道视为元素集合,或者更准确地说,队列这种数据结构,其中元素被添加到一端并从另一端接收。但是,有一个重要的区别:与集合不同,即使在其同步版本中,通道也可以暂停 send()和receive()操作。当通道为空或满时会发生这种情况。如果通道大小有上限,则通道可能会满。

Channel由三个不同的接口表示:SendChannel、ReceiveChannel和Channel,其中后者扩展了前两个。您通常会创建一个通道并将其作为SendChannel实例提供给生产者,以便只有他们可以向该通道发送信息。

您将通道作为ReceiveChannel实例提供给消费者,以便只有他们可以从中接收信息。send和receive方法都声明为suspend:

interface SendChannel<in E> {
    suspend fun send(element: E)
    fun close(): Boolean
}

interface ReceiveChannel<out E> {
    suspend fun receive(): E
}

interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

生产者可以关闭一个通道来表明没有更多的元素到来。

库中定义了几种类型的通道。它们的区别在于内部可以存储多少个元素以及是否send()可以暂停调用。对于所有通道类型,receive()调用的行为都类似:如果通道不为空,则接收一个元素;否则,调用将被暂停。

创建通道时,请指定其类型或缓冲区大小(如果需要缓冲):

val rendezvousChannel = Channel<String>()
val bufferedChannel = Channel<String>(10)
val conflatedChannel = Channel<String>(CONFLATED)
val unlimitedChannel = Channel<String>(UNLIMITED)

默认情况下,会创建一个“Rendezvous”通道。

在以下任务中,您将创建一个“Rendezvous”通道、两个生产者协程和一个消费者协程:

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    launch {
        channel.send("A1")
        channel.send("A2")
        log("A done")
    }
    launch {
        channel.send("B1")
        log("B done")
    }
    launch {
        repeat(3) {
            val x = channel.receive()
            log(x)
        }
    }
}

fun log(message: Any?) {
    println("[${Thread.currentThread().name}] $message")
}

Flow API

Flow API 是 Kotlin 协程库中的一部分,主要用于处理数据流。

  • flow
    创建一个冷流(Cold Flow)。
    val flow = flow {
        emit(1)
        emit(2)
    }
    
  • collect
    收集流中的数据。
    flow.collect { value ->
        // 处理数据
    }
    
  • map
    对流中的数据进行转换。
    flow.map { value -> value * 2 }
    
  • filter
    过滤流中的数据。
    flow.filter { value -> value > 1 }
    
  • flatMapConcat
    将流中的每个值映射为一个新流,并按顺序连接。
    flow.flatMapConcat { value -> flowOf(value, value * 2) }
    
  • zip
    将两个流合并为一个流。
    val flow1 = flowOf(1, 2)
    val flow2 = flowOf("A", "B")
    flow1.zip(flow2) { a, b -> "$a$b" }
    

关于更多Flow的基础和进阶使用,此前也写过更详细的一篇文章。

Kotlin Flow全面总结

【Kotlin】Flow全面总结

【Kotlin】Flow全面总结

本文介绍了Kotlin异步工具Flow框架的使用总结

Kotlin的Flow这个异步工具,在项目中其实一直在使用,得空参考下郭神在CSDN上的三篇文章,再自行扩展,在使用层面的规则上,进行一个相对较为全面的总结。

Flow最常见的使用场景,就是在viewmodel里面使用StateFlow 热流,在Ui层进行collect。用法和此前的LiveData是一样的。

例如一个获取Github仓库列表的网络请求数据:

    private val githubReposSate = MutableStateFlow(GithubReposState())
    val githubReposListStateFlow = githubReposSate.asStateFlow()

在Composable可组合项里进行消费:

    val githubReposState by viewModel.githubReposListStateFlow.collectAsState()

    Column(
        modifier = Modifier
            .fillMaxSize()
            .background(MaterialTheme.colors.background)
    ) {
        LazyColumn(
            modifier = Modifier
                .fillMaxSize()
                .background(MaterialTheme.colors.background)
        ) {
            items(githubReposState.githubReposList) {
                GithubRepoItem(it)
            }
        }
    }

冷流

使用flow构造器直接创建的为冷流,只有在调用collect函数时才会开始执行往外发送数据。

val TAG = "FlowOne".apply {
    Log.i(this, "init")
}

val flow = flow<Int> {
    repeat(5) {
        delay(500)
        emit(it)
    }
}

fun startCollect() {
    CoroutineScope(Dispatchers.IO).launch {
        delay(3000L)
        Log.i(TAG,"startCollect")
        flow.collect {
            Log.i(TAG,"FlowOne collect $it")
        }
    }
}

外部测试调用:

startCollect()

打印可以看到,flow创建好之后,没有数据打印,而是在三秒后collect时,才会从头开始进行发送:

16:24:21.803  I  init
16:24:24.825  I  startCollect
16:24:25.328  I  FlowOne collect 0
16:24:25.830  I  FlowOne collect 1
16:24:26.332  I  FlowOne collect 2
16:24:26.833  I  FlowOne collect 3
16:24:27.334  I  FlowOne collect 4

再次collect

如果我们在startCollect函数里再次调用collect。

fun startCollect() {
    CoroutineScope(Dispatchers.IO).launch {
        delay(3000L)
        Log.i(TAG, "startCollect")
        flow.collect {
            Log.i(TAG, "FlowOne collect $it")
        }
        flow.collect {
            Log.i(TAG, "FlowOne collect twice $it")
        }
    }
}

第一次收集完毕,延时6s,再次调用collect。打印结果如下:

19:34:57.752  I  init
19:35:00.758  I  startCollect
19:35:01.260  I  FlowOne collect 0
19:35:01.761  I  FlowOne collect 1
19:35:02.262  I  FlowOne collect 2
19:35:02.765  I  FlowOne collect 3
19:35:03.266  I  FlowOne collect 4
19:35:03.771  I  FlowOne collect twice 0
19:35:04.276  I  FlowOne collect twice 1
19:35:04.779  I  FlowOne collect twice 2
19:35:05.280  I  FlowOne collect twice 3
19:35:05.782  I  FlowOne collect twice 4

有两个值得关注的点,第一点是第二次 collect 打印开始的时间并不是紧跟着第一次,而是第一次收集所有数据完毕之后才开始。说明 collect() 函数是一个挂起的函数,只有在数据收集完毕之后,协程后面的函数恢复,才会继续往下执行。

简单来说,连续的两个 collect 操作是串行的,如果想要并行收集,就需要切换到不同的协程作用域。

第二个点是,第二次收集的数据也是从头开始打印的,说明冷流的每一次操作,都会从头开始。

collectLatest

有时候, collect 数据的地方,数据的消费逻辑没有走完,导致数据积压,会出现数据过时的情况,使用 collectLatest 可以解决这个问题。

collectLatest 函数,会在每次有新数据过来时,取消上一次还未执行完的逻辑,立即处理最新的这个数据。

例如,我们在collect函数里延时3s:

fun startCollect() {
    CoroutineScope(Dispatchers.IO).launch {
        delay(3000L)
        Log.i(TAG, "startCollect")
        flow.collectLatest {
            Log.i(TAG, "FlowOne collect $it")
            delay(3000L)
        }
    }
}

打印结果:

16:39:09.467  I  init
16:39:12.494  I  startCollect
16:39:12.997  I  FlowOne collect 0
16:39:13.502  I  FlowOne collect 1
16:39:14.007  I  FlowOne collect 2
16:39:14.510  I  FlowOne collect 3
16:39:15.013  I  FlowOne collect 4

可以看到数据仍然是按照源头的时间间隔来发送的,并不是延时3s才打印。说明 collectLatest() 函数收集的流,会在每次有新数据过来时,取消上一次还未执行完的逻辑,立即处理最新的这个数据。

Flow常用操作符

map

map 可以理解为一个拦截转换器,将 flow 的原数据,经过拦截器处理之后,转换成另一个数据发送出去。map接受一个lambda参数,lambda函数最后一行的数据,就是经过map转换后的返回值。

这里以平方转换为例:

fun mapTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flowOf(1, 2, 3, 4, 5).map {
            it * it
        }.collectLatest {
            Log.i(TAG, "mapTest collect $it")
        }
    }
}

打印结果:

16:44:00.302  I  init
16:44:00.345  I  mapTest collect 1
16:44:00.353  I  mapTest collect 4
16:44:00.356  I  mapTest collect 9
16:44:00.357  I  mapTest collect 16
16:44:00.365  I  mapTest collect 25

filter

filter 函数,用于过滤数据,只将满足条件的数据发送出去。filter() 同样接受一个lambda参数,最后一行的需要返回一个Boolean类型,用于判断是否发送数据。

fun filterTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flowOf(3, 6, 9, 11, 14).filter {
            it % 3 == 0
        }.collectLatest {
            Log.i(TAG, "filterTest collect $it")
        }
    }
}

打印可以发现,只有369,即三的倍数通过了过滤器被接收。

onEach

onEach函数,用于在每次数据发送之前,执行一些操作。可以打印查看原始的数据是否符合预期。

fun onEachTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flowOf(1, 2, 3, 4, 5).onEach {
            Log.i(TAG, "onEachTest onEach $it")
        }.map { it + 10 }.collect {
            Log.i(TAG, "onEachTest collect $it")
        }
    }
}

打印结果:

16:51:51.942  I  init
16:51:51.982  I  onEachTest onEach 1
16:51:51.984  I  onEachTest collect 11
16:51:51.984  I  onEachTest onEach 2
16:51:51.984  I  onEachTest collect 12
16:51:51.985  I  onEachTest onEach 3
16:51:51.986  I  onEachTest collect 13
16:51:51.987  I  onEachTest onEach 4
16:51:51.987  I  onEachTest collect 14
16:51:51.988  I  onEachTest onEach 5
16:51:51.993  I  onEachTest collect 15

debounce

debounce 函数,用于在一段时间内,只发送最后一次数据。两次数据的时间间隔太近,前一次的数据就会被丢弃,后一次数据,在延时这段时间后发送。类似Handler的remove和postDelayed的防抖操作。

@OptIn(FlowPreview::class)
fun debounceTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flow {
            emit(1)
            delay(100)
            emit(2)
            delay(1000)
            emit(3)
            delay(100)
            emit(4)
            delay(100)
            emit(5)
        }.debounce(500).collectLatest {
            Log.i(TAG, "debouneTest collect $it")
        }
    }
}

打印结果:

16:58:41.402  I  init
16:58:42.062  I  debouneTest collect 2
16:58:42.767  I  debouneTest collect 5

流程:

collect开始后,数据1立即发送,开始为期500ms的监测,100ms后数据2发送了,这时候数据1就被丢弃,再开始500ms监测。500ms后没有新数据来,将2发送出去。可以看到从init到第一次数据打印,就是耗时600ms。同理,2和5之间,间隔700ms。

blogs_flow_debounce

sample

sample 函数,作用类似debounce。sample有一个采样期,采样期结束,会将采样期内最后一次数据发送出去。

@OptIn(FlowPreview::class)
fun sampleTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flow {
            emit(1)
            delay(150)
            emit(2)
            delay(150)
            emit(3)
            delay(150)
            emit(4)
            delay(150)
            emit(5)
        }.sample(200).collect {
            Log.i(TAG, "debouneTest collect $it")
        }
    }
}

每150ms发送一次数据,采样期为200ms,所以每200ms,会将采样期内最后一次数据发送出去。打印结果:

17:13:34.656  I  init
17:13:34.891  I  debouneTest collect 2
17:13:35.092  I  debouneTest collect 3
17:13:35.295  I  debouneTest collect 4

取值的过程如下图:

blogs_flow_sample

reduce

reduce 函数,用于迭代操作,在上一次计算结果的基础上,拿当前的值再进行下一步计算。

例如,将1到5的数字相乘:

fun reduceTest() {
    CoroutineScope(Dispatchers.IO).launch {
        val totalResult = 
        flowOf(1, 2, 3, 4, 5)
        .reduce { acc, value ->
            acc * value
        }
        Log.i(TAG, "reduceTest collect $totalResult")
    }
}

结果打印为120.

fold

fold 函数,和 reduce 基本一致,只是多了一个初始值。

fun foldTest() {
    CoroutineScope(Dispatchers.IO).launch {
        val totalResult = 
        flowOf(1, 2, 3, 4, 5)
        .fold(10) { acc, value ->
            acc * value
        }
        Log.i(TAG, "foldTest collect $totalResult")
    }
}

打印结果为1200.

reduce和fold不仅可以用于数字,还可以用于字符串的拼接。

flatMapConcat

以flatMap开头的操作符函数,分别是flatMapConcat、flatMapMerge和flatMapLatest。

flatMap的核心,就是将两个flow中的数据进行映射、合并、压平成一个flow,最后再进行输出。

flatMapConcat,是将两个flow中的数据进行合并,然后再进行输出。侧重点是按顺序拼接,类比C++里面两个数组的组合遍历。

@OptIn(ExperimentalCoroutinesApi::class)
fun flatMapConcatTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flowOf(1, 2, 3).flatMapConcat {
            flowOf("a$it", "b$it")
        }.collect {
            Log.i(TAG, "flatMapConcatTest collect $it")
        }
    }
}

打印结果:

17:40:40.835  I  init
17:40:40.859  I  flatMapConcatTest collect a1
17:40:40.859  I  flatMapConcatTest collect b1
17:40:40.859  I  flatMapConcatTest collect a2
17:40:40.860  I  flatMapConcatTest collect b2
17:40:40.860  I  flatMapConcatTest collect a3
17:40:40.861  I  flatMapConcatTest collect b3

实际应用中,例如账号登陆获取用户数据的网络请求,需要先登录获取一个token,然后再拿这个token去获取用户数据。

fun getUserInfo() {
    CoroutineScope(Dispatchers.IO).launch {
        sendGetTokenRequest()
            .flatMapConcat { token ->
                sendGetUserInfoRequest(token)
            }
            .flowOn(Dispatchers.IO)
            .collect { userInfo ->
                println(userInfo)
            }
    }
}

flatMapMerge

flatMapMerge,同样是将两个flow中的数据进行合并,然后再进行输出。但是他的侧重点是并行的,即flow1每个数据的操作不是串行的,而是并行的。

@OptIn(ExperimentalCoroutinesApi::class)
fun flatMapMergeTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flowOf(300, 200, 100)
            .flatMapMerge {
                flow {
                    delay(it.toLong())
                    emit("a$it")
                    emit("b$it")
                }
            }
            .collect {
                Log.i(TAG, "flatMapMergeTest collect $it")
            }
    }
}

打印结果:

17:55:37.695  I  init
17:55:37.864  I  flatMapMergeTest collect a100
17:55:37.865  I  flatMapMergeTest collect b100
17:55:37.955  I  flatMapMergeTest collect a200
17:55:37.956  I  flatMapMergeTest collect b200
17:55:38.055  I  flatMapMergeTest collect a300
17:55:38.058  I  flatMapMergeTest collect b300

可以看到flatMapMerge处理之后,优先把耗时更少的数据添加到新的flow里面去进行发送。如果这里用的是flatMapConcat,那么结果就是按照300,200,100顺序发送。

flatMapLatest

flatMapLatest,同样是将两个flow中的数据进行合并,然后再进行输出。但是他的侧重点是,只保留最新的一个数据。

@OptIn(ExperimentalCoroutinesApi::class)
fun flatMapLatestTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flow {
            emit(1)
            delay(150)
            emit(2)
            delay(50)
            emit(3)
        }.flatMapLatest {
            flow {
                delay(100)
                emit("$it")
            }
        }.collect {
            Log.i(TAG, "flatMapLatestTest collect $it")
        }
    }
}

和collectLatest类似,如果使用flatMapLatest来合并多个flow,当flow1的前一个数据给到了,但是flow2没有及时合并完成,flow1的下一个数据又过来了,那么前一个数据的处理逻辑就会被掐断丢弃,直接处理最新的这个数据。

打印结果:

17:59:19.282 I init 17:59:19.444 I flatMapLatestTest collect 1 17:59:19.657 I flatMapLatestTest collect 3

zip

zip 函数和 flatMap 函数有点类似,都是作用在两个flow上的。

使用 zip 函数连接的两个flow,它们之间是并行的运行关系。而 flatMap 是一个flow中的数据流向另外一个flow,是串行的关系。

元素按照少的那个flow来决定

zip函数还有一个规则,就是 只要其中一个flow中的数据对应的数量,全部处理结束就会终止运行,剩余未处理的数据将不会得到处理。

@OptIn(ExperimentalCoroutinesApi::class)
fun zipTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flowOf(1, 2, 3, 4, 5)
            .zip(flowOf("a", "b", "c", "d")) { a, b ->
                "$a+$b"
            }
            .collect {
                Log.i(TAG, "zipTest collect $it")
            }
    }
}

第一个flow有5个元素,第二个flow有4个元素,按照zip函数的规则,最终只会处理4个元素,最后一个元素5不会被处理。

打印结果:

19:45:12.248  I  init
19:45:12.261  I  zipTest collect 1+a
19:45:12.262  I  zipTest collect 2+b
19:45:12.263  I  zipTest collect 3+c
19:45:12.264  I  zipTest collect 4+d

运行时长按照长的那个flow来决定

下面例子中,flow1和flow2发送数据均有延时逻辑,zip是并行执行的,最终的运行时长,取决于运行时长更长的那个flow。

fun zipTest2() {
    CoroutineScope(Dispatchers.IO).launch {
        val start = System.currentTimeMillis()
        val flow1 = flow {
            delay(3000)
            emit("a")
        }
        val flow2 = flow {
            delay(2000)
            emit(1)
        }
        flow1.zip(flow2) { a, b ->
            a + b
        }.collect {
            val end = System.currentTimeMillis()
            Log.i(TAG, "Time cost: ${end - start}ms")
        }
    }
}

打印结果:

19:48:24.785  I  init
19:48:27.801  I  Time cost: 3012ms

zip的应用场景,好几个接口的请求返回耗时时长不一致,但是需要将数据一起返回给界面,就可以通过zip的特性,在耗时最长的flow执行完毕之后,再一同发送数据。

buffer

默认情况下,flow的数据发送和collect是在同一个协程上运行的,如果collect里面有耗时逻辑,也会对flow的数据发送造成影响。

在大多数情况,这都是最好规避掉的。

fun bufferTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flow {
            emit(1)
            delay(1000)
            emit(2)
            delay(1000)
            emit(3)
        }.onEach {
            Log.i(TAG, "bufferTest onEach $it")
        }.collect {
            delay(1000)
            Log.i(TAG, "bufferTest collect $it")
        }
    }
}

collectemit 都有1s的延时,互相挂起,串行执行,所以应该每个collect就变成了2s,打印结果如下:

19:55:52.000  I  init
19:55:52.004  I  bufferTest onEach 1
19:55:53.006  I  bufferTest collect 1
19:55:54.011  I  bufferTest onEach 2
19:55:55.013  I  bufferTest collect 2
19:55:56.018  I  bufferTest onEach 3
19:55:57.021  I  bufferTest collect 3

这时候加一个 buffer 操作符,就可以让数据发送和collect并行执行。

buffer() 调用之后的结果打印:

20:00:03.426  I  init
20:00:03.432  I  bufferTest onEach 1
20:00:04.436  I  bufferTest collect 1
20:00:04.437  I  bufferTest onEach 2
20:00:05.439  I  bufferTest collect 2
20:00:05.439  I  bufferTest onEach 3
20:00:06.443  I  bufferTest collect 3

buffer 操作符有一个可选的 capacity 参数,用于指定缓冲区的大小。如果不指定 capacity,则缓冲区的大小默认为 Channel.BUFFERED,这意味着缓冲区的大小是无限制的。 然而,需要注意的是,虽然缓冲区的大小可以是无限制的,但在实际应用中,过大的缓冲区可能会导致内存占用过高,从而影响应用的性能。因此,建议根据具体的应用场景和需求来合理设置缓冲区的大小。

conflate

conflate 函数是对buffer函数的一个另选方案,它的作用是当收集器挂起之后,flow发射方把当前无法处理的数据丢弃掉,待收集器处理完逻辑,再给其发送新的值。可以解决buffer函数的问题,即当 collector 收集器过于耗时,又未指定容量,那缓存区的数据就越来越大。

与之有点相似的是 collectLatest 函数,上面展示了使用collectLatest函数,在数据发送的过程中,会取消上一次未运行完毕的收集逻辑,立即处理最新的数据。

而 conflate 函数,是在数据发送的过程中,如果本次 collect 仍然在运行,就把这个数据丢弃掉,等到 collector 收集器重新可接收数据之后,拿到的就是最新的数据。这样可以保证 collector 每次接收到数据之后,可以把当前的逻辑全部走完。

fun conflateTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flow {
            repeat(7){
                delay(100)
                emit(it)
            }
        }.conflate().collect {
            Log.i(TAG, "conflateTest collect start handle $it")
            delay(210)
            Log.i(TAG, "conflateTest collect end handle $it")
        }
    }
}

打印结果:

20:17:58.356  I  init
20:17:58.465  I  conflateTest collect start handle 0
20:17:58.676  I  conflateTest collect end handle 0
20:17:58.677  I  conflateTest collect start handle 2
20:17:58.889  I  conflateTest collect end handle 2
20:17:58.889  I  conflateTest collect start handle 4
20:17:59.099  I  conflateTest collect end handle 4
20:17:59.099  I  conflateTest collect start handle 6
20:17:59.310  I  conflateTest collect end handle 6

可以看到当collector挂起的时候发送的数据就丢弃掉了。

conflate和collectLatest共用的情况

conflate 函数和 collectLatest 函数,一个丢弃数据,一个丢弃逻辑,如果都用是什么效果呢?

实测是 collectLatest 函数的效果是优先的,收集器会掐断正在执行的逻辑,转而处理更新的数据。

fun conflateTest() {
    CoroutineScope(Dispatchers.IO).launch {
        flow {
            repeat(7) {
                delay(100)
                emit(it)
            }
        }.conflate().collectLatest {
            Log.i(TAG, "conflateTest collect start handle $it")
            delay(210)
            Log.i(TAG, "conflateTest collect end handle $it")
        }
    }
}

打印结果:

20:22:19.626  I  init
20:22:19.736  I  conflateTest collect start handle 0
20:22:19.838  I  conflateTest collect start handle 1
20:22:19.938  I  conflateTest collect start handle 2
20:22:20.039  I  conflateTest collect start handle 3
20:22:20.140  I  conflateTest collect start handle 4
20:22:20.242  I  conflateTest collect start handle 5
20:22:20.342  I  conflateTest collect start handle 6
20:22:20.553  I  conflateTest collect end handle 6

StateFlow 和 SharedFlow

插入LiveData

在Java开发的时候,会使用LiveData来进行数据的传递。

LiveDataAndroid Jetpack的一部分,与常规的可观察类不同,LiveData 具有生命周期感知能力,意指它遵循其他应用组件(如 Activity、Fragment 或 Service)的生命周期。这种感知能力可确保 LiveData 仅更新处于活跃生命周期状态的应用组件观察者。

基本使用方式:

viewmodel里面维护一个私有的MutableLiveData数据,暴露一个公开的livedata变量,然后在activity中监听LiveData数据的变化。

class MainViewModel : ViewModel() {

    private val _countLiveData = MutableLiveData<Int>(0)

    val countLiveData: LiveData<Int>
        get() = _countLiveData

    suspend fun startCount() = withContext(Dispatchers.IO) {
        while (true) {
            delay(1000)
            withContext(Dispatchers.Main) {
                _countLiveData.value = _countLiveData.value?.plus(1)
            }
        }
    }
}

Activity中监听数据变化:

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)
    Log.i("MainActivity", "onCreate")

    val tvCount = findViewById<TextView>(R.id.tv_count)

    mainViewModel.countLiveData.observe(this) {
        Log.i("MainActivity", "observe data: $it")
        tvCount.text = it.toString()
    }

    lifecycleScope.launch {
        mainViewModel.startCount()
    }
}

运行之后count数据就开始每秒加 1 了,一段时间后上滑回到桌面,日志显示activity的 onStop() 方法被调用,activity的生命周期进入后台。这时候Observer就不会处理推送过来的数据。

再过一段时间后,重新打开应用界面,日志里看到 activityonResume() 方法被调用,activity的生命周期进入前台。这时候 Observer 就会继续从最新的数据开始处理推送过来的数据。

打印结果:

10:58:10.265  I  onCreate
10:58:10.414  I  observe data: 0
10:58:10.417  I  onResume
10:58:11.391  I  observe data: 1
10:58:12.395  I  observe data: 2
10:58:13.397  I  observe data: 3
10:58:14.401  I  observe data: 4
10:58:15.404  I  observe data: 5
10:58:16.406  I  observe data: 6
10:58:20.866  I  onStop
10:58:26.357  I  observe data: 15
10:58:26.357  I  onResume
10:58:26.436  I  observe data: 16
10:58:27.440  I  observe data: 17
10:58:28.444  I  observe data: 18
10:58:29.448  I  observe data: 19

LiveData 遵循观察者模式。当生命周期状态发生变化时,LiveData 会通知 Observer 对象。当界面组件处于非活跃状态时,它不会接收任何 LiveData 事件。

下面是LiveData的observe的源码,展示了这种绑定注册关系:

@MainThread
public void observe(@NonNull LifecycleOwner owner, @NonNull Observer<? super T> observer) {
    assertMainThread("observe");
    if (owner.getLifecycle().getCurrentState() == DESTROYED) {
        // ignore
        return;
    }
    LifecycleBoundObserver wrapper = new LifecycleBoundObserver(owner, observer);
    ObserverWrapper existing = mObservers.putIfAbsent(observer, wrapper);
    if (existing != null && !existing.isAttachedTo(owner)) {
        throw new IllegalArgumentException("Cannot add the same observer"
                + " with different lifecycles");
    }
    if (existing != null) {
        return;
    }
    owner.getLifecycle().addObserver(wrapper);
}

Activity直接收集冷流flow

类比上面LiveData的写法,直接在Activity里收集冷流,试试看是什么效果。

class MainViewModel : ViewModel() {

    val countnFlow = flow<Int> {
        var count = 0
        while (true) {
            emit(count++)
            delay(1000)
        }
    }
}

Activity添加收集flow:

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)
    Log.i("MainActivity", "onCreate")

    val tvCount = findViewById<TextView>(R.id.tv_count)

    lifecycleScope.launch {
        mainViewModel.countnFlow.collect {
            Log.i("MainActivity", "collect data: $it")
            tvCount.text = it.toString()
        }
    }
}

打印结果:

10:47:56.742  I  onCreate
10:47:56.865  I  collect data: 0
10:47:56.900  I  onResume
10:47:57.872  I  collect data: 1
10:47:58.875  I  collect data: 2
10:47:59.876  I  collect data: 3
10:48:00.878  I  collect data: 4
10:48:01.051  D  visibilityChanged oldVisibility=true newVisibility=false
10:48:01.084  I  onStop
10:48:01.880  I  collect data: 5
10:48:02.882  I  collect data: 6
10:48:03.886  I  collect data: 7
10:48:04.889  I  collect data: 8
10:48:05.891  I  collect data: 9
10:48:06.661  I  onResume
10:48:06.892  I  collect data: 10
10:48:07.896  I  collect data: 11
10:48:08.899  I  collect data: 12
10:48:09.904  I  collect data: 13

可以看到,在activity进入后台之后,数据依然在不断的发送,收集器也在不断的收集处理数据。因为这种方法并没有生命周期感知的特性。

使用repeatOnLifecycle

在协程里面,可以使用 repeatOnLifecycle 来让某些任务只在特定生命周期状态内才会执行。我们可以传入 Lifecycle.State.STARTED,表示只有activity在started状态下才运行。当再次处于started状态时,任务会重新开始执行。

使用 repeatOnLifecycle 需要导入 androidx.lifecycle:lifecycle-runtime-ktx:2.4.0 包。

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)
    Log.i("MainActivity", "onCreate")

    val tvCount = findViewById<TextView>(R.id.tv_count)

    lifecycleScope.launch {
        repeatOnLifecycle(Lifecycle.State.STARTED) {
            mainViewModel.countnFlow.collect {
                Log.i("MainActivity", "collect data: $it")
                tvCount.text = it.toString()
            }
        }
    }
}

日志可以看到,onStop之后,数据的处理就停止了,在start之后,collect会重新调用,所以数据是从0开始的,日志打印结果:

11:08:40.925  I  onCreate
11:08:41.068  I  collect data: 0
11:08:41.072  I  onResume
11:08:42.072  I  collect data: 1
11:08:43.074  I  collect data: 2
11:08:44.076  I  collect data: 3
11:08:45.080  I  collect data: 4
11:08:46.082  I  collect data: 5
11:08:47.084  I  collect data: 6
11:08:47.621  D  visibilityChanged oldVisibility=true newVisibility=false
11:08:47.657  I  onStop
11:09:03.861  I  collect data: 0
11:09:03.862  I  onResume
11:09:04.863  I  collect data: 1
11:09:05.865  I  collect data: 2
11:09:06.868  I  collect data: 3
11:09:07.871  I  collect data: 4
11:09:08.875  I  collect data: 5

这样可以避免在activity处于后台的时候,数据的处理逻辑一直运行,导致资源浪费或者内存泄漏。

StateFlow

借助 repeatOnLifecycle,我们可以在activity处于started状态的时候,收集数据。使用StateFlow的效果可以说和LiveData几乎一致。

class MainViewModel : ViewModel() {

    private val _stateFlow = MutableStateFlow(0)

    val countStateFlow = _stateFlow.asStateFlow()

    suspend fun startCount() = withContext(Dispatchers.IO) {
        for (i in 0..100) {
            delay(1000)
            _stateFlow.value = i
        }
    }
}

activity里收集,注意 startCountcollect 均为挂起函数,两个函数需要放在不同的作用域内调用:


override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)
    Log.i("MainActivity", "onCreate")

    val tvCount = findViewById<TextView>(R.id.tv_count)

    lifecycleScope.launch {
        launch {
            mainViewModel.startCount()
        }
        repeatOnLifecycle(Lifecycle.State.STARTED) {
            mainViewModel.countStateFlow.collect {
                Log.i("MainActivity", "collect data: $it")
                tvCount.text = it.toString()
            }
        }
    }
}

结果打印,退到后台,数据的处理取消,回到前台后,重新collect,同时因为stateflow是热流,收集时会直接从最新的状态开始:

11:27:57.859  I  onCreate
11:27:58.028  I  collect data: 0
11:27:58.030  I  onResume
11:28:00.004  I  collect data: 1
11:28:01.005  I  collect data: 2
11:28:02.007  I  collect data: 3
11:28:03.009  I  collect data: 4
11:28:04.012  I  collect data: 5
11:28:05.014  I  collect data: 6
11:28:05.165  D  visibilityChanged oldVisibility=true newVisibility=false
11:28:05.210  I  onStop
11:28:11.967  I  collect data: 12
11:28:11.968  I  onResume
11:28:12.045  I  collect data: 13
11:28:13.027  I  collect data: 14
11:28:14.029  I  collect data: 15
11:28:15.031  I  collect data: 16

config变化导致协程取消

当屏幕方向发生变化时,activity会销毁重新创建,这时候 lifecycle 协程会被取消,然后重新启动。计时器任务也会取消重新执行。

竖屏变横屏的日志打印:

13:39:19.215 I onCreate 13:39:19.426 I collect data: 0 13:39:19.429 I onResume 13:39:21.396 I collect data: 1 13:39:22.397 I collect data: 2 13:39:23.399 I collect data: 3 13:39:24.401 I collect data: 4 13:39:25.403 I collect data: 5 13:39:26.395 I onStop 13:39:26.404 I onDestroy 13:39:26.450 I onCreate 13:39:26.479 I collect data: 5 13:39:26.487 I onResume 13:39:27.460 I collect data: 0 13:39:28.461 I collect data: 1 13:39:29.463 I collect data: 2 13:39:30.466 I collect data: 3 13:39:31.468 I collect data: 4

借助stateIn将冷流变成StateFlow

上面的config变化导致协程取消的问题,可以借助 stateIn 函数将冷流变成热流。然后把计时的操作移植到冷流中。

class MainViewModel : ViewModel() {
    private val timeFlow = flow {
        var time = 0
        while (true) {
            emit(time)
            delay(1000)
            time++
        }
    }

    val countStateFlow =
        timeFlow.stateIn(
            viewModelScope,
            SharingStarted.WhileSubscribed(5000),
            0
        )
}

stateIn 扩展函数,有三个参数,第一个参数是协程作用域,第三个参数是初始值。

其第二个参数是共享的策略,因为横竖屏切换通常很快就能完成,这里我们通过stateIn函数的第2个参数指定了一个5秒的超时时长,那么只要在5秒钟内横竖屏切换完成了,Flow就不会停止工作。

反过来讲,这也使得程序切到后台之后,如果5秒钟之内再回到前台,那么Flow也不会停止工作。但是如果切到后台超过了5秒钟,Flow就会全部停止了。

竖屏变横屏的日志打印:

13:47:49.368  I  onCreate
13:47:49.579  I  collect data: 0
13:47:49.588  I  onResume
13:47:50.589  I  collect data: 1
13:47:51.592  I  collect data: 2
13:47:52.594  I  collect data: 3
13:47:53.597  I  collect data: 4
13:47:54.600  I  collect data: 5
13:47:55.385  I  onStop
13:47:55.388  I  onDestroy
13:47:55.471  I  onCreate
13:47:55.477  I  collect data: 5
13:47:55.485  I  onResume
13:47:55.601  I  collect data: 6
13:47:56.605  I  collect data: 7
13:47:57.609  I  collect data: 8
13:47:58.613  I  collect data: 9
13:47:59.616  I  collect data: 10

SharedFlow

粘性消息的概念

LiveData 的粘性,是指当一个新的观察者开始观察 LiveData 时,它会 立即接收到 LiveData 最后一次设置的值 ,即使这个值是在观察者开始观察之前设置的。这种行为被称为粘性,因为它就像观察者“粘”在了 LiveData 的最后一个值上。

粘性的实现原理是基于 LiveData 的版本号机制。每当 LiveData 的值发生变化时,它的版本号就会增加。当一个新的观察者开始观察 LiveData 时,它会检查当前的版本号,如果版本号大于 0,说明 LiveData 已经有了一个值,那么观察者会立即接收到这个值。

粘性的优点是可以确保新的观察者不会错过 LiveData 的任何重要状态变化,即使它们在状态变化之后才开始观察。这对于一些需要实时更新的场景非常有用,例如用户界面的状态管理。

然而,粘性也可能会导致一些问题,特别是在处理事件流时。如果 LiveData 被用作事件总线,粘性可能会导致新的观察者接收到旧的事件,这可能会导致应用程序的行为不符合预期。

通过之前的例子,发现stateflow也是粘性的,开始收集时,是从上一个最新的值开始的。

SharedFlow使用

SharedFlowStateFlow 的用法还是略有不同的。

首先,MutableSharedFlow 是不需要传入初始值参数的。因为非粘性的特性,它本身就 不要求观察者在观察的那一刻就能收到消息 ,所以也没有传入初始值的必要。

另外就是,SharedFlow 无法像 StateFlow 那样通过给 value 变量赋值来发送消息,而是只能像传统 Flow 那样调用 emit 函数。而 emit 函数又是一个挂起函数,所以这里需要调用 viewModelScopelaunch 函数启动一个协程,然后再发送消息。

class MainViewModel : ViewModel() {

    private val _countSharedFlow = MutableSharedFlow<Int>()

    val countSharedFlow = _countSharedFlow.asSharedFlow()

    init {
        CoroutineScope(Dispatchers.IO).launch {
            repeat(20) {
                delay(1000)
                _countSharedFlow.emit(it)
            }
        }
    }
}

activity中收集代码仍然未改变:

override fun onCreate(savedInstanceState: Bundle?) {
    super.onCreate(savedInstanceState)
    setContentView(R.layout.activity_main)
    Log.i("MainActivity", "onCreate")

    val tvCount = findViewById<TextView>(R.id.tv_count)

    lifecycleScope.launch {
        repeatOnLifecycle(Lifecycle.State.STARTED) {
            mainViewModel.countSharedFlow.collect {
                Log.i("MainActivity", "collect data: $it")
                tvCount.text = it.toString()
            }
        }
    }
}

SharedFlow 在运行之后的发送和收集是解耦的。这意味着发送者和接收者可以独立地进行操作,而不需要彼此之间的直接交互。

在 Kotlin 中, SharedFlow 是一个热流(hot flow),它可以在没有订阅者的情况下开始发送数据,并且可以有多个订阅者同时接收数据。这种设计使得 SharedFlow 非常适合 用于在多个组件之间共享数据 ,而不需要显式地管理订阅者的生命周期。

SharedFlow 主要关注其非粘性的特点,其实可以通过一些参数的配置来让 SharedFlow 在有观察者开始工作之前缓存一定数量的消息,甚至还可以让 SharedFlow 模拟出 StateFlow 的效果。SharedFlow 是一个非常强大的工具,特别适合处理事件总线、一次性操作和需要多个订阅者的场景。

【通用开发】线程安全问题

【通用开发】线程安全问题

介绍了Android开发中实现线程安全的几种方式

线程的状态

一个 Thread 线程的生命周期:

各种状态一目了然,值得一提的是”blocked”这个状态:线程在Running的过程中可能会遇到阻塞(Blocked)情况

  • 调用 join()sleep() 方法,sleep() 时间结束或被打断,join() 中断去执行其他线程,IO完成都会回到 Runnable 状态,等待JVM的调度。
  • 调用 wait() ,使该线程处于等待池(wait blocked pool),直到 notify()/notifyAll() ,线程被唤醒被放到锁定池(lock blocked pool),释放同步锁使线程回到可运行状态(Runnable)
  • Running 状态的线程加同步锁(Synchronized)使其进入(lock blocked pool),同步锁被释放进入可运行状态(Runnable)。

此外,在 runnable 状态的线程是处于被调度的线程,此时的调度顺序是不一定的。 Thread 类中的 yield() 方法可以让一个running状态的线程转入runnable。

为什么会有线程安全问题

如果不使用任何同步机制,在多线程中读写同一个变量。那么,程序的结果是难以预料的。

主要原因有一下几点:

  • 简单的读写不是原子操作
  • CPU 可能会调整指令的执行顺序
  • 在 CPU cache 的影响下,一个 CPU 执行了某个指令,不会立即被其它 CPU 看见

1. 原子操作

原子操作(Atomic Operation)是指在多线程或并发编程中,不可被中断的一个或一系列操作。这些操作 要么全部执行完成,要么完全不执行 ,不会出现执行到一半被其他线程干扰的情况,从而保证操作的完整性和一致性。

原子操作在执行过程中不会被其他线程或进程打断。并且操作完成后,结果会立即对其他线程可见(通常由硬件或底层内存模型保证)。

非原子操作的影响

举例:

int64_t i = 0;     // global variable

Thread-1:              Thread-2:
i++;               std::cout << i;

C++ 并不保证 i++ 是原子操作。从汇编的角度看,读写内存的操作一般分为三步:

  1. 将内存单元读到 cpu 寄存器
  2. 修改寄存器中的值
  3. 将寄存器中的值回写入对应的内存单元

进一步,有的 CPU Architecture, 64 位数据(int64_t)在内存和寄存器之间的读写需要两条指令。

这就导致了 i++ 操作在 cpu 的角度是一个多步骤的操作。所以 Thread-2 读到的可能是一个中间状态。

2. CPU重排的影响

为了优化程序的执行性能,编译器和 CPU 可能会 调整指令的执行顺序 。为阐述这一点,下面的例子中,让我们假设所有操作都是原子操作:

int x = 0;     // global variable
int y = 0;     // global variable
  
Thread-1:              Thread-2:
x = 100;               while (y != 200) {}
y = 200;               std::cout << x;

如果 CPU 没有乱序执行指令,那么 Thread-2 将输出 100。然而,对于 Thread-1 来说,x = 100; 和 y = 200; 这 两个语句之间没有依赖关系 ,因此,CPU可能会允许调整语句的执行顺序。

在这种情况下,Thread-2 的打印,有可能是 0 也有可能是 100

2. CPU CACHE的影响

CPU cache 也会影响到程序的行为。下面的例子中,假设从时间上来讲,A 操作先于 B 操作发生:

int x = 0;     // global variable
  
Thread-1:                      Thread-2:
x = 100;    // A               std::cout << x;    // B

x = 100; ,这个看似简短的语句,在 CPU 的实际执行步骤为:

  1. 取指令:CPU从指令缓存中读取 mov 指令。
  2. 解码:解码指令,识别操作(写入内存)和操作数(地址 [x] 和值 100)。
  3. 内存访问。计算变量 x 的内存地址。若 x 不在缓存中,触发缓存加载(Cache Miss)。
  4. 数据写入:将值 100 写入 x 的内存地址。
  5. 缓存同步:更新缓存线,可能通过缓存一致性协议(如MESI)通知其他核心。

尽管从时间上来讲,A 先于 B,但 CPU cache 的影响下, Thread-2 不能保证立即看到 A 操作的结果,所以 Thread-2 可能输出 0 或 100。

Java中常见实现线程安全的操作

1. 使用 final 属性 (Immutability)

声明一个字段为 final 后,它的值在对象构造完成后就不能再被改变。如果一个对象的所有字段都是 final 并且它们引用的对象(如果是引用类型)也是不可变的,那么这个对象就是不可变对象 (Immutable Object)。不可变对象在多线程环境中天然是线程安全的,因为它们的状态不会被任何线程修改。

优点是简单、安全,是实现线程安全的“黄金法则”。应用场景比较有限。

代码举例:

public final class ImmutablePoint {
    // 两个final属性,它们的值在构造函数中确定后不可更改
    private final int x;
    private final int y;

    public ImmutablePoint(int x, int y) {
        this.x = x;
        this.y = y;
    }

    public int getX() {
        return x;
    }

    public int getY() {
        return y;
    }

    // 注意:没有提供任何修改x或y的方法(setter)
}

// 在多线程中使用 ImmutablePoint 对象时,无需任何同步措施。

2. ThreadLocal 线程隔离

ThreadLocal 为每个使用该变量的线程都提供了一个独立的、线程本地的副本。这样,一个线程对变量的修改不会影响到其他线程,从而实现了线程间的隔离,避免了共享资源的竞争。

适用于保存用户会话信息、数据库连接、事务上下文等,这些信息通常只需要在当前线程内共享。

代码举例:

public class ThreadLocalExample {
    // 创建一个 ThreadLocal 实例
    private static final ThreadLocal<String> threadLocalUser = new ThreadLocal<>();

    public static void main(String[] args) throws InterruptedException {
        Runnable task = () -> {
            String threadName = Thread.currentThread().getName();
            // 1. 设置当前线程的本地变量
            threadLocalUser.set(threadName + "'s Data");
            System.out.println(threadName + " set data: " + threadLocalUser.get()); 
            // 输出: A set data: A's Data

            try {
                Thread.sleep(50); // 模拟耗时操作
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            // 2. 获取当前线程的本地变量
            System.out.println(threadName + " get data: " + threadLocalUser.get()); 
            // 输出: A get data: A's Data

            // 3. 推荐在线程结束时移除,避免内存泄漏(尤其是在线程池中)
            threadLocalUser.remove();
        };

        Thread threadA = new Thread(task, "Thread-A");
        Thread threadB = new Thread(task, "Thread-B");

        threadA.start();
        threadB.start();
    }
}
/*
可能的输出(Thread-A 和 Thread-B 的数据互不影响):
Thread-A set data: Thread-A's Data
Thread-B set data: Thread-B's Data
Thread-A get data: Thread-A's Data
Thread-B get data: Thread-B's Data
*/

3. volatile 关键字

volatile 保证了对变量读写的可见性和操作的有序性,但 不保证原子性

  • 可见性 (Visibility): 当一个线程修改了 volatile 变量的值,新值会立即同步回主内存;当其他线程读取该变量时,会从主内存中重新获取最新值,而不是使用自己的工作内存副本。
  • 有序性 (Ordering): 禁止 JVM 对 volatile 变量的读写操作进行重排序。

volatile 变量的读写操作仍然在 CPU 缓存中进行,但 JVM 会在这些操作周围插入内存屏障 (Memory Barriers),来保证数据同步。

  • 写操作之后会插入一个 Store Barrier (写屏障) 。这个屏障会强制要求 CPU 将本地缓存中的最新值立即刷新(写入)到主内存。同时,它还会使其他 CPU 核心中该变量的缓存副本失效(Invalidate)。
  • 在读操作之前会插入一个 Load Barrier (读屏障) 。这个屏障会要求 CPU 重新从主内存中加载最新的值到本地缓存,而不是使用可能已过时的本地缓存副本。

适用于修饰状态标记 (flag) 或一次写、多次读的共享变量,但不适用于依赖当前值进行计算的场景(例如 i++)。

代码举例:

public class VolatileExample {
    // 状态标志,一个线程修改后,其他线程需要立即看到最新值
    private volatile boolean isRunning = true;

    public void stop() {
        isRunning = false; // 线程 A 修改
        System.out.println(Thread.currentThread().getName() + " set isRunning to false");
    }

    public void runLoop() {
        // 线程 B 持续读取 isRunning
        while (isRunning) {
            // ... 执行任务
        }
        System.out.println(Thread.currentThread().getName() + " loop stopped.");
    }

    public static void main(String[] args) throws InterruptedException {
        VolatileExample example = new VolatileExample();

        // 线程 B 启动循环
        Thread runnerThread = new Thread(example::runLoop, "Runner-Thread");
        runnerThread.start();

        // 主线程等待一段时间,让 runnerThread 运行起来
        Thread.sleep(100);

        // 线程 A (主线程) 停止循环
        example.stop();
        runnerThread.join(); // 等待 runnerThread 结束
    }
}

插入:乐观锁和悲观锁

悲观锁 (Pessimistic Locking)

假设最坏的情况,认为数据随时可能被其他线程或进程修改,所以 每次访问数据时都会先给数据上锁 ,防止其他人在自己操作期间修改数据。Java中的 synchronizedReentrantLock 都属于悲观锁。

乐观锁 (Optimistic Locking)

假设最好的情况,认为数据被修改的概率很低,所以它不会在访问数据时加锁,而是 在更新数据时才去检查在此期间有没有人修改过数据

配合 CAS(Compare and Swap) 机制,这是 CPU 指令级别的原子操作,是 Java 中实现乐观锁的基石。CAS 操作是一个由 CPU 指令保证的原子操作

在写值时会先读取一遍当前内存中是否还是原值,如果是则执行写入,如果不是,则放弃修改。

Java 并发包中很多原子类(如 AtomicInteger)就是基于 CAS 乐观锁思想实现的。

4. synchronized 关键字

synchronized 是一种内置的 互斥锁 (Intrinsic Lock) 机制,它确保同一时刻只有一个线程可以执行被它保护的代码块或方法。它保证了操作的原子性可见性有序性

使用方式:

  1. 同步实例方法: 锁住当前实例对象 (this)。
  2. 同步静态方法: 锁住当前类的 Class 对象。
  3. 同步代码块: 锁住括号内指定的对象。

代码举例 (同步代码块):

public class SynchronizedExample {
    private int count = 0;
    // 使用一个私有的 final 对象作为锁,避免外部干扰
    private final Object lock = new Object(); 

    public void increment() {
        // 只有获取到 lock 对象的锁的线程才能进入代码块
        synchronized (lock) { 
            // 这是一个复合操作 (读->改->写),必须是原子性的
            count++; 
        }
    }
    
    // 也可以同步方法: public synchronized void increment() { count++; }

    public int getCount() {
        return count; 
        // 实际上,为了保证可见性,这里读取操作也应该同步,或者将 count 声明为 volatile。
        // 为了演示 synchronized 保证原子性,此处简化。
    }
}

5. Lock 加锁 (J.U.C Lock Interface)

Lock 接口(如 ReentrantLock)是 Java 5 引入的,属于 java.util.concurrent.locks 包下的显式锁机制。它提供了比 synchronized 更灵活、更强大的功能,例如:

  • 可中断锁: lock.lockInterruptibly()
  • 尝试锁: lock.tryLock()
  • 定时锁: lock.tryLock(long timeout, TimeUnit unit)
  • 公平锁/非公平锁: 可以在构造函数中指定。

代码举例 (ReentrantLock):

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class LockExample {
    private int count = 0;
    // 创建一个可重入锁实例
    private final Lock lock = new ReentrantLock();

    public void increment() {
        // 1. 获取锁
        lock.lock(); 
        try {
            // 确保同步代码块中的操作是原子性的
            count++; 
        } finally {
            // 2. 释放锁。注意:必须放在 finally 块中,确保在发生异常时也能释放锁
            lock.unlock(); 
        }
    }
    
    public int getCount() {
        return count; 
    }
}

常见集合类容器的线程安全分析

Java 中的集合类主要分为 非线程安全 (Non-thread-safe)线程安全 (Thread-safe) 的同步容器和 并发容器 (Concurrent) 三类。

1. 列表类 (List)

ArrayList 是一个普通的、非同步的类,它的方法(如 add(), get(), remove() 等)都没有使用 synchronized 关键字进行同步控制。在多线程环境下并发操作(如增删改)会导致数据不一致或抛出 ConcurrentModificationException

为什么说它不是线程安全的?

如果在多线程环境中,多个线程同时对同一个 ArrayList 实例进行修改操作(例如一个线程在 add(),另一个线程在 remove()),就可能出现以下问题:

  1. 数据不一致(Data Corruption):例如,两个线程同时尝试添加元素,可能会导致底层数组的数据混乱。
  2. 竞态条件(Race Condition):可能导致 ArrayList 的内部状态(如记录大小的 size 变量)被错误更新。
  3. 抛出异常:最常见的情况是在遍历(迭代)时,另一个线程修改了列表结构,会抛出 ConcurrentModificationException
如何使 List 线程安全?

如果您需要在多线程环境中使用一个类似 ArrayList 的列表,有以下几种线程安全的替代方案:

  1. 使用同步包装器(Synchronized Wrapper):

    List<String> synchronizedList = Collections.synchronizedList(new ArrayList<String>());
    

    简单易用。但是性能较低,因为它对所有操作都是通过锁住整个列表对象来实现的,在高并发下会有性能瓶颈。

  2. 使用 JUC 包中的并发列表(推荐):

    List<String> safeList = new CopyOnWriteArrayList<String>();
    
    public void addItem(String item) {
        safeList.add(item); // 线程安全
    }
    

    Java原生提供的 CopyOnWriteArrayList 性能更高,尤其是在 读多写少 的场景。其采用了 写时复制 的策略。当列表需要被修改时(addset 等),它会创建一个新的底层数组副本,修改在新副本上进行,然后将列表的引用指向新副本。读取操作(get)则始终在旧的数组上进行,不需要加锁。

2. Map类

HashMap 是最常用的 Map 实现。基于哈希表(数组+链表/红黑树)实现。它的键和值都允许为 null。

HashMap 是 Java 中最常用的 Map 实现,它在设计时主要关注的是性能(查找、插入等操作的平均时间复杂度为 $O(1)$),而不是线程安全。它 适用于单线程环境 。性能最高 O(1) 级别,但在并发环境下(多线程同时读写)会引发问题,例如多个线程同时对同一个 HashMap 实例进行修改操作(put()remove() 等),会引发严重的问题,包括:

  1. 数据丢失或不一致: 多个线程同时操作同一个桶(Bucket)时,可能导致数据覆盖或链表结构混乱。
  2. 死循环 (Infinite Loop):HashMap 扩容(resize())的过程中,链表会被重新分配到新的数组中。在并发修改的情况下,可能会出现链表节点相互指向的情况,形成环状结构。当另一个线程尝试遍历这个环时,就会导致 CPU 占用 100% 的死循环,使程序彻底挂死。这个问题在早期的 JDK 版本中尤其常见。
  3. 抛出异常: 类似 ArrayList,在迭代过程中进行结构性修改,也会抛出 ConcurrentModificationException
Map线程安全的替代方案

如果需要在多线程环境中使用一个 Map 结构,类似于List的策略,可以使用以下线程安全的两种方案:

  • Collections.synchronizedMap() 包装一个HashMap可以实现线程安全,但是性能较差。通过包装 HashMap,使用锁住整个对象的方式实现同步。适用于对性能要求不高的多线程环境。
  • ConcurrentHashMap 是一个高性能的Map类。采用更细粒度的锁机制(如 Java 8 采用 CAS + Synchronized 锁住单个桶)。读操作通常是无锁的。绝大多数多线程 Map 场景的首选。 提供了高并发下的高性能。

【通用开发】Git基础与进阶

【通用开发】Git基础与进阶

本文介绍了工作过程中的Git工具使用总结

Git的各个库的组成部分主要分为以下几个区域:

git_status

区域的含义:

  • 工作区域(Working Directory)就是你平时存放项目代码的地方。
  • 暂存区域(Stage)用于临时存放你的改动,事实上它只是一个文件,保存即将提交的文件列表信息。
  • Git 仓库(Repository)就是安全存放数据的位置,这里边有你提交的所有版本的数据。其中,HEAD 指向最新放入仓库的版本(这第三棵树,确切的说,应该是 Git 仓库中 HEAD 指向的版本)。
  • remote就是远端的代码仓,存放在线的公共的代码。

基础提交代码三大步:

git add .
# 添加所有修改到暂存区

git commit -m "message"
# 提交修改到仓库区域

git push origin main
# 将提交的修改上传到git服务器仓库

两种拉取代码的方式

http克隆

直接通过仓库的https链接就可以拉取下载。

git clone https://github.com/stepheneasyshot/stepheneasyshot.github.io.git

ssh克隆

生成本地key

打开cmd命令行,输入以下命令:

C:\Users\stephen\Desktop>ssh-keygen -t rsa -b 4096 -C "zhanfeng990927@gmail.com"
Generating public/private rsa key pair.
Enter file in which to save the key (C:\Users\stephen/.ssh/id_rsa):
Created directory 'C:\\Users\\stephen/.ssh'.
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in C:\Users\stephen/.ssh/id_rsa
Your public key has been saved in C:\Users\stephen/.ssh/id_rsa.pub
The key fingerprint is:
SHA256:XXXXXXXXXX/4pbXXXXXXXXXXXXXXXXX zhanfeng990927@gmail.com
The key's randomart image is:
+---[RSA 4096]----+
|   XXXXXX        |
|  XXXXXXXXX      |
|  XXXXX          |
|   XXXXXX        |
|  XXXXXXXXXX     |
|  XXXXXXXXXX     |
| XXXXXXXXXXXX    |
| XXXXXXXXXX      |
| XXXXXXXXXXX     |
+----[SHA256]-----+

Enter passphrase这两步是提示输入密码,建议不要设置,直接生成。

完毕后在C盘的用户文件夹下应该会生成一个 .ssh 文件夹,可以在 .ssh 目录下看到两个文件:id_rsa 和 id_rsa.pub

其中的pub公钥就是需要上传到git服务器上做验证的文件。

上传key到git服务器

生成SSH Key后,需要将公钥添加到远程Git仓库中。使用cat命令查看公钥内容,Windows上应该直接使用记事本打开即可显示:

cat ~/.ssh/id_rsa.pub

一般格式如下:
ssh-rsa XXXXXX...lrw== zhanfeng990927@gmail.com

然后,复制这串公钥内容。 登录到远程Git仓库,找到SSH Key配置页面,将公钥粘贴到相应位置并保存。

验证并拉取代码

打开git bash,输入ssh -T git@github.com做初次验证,然后就可以通过ssh的方式拉取github上的代码了:

~/Desktop:$ ssh -T git@github.com
The authenticity of host 'github.com (20.205.243.166)' can't be established.
ED25519 key fingerprint is SHA256:+DiY3wvvV6TuJJhbpZisF/zLDA0zPMSvHdkr4UvCOqU.
This key is not known by any other names.
Are you sure you want to continue connecting (yes/no/[fingerprint])? y
Please type 'yes', 'no' or the fingerprint: yes
Warning: Permanently added 'github.com' (ED25519) to the list of known hosts.
Hi stepheneasyshot! You've successfully authenticated, but GitHub does not provide shell access.

~/Desktop:$ git clone git@github.com:stepheneasyshot/stepheneasyshot.github.io.git
Cloning into 'stepheneasyshot.github.io'...

Git文件状态

通常我们需要查看一个文件的状态,使用git status

Changes not staged for commit 表示得大概就是工作区有该内容,但是缓存区没有,需要我们git add.

Changes to be committed 一般而言,这个时候,文件放在缓存区了,我们需要git commit.

nothing to commit, working tree clean 这个时候,我们将本地的代码推送到远端即可。

git配置

列出当前配置

git config --list

列出当前Repository配置

git config --local --list

列出全局配置

git config --global --list

列出系统配置

git config --system --list

配置用户名

git config --global user.name "your name"

配置用户邮箱

git config --global user.email "youremail@github.com"

分支管理

查看本地分支

git branch

查看远程分支

git branch -r

查看本地和远程分支

git branch -a

从当前分支,切换到其他分支

git checkout <branch-name>

创建并切换到新建分支

git checkout -b <branch-name>

删除分支

git branch -d <branch-name>
// 若有未提交的代码,仍然强行删除刻可以使用 D

当前分支与指定分支合并

git merge <branch-name>

查看哪些分支已经合并到当前分支

git branch --merged

查看哪些分支没有合并到当前分支

git branch --no-merged

查看各个分支最后一个提交对象的信息

git branch -v

删除远程分支

git push origin -d <branch-name>

重命名分支

git branch -m <oldbranch-name> <newbranch-name>

拉取远程分支并创建本地分支

git checkout -b 本地分支名x origin/远程分支名x

// 另外一种方式,也可以完成这个操作。
git fetch origin <branch-name>:<local-branch-name>

fetch推荐写法

git fetch origin <branch-name>:<local-branch-name>

一般而言,这个origin是远程主机名,一般默认就是origin。
branch-name 你要拉取的分支
local-branch-name 通常而言,就是你本地新建一个新分支,将origin下的某个分支代码下载到本地分支。

撤销操作

撤销工作区修改

git checkout -- file

暂存区文件撤销 (不覆盖工作区)

git reset HEAD

版本回退

git reset --(soft | mixed | hard ) < HEAD ~(num) >
可以使用git log 找到commitid,以精确还原

比较差异

比较工作区与缓存区

git diff

比较缓存区与本地库最近一次commit内容

git diff -- cached

比较工作区与本地最近一次commit内容

git diff HEAD

比较两个commit之间差异

git diff <commit ID> <commit ID>

一般分支命名

master分支

主分支,用于部署生产环境的分支,确保稳定性。 master分支一般由develop以及hotfix分支合并,任何情况下都不能直接修改代码。

develop 分支

develop为开发分支,通常情况下,保存最新完成以及bug修复后的代码。 开发新功能时,feature分支都是基于develop分支下创建的。

feature分支

开发新功能,基本上以develop为基础创建feature分支。 分支命名:feature/ 开头的为特性分支, 命名规则: feature/user_module、 feature/cart_module。

release分支

release 为预上线分支,发布提测阶段,会release分支代码为基准提测。

hotfix分支

分支命名:hotfix/ 开头的为修复分支,它的命名规则与 feature 分支类似。线上出现紧急问题时,需要及时修复,以master分支为基线,创建hotfix分支,修复完成后,需要合并到master分支和develop分支。

.gitignore文件配置

这个文件的作用,会去忽略一些不需要纳入Git管理这种,我们也不希望出现在未跟踪文件列表。

一般配置方法:

# 井号,此行是注释 会被Git忽略

# 忽略 node_modules/ 目录下所有的文件
node_modules/

# 忽略所有.vscode结尾的文件
.vscode

# 忽略所有.md结尾的文件
*.md

# 但README.md 除外
!README.md

# 会忽略 doc/something.txt 但不会忽略doc/images/arch.txt
doc/*.txt

# 忽略 doc/ 目录下所有扩展名为txt文件

doc/**/*.txt

增加效率的一些操作

git stash

某一天你正在 feature 分支开发新需求,突然产品经理跑过来说线上有bug,必须马上修复。而此时你的功能开发到一半,于是你急忙想切到 master 分支,然后你就会看到以下报错:

error: Your local changes to the following files would be overwritten by checkout:

src/entries/index/config/group.js

Please commit your changes or stash them before youswitch branches.

Aborting

因为当前有文件更改了,需要提交commit保持工作区干净才能切分支。由于情况紧急,你只有急忙 commit 上去,commit 信息也随便写了个“暂存代码”,于是该分支提交记录就留了一条黑历史…

不想随便写一个commit,又要切代码回去,就可以用git stash这个命令。 代码修改会被暂存下来,当从主分支修完bug切回来时,可以使用

git stash apply 

来恢复代码。

相关命令如下:

# 保存当前未commit的代码
git stash

# 保存当前未commit的代码并添加备注
git stash save "备注的内容"

# 列出stash的所有记录
git stash list

# 删除stash的所有记录
git stash clear

# 应用最近一次的stash
git stash apply

# 应用最近一次的stash,随后删除该记录
git stash pop

# 删除最近的一次stash
git stash drop

可以生成多条stash,并使用git stash list来查看。

$ git stash list
stash@{0}: WIP on ...
stash@{1}: WIP on ...
stash@{2}: On ...

应用第二条记录:

$ git stash apply stash@{1}

pop,drop 同理。

git reset

有soft和hard两种。

git reset –hard commitId

将git头部回溯到这笔提交,这笔之后的所有本地修改全部删除,新建的文件貌似不会自动删除。

依次提交a,b,c,
使用hard模式回溯到a,
那么本地b,c的内容将被删除

git reset –soft commitId

将git的头部回溯到这一笔提交,之后的所有变更修改都会被保存下来放到暂存区,并当作下一笔的提交内容。

依次提交a,b,c,使用soft模式回溯,会将b,c的修改保留,在下一次commit的时候将合并到一起。

git cherry-pick

将一个分支的某些提交,复制到另一个分支上,比如release分支上修复的某一些bug,在开发dev分支上也需要同步上去。 这时候就可以使用这个命令。

单个提交举例

如果要把分支 b 的 test 提交给复制到分支 a 上。 先在b分支上使用git log,获取 test 这笔提交的commitHash值,切到分支a,使用:

git cherry-pick commitHash

就可以同步分支b的这一笔提交。

一次转移多个提交

git cherry-pick commit1 commit2

上面的命令将 commit1 和 commit2 两个提交应用到当前分支。

多个连续的commit,也可区间复制:

git cherry-pick commit1^..commit2

上面的命令将 commit1 到 commit2 这个区间的 commit 都应用到当前分支(包含commit1、commit2),commit1 是最早的提交。

cherry-pick冲突

如果分支b有提交1,2,3,4,5需要同步到分支a。 1,2,3顺利合并了,但是复制到第4个提交时,与分支a的本地提交修改的内容有冲突了,这时候需要解决冲突。

有三种方式

放弃cherry-pick
git cherry-pick --abort

这个操作会将已经合入的123也给回退掉,就像什么都没有发生过。

退出cherry-pick
git cherry-pick --quit

保留已经合入的1,2,3笔提交,还有正卡在冲突处的第4笔提交,退出合入过程,这时候第5笔就相当于放弃了。

解冲突后继续合入
git cherry-pick --continue

在手动解完冲突并commit提交后,执行continue命令,会把未合入的第5笔提交也同步进来。

git revert

有提交1,2,3,现在发现提交2会引起严重问题,需要将这一笔撤销掉。可以使用reset命令,但是如果使用reset –hard,会将提交3也回退,需要重新提一次第3笔。

这时候可以使用revert精准回退某一笔提交:

git revert commitHash

输入完后push上去,可以只回退提交2的内容。

包含merge节点的revert回退

如果有提交1,2,3,三笔提交,其中提交3是dev分支合并到主分支上的信息。

这时候直接revert掉2,git会搞不清楚是回退主分支上的2,还是回退掉dev分支上的2,执行不成功。

需要使用 -m 手动指定保留的分支,另一条分支上的2就会被回退。

-m 后面要跟一个 parent number 标识出”主线”,一般使用 1 保留主分支代码。

git revert -m 1 <commitHash>

revert了主分支,dev修好之后合并失效

情景和上面类似,但是把master分支上有问题的提交给revert掉,

如下图,有问题的提交是 b 这一笔,在master上revert掉之后,在dev上修复了这个有问题的提交,准备再次往主分支合并,这时候发现dev上的 b 提交没有同步到master,因为master上已经有一笔 b 的合入记录,在dev修改完毕准备合入时,git一比较,这两笔相同的commitHash,就不会合入dev上 b 的提交。 git_revert_demo

解决方案

使用reset将master回滚出问题之前,也就是 a 这一笔提交。这样master上面就不会有b的残留记录了。

然后把修复完毕的dev分支合并进来。

git reflog

reflog可以找到所有的操作记录。用于误删或者回退过多的场景。

即使在使用了 git reset --hardgit branch -D 等命令后,仍然可以通过 reflog 找到之前的状态‌。

如果有提交a,b,c。其中 c 是你提交的有问题的一笔,需要reset掉,但是看错了commitHash值,将别人提交的 b 都给回退掉了,现在使用git log回溯,git记录上只有a这笔提交了。

这时候可以使用git reflog,它记录了所有的操作记录,记下需要恢复的commitID,就可以还原回来被误删的 b 这笔提交了。

设置git的短命令

方式一

git config --global alias.ps push

方式二 打开用户目录下的 .gitconfig 文件,写入:

[alias] 
        co = checkout
        ps = push
        pl = pull
        mer = merge --no-ff
        cp = cherry-pick

就可以使用短命令来进行git操作了。

例如:

git cp <commitHash>

git rebase

git rebase最大的作用就是让提交记录更加简洁。

分支变基

现在有这一个场景:

rebase

master分支上A之后拉出一条开发分支,而后一个同事提交了B。

你在dev分支上提交了C和D,现在想把dev合并到主分支,使用merge的话,会多出一条空白的merge提交信息。

这时候可以使用

git checkout feature
git rebase master

//这两条命令等价于git rebase master feature

相当于让dev分支的基底从提交A的master开始,变成了从提交B开始,后面提交的C和D就是顺序的提交了。

整个提交记录则是ABCD连续的,合并到主分支上时不会产生merge信息。

同样适用于同一分支上不同开发的提交信息简化。

没有merge信息有好有坏吧,虽然看起来简洁,也无法回溯合代码的历史了。

重塑提交历史

写法如下:

git rebase -i [地址引用]

可以让你把最近的几笔提交信息进行操作,可以提交的调整顺序,合并,删除不想要的提交。即可以将多笔记录合并成一个。

例如:

git rebase -i HEAD~4

就可以列出HEAD开始往前四笔提交,供你编辑。

推荐场景

不同公司,不同情况有不同使用场景,不过大部分情况推荐如下:

单人开发的时候,拉公共分支最新代码的时候使用rebase,也就是git pull -r或git pull –rebase。这样的好处很明显,提交记录会比较简洁。但有个缺点就是rebase以后我就不知道我的当前分支最早是从哪个分支拉出来的了,因为基底变了嘛,所以看个人需求了。

多人开发,往公共分支上合代码的时候,使用merge。如果使用rebase,那么其他开发人员想看主分支的历史,就不是原来的历史了,历史已经被你篡改了。举个例子解释下,比如张三和李四从共同的节点拉出来开发,张三先开发完提交了两次然后merge上去了,李四后来开发完如果rebase上去(注意李四需要切换到自己本地的主分支,假设先pull了张三的最新改动下来,然后执行<git rebase 李四的开发分支>,然后再git push到远端),则李四的新提交变成了张三的新提交的新基底,本来李四的提交是最新的,结果最新的提交显示反而是张三的,就乱套了。

正因如此,大部分公司其实会禁用rebase,不管是拉代码还是push代码统一都使用merge,虽然会多出无意义的一条提交记录“Merge … to …”,但至少能清楚地知道主线上谁合了的代码以及他们合代码的时间先后顺序。

【通用开发】Java线程池

【通用开发】Java线程池

本文介绍了java线程池相关知识

线程的作用

public class Demo01 {

  public static void main(String[] args) {
    var thread = new Thread(() -> {
      System.out.println("Hello world from a Java thread");
    });
    thread.start();
  }
}

本质上Java编译器在编译的时候都认为传递给他的是一个对象,然后执行对象的run方法。

Thread在拿到这个对象的时候,当我们执行Thread的start方法的时候,最终会执行到一个native方法start0:

private native void start0();

当JVM执行到这个方法的时候会调用操作系统给上层提供的API创建一个线程,然后这个线程会去解释执行我们之前给Thread对象传入的对象的run方法字节码,当run方法字节码执行完成之后,这个线程就会退出。

看到这里我们仔细思考一下线程在做一件什么样的事情,JVM给我们创建一个线程好像执行完一个函数(run)的字节码之后就退出了,线程的生命周期就结束了。

确实是这样的,JVM给我们提供的线程就是去完成一个函数,然后退出(记住这一点,这一点很重要,为你后面理解线程池的原理有很大的帮助)。

事实上JVM在使用操作系统给他提供的线程的时候也是给这个线程传递一个函数地址,然后让这个线程执行完这个函数。只不过JVM给操作系统传递的函数,这个函数的功能就是去解释执行字节码,当解释执行字节码完成之后,这个函数也会退出(被系统回收)。

看到这里可以将线程的功能总结成一句话:

执行一个函数,当这个函数执行完成之后,线程就会退出,然后被回收,当然这个函数可以调用其他的函数。

可能你会觉得这句话非常简单,但是这句话会我们理解线程池的原理非常有帮助。

为什么需要线程池

当我们执行start的方法的时候,最终会走到start0方法,这是一个native方法,JVM在执行这个方法的时候会通过系统底层函数创建一个线程,然后去执行run方法,这里需要注意,创建线程是需要系统资源的,比如说内存,因为操作系统是系统资源的管理者,因此一般需要系统资源的方法都需要操作系统的参与,因此创建线程需要操作系统的帮忙,而一旦需要操作系统介入,执行代码的状态就需要从用户态到内核态转换(内核态能够执行许多用户态不能够执行的指令),当操作系统创建完线程之后又需要返回用户态,我们的代码将继续被执行,整个过程像下面这样。

thread_pool

从上图可以看到我们需要两次的上下文切换,同时还需要执行一些操作系统的函数,这个过程是非常耗时间的,如果在并发非常高的情况,我们频繁的去生成线程然后销毁,这对我们程序的性能影响还是非常大的。

因此许许多多聪明的程序员就想能不能不去频繁的创建线程而且也能够完成我们的功能——我们创建线程的目的就是想让我们的程序完成的更加快速,让多个不同的线程同时执行不同的任务,执行完这个任务再去阻塞队列取下一个任务执行。于是线程池就被创造出来了。

线程池的结构大致如下所示:

thread_pool

线程池实现原理

在前面我们已经提到了关于线程池和线程比较重要的两个点:

  • 线程就是执行一个函数。
  • 线程池当中的线程可以执行很多函数,但是不会退出。

那么如何实现上面两个要求?

答案就是在一个函数当中进行while循环,然后不断的从任务队列当中获取任务函数,然后进行执行,直到要求停止线程池当中的线程的时候线程再进行退出,整个过程的代码大致如下所示:

  public void run() {
    while (!isStopped) {
      try {
        Runnable task = tasks.take();
        task.run();
      } catch (InterruptedException e) {
        // do nothing
      }
    }
  }

为何要使用线程池?

  1. 降低开销:在创建和销毁线程的时候会产生很大的系统开销,频繁创建/销毁意味着CPU资源的频繁切换和占用,线程是属于稀缺资源,不可以频繁的创建。假设创建线程的时长记为t1,线程执行任务的时长记为t2,销毁线程的时长记为t3,如果我们执行任务 t2<t1+t3 ,那么这样的开销是不划算的,不使用线程池去避免创建和销毁的开销,将是极大的资源浪费。
  2. 易复用和管理:将线程都放在一个池子里,便于统一管理(可以延时执行,可以统一命名线程名称等),同时,也便于任务进行复用。
  3. 解耦:将线程的创建和销毁与执行任务完全分离出来,这样方便于我们进行维护,也让我们更专注于业务开发。

线程池的优势

  1. 提高资源的利用性:通过池化可以重复利用已创建的线程,空闲线程可以处理新提交的任务,从而降低了创建和销毁线程的资源开销。
  2. 提高线程的管理性:在一个线程池中管理执行任务的线程,对线程可以进行统一的创建、销毁以及监控等,对线程数做控制,防止线程的无限制创建,避免线程数量的急剧上升而导致CPU过度调度等问题,从而更合理的分配和使用内核资源。
  3. 提高程序的响应性:提交任务后,有空闲线程可以直接去执行任务,无需新建。
  4. 提高系统的可扩展性:利用线程池可以更好的扩展一些功能,比如定时线程池可以实现系统的定时任务。

线程池的类型

Java提供了一套Executor框架,封装了对多线程的控制,其体系结构如下图所示:

threablogs_java_excutord_pool

Executor只是一个接口,代码如下:

public interface Executor {
    void execute(Runnable command);
}

ExecutorService接口对该接口进行了扩展,增加很多方法:

shutdown()
shutdowmNow()
isShutdown()
isTerminated()
awaitTermination()
submit(Callable<T>)
submit(Runnable,T)
submit(Runnable)
invokeAll()

重点关注前五个方法:

  • shutdown(): 调用此方法通知线程池 shutdown,调用此方法后,线程池不再接受新的任务,已经提交的任务不会受到影响,会按照顺序执行完毕。不会阻塞调用此方法的线程。
  • shutdowmNow(),立即尝试停止所有正在运行的任务,返回一个待执行的任务列表。不会阻塞调用此方法的线程。该方法除了尽力去尝试停止线程外,没有任何保证,任何响应中断失败的线程可能永远不会停止(如:通过thread.interrupted()中断线程时)。
  • isShutdown():返回一个boolean值,如果已经 shutdown 返回true,反之false。
  • awaitTermination(timeout,timeUnit):阻塞直到所有任务全部完成,或者等待 timeout ,或者在等待timeout期间当前线程抛出InterruptedException
  • isTerminated(): 返回 true 如果所有的任务已经完成且关闭,否则返回false除非在先前已经调用过shutdown()/shutdownNow()

AbstractExecutorService 是一个抽象类,实现了 ExecutorService ,其子 ThreadPoolExecutor 进一步扩展了相关功能。Java中提供了一个工具类供我们去使用 ThreadPoolExecutor ,在 Executors 中提供了如下几种线程池。

thread_pool

这么多的线程池,但都是给ThreadPoolExecutor的构造函数传递不同的参数罢了。

线程池的创建与使用

ThreadPoolExecutor的构造函数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

七大构造参数

int corePoolSize:该线程池中核心线程数最大值

这边我们区分两个概念:

  • 核心线程:线程池新建线程的时候,当前活动的线程总数< corePoolSize,新建的线程即为核心线程。
  • 非核心线程:线程池新建线程的时候,当前活动的线程总数> corePoolSize, 且阻塞队列已满,这时新建一个线程来执行新提交的任务即为非核心线程。

核心线程默认情况下会一直存活在线程池中,即使这个核心线程不工作(空闲状态),除非ThreadPoolExecutor 的 allowCoreThreadTimeOut这个属性为 true,那么核心线程如果空闲状态下,超过一定时间后就被销毁。

int maximumPoolSize:线程总数最大值

线程总数 = 核心线程数 + 非核心线程数

long keepAliveTime:非核心线程空闲超时时间

keepAliveTime即为空闲线程允许的最大的存活时间。如果一个非核心线程空闲状态的时长超过keepAliveTime了,就会被销毁掉。注意:如果设置allowCoreThreadTimeOut = true,就变成核心线程超时销毁了。

TimeUnit unit:是keepAliveTime 的单位

TimeUnit为枚举类型,列举如下:

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒

BlockingQueue workQueue:存放任务的阻塞队列

当核心线程都在工作的时候,新提交的任务就会被添加到这个工作阻塞队列中进行排队等待;如果阻塞队列也满了,线程池就新建非核心线程去执行任务。

workQueue维护的是等待执行的Runnable对象。常用的 workQueue 类型:(无界队列、有界队列、同步移交队列)

  • SynchronousQueue:同步移交队列,适用于非常大的或者无界的线程池,可以避免任务排队,SynchronousQueue队列接收到任务后,会直接将任务从生产者移交给工作者线程,这种移交机制高效。它是一种不存储元素的队列,任务不会先放到队列中去等线程来取,而是直接移交给执行的线程。只有当线程池是无界的或可以拒绝任务的时候,SynchronousQueue队列的使用才有意义,maximumPoolSize 一般指定成 Integer.MAX_VALUE,即无限大。要将一个元素放入SynchronousQueue,就需要有另一个线程在等待接收这个元素。若没有线程在等待,并且线程池的当前线程数小于最大值,则ThreadPoolExecutor就会新建一个线程;否则,根据饱和策略,拒绝任务。newCachedThreadPool默认使用的就是这种同步移交队列。吞吐量高于LinkedBlockingQueue。
  • LinkedBlockingQueue:基于链表结构的阻塞队列,FIFO原则排序。当任务提交过来,若当前线程数小于corePoolSize核心线程数,则线程池新建核心线程去执行任务;若当前线程数等于corePoolSize核心线程数,则进入工作队列进行等待。LinkedBlockingQueue队列没有最大值限制,只要任务数超过核心线程数,都会被添加到队列中,这就会导致运行中的总线程数永远不会超过 corePoolSize,所以maximumPoolSize 是一个无效设定。newFixedThreadPool和newSingleThreadPool默认是使用的是无界LinkedBlockingQueue队列。吞吐量高于ArrayBlockingQueue。
  • ArrayBlockingQueue:基于数组结构的有界阻塞队列,可以设置队列上限值,FIFO原则排序。当任务提交时,若当前线程小于corePoolSize核心线程数,则新建核心线程执行任务;若当先线程数等于corePoolSize核心线程数,则进入队列排队等候;若队列的任务数也排满了,则新建非核心线程执行任务;若队列满了且总线程数达到了maximumPoolSize最大线程数,则根据饱和策略进行任务的拒绝。
  • DelayQueue:延迟队列,队列内的元素必须实现 Delayed 接口。当任务提交时,入队列后只有达到指定的延时时间,才会执行任务。
  • PriorityBlockingQueue:优先级阻塞队列,根据优先级执行任务,优先级是通过自然排序或者是Comparator定义实现。

ThreadFactory threadFactory

创建线程的方式,这是一个接口,你 new 他的时候需要实现他的 Thread newThread(Runnable r) 方法,一般用不上。

RejectedExecutionHandler handler:饱和策略

抛出异常专用,当队列和最大线程池都满了之后的拒绝策略。 JDK提供了几种不同的RejectedExecutionHandler实现:

  • CallerRunsPolicy : 调用线程处理任务
  • AbortPolicy : 抛出异常
  • DiscardPolicy : 直接丢弃
  • DiscardOldestPolicy : 丢弃队列中最老的任务,执行新任务
//默认策略,阻塞队列满,则丢任务、抛出异常
rejected = new ThreadPoolExecutor.AbortPolicy();

//阻塞队列满,则丢任务,不抛异常
rejected = new ThreadPoolExecutor.DiscardPolicy();

//删除队列中最旧的任务(最早进入队列的任务),尝试重新提交新的任务
rejected = new ThreadPoolExecutor.DiscardOldestPolicy();

//队列满,不丢任务,不抛异常,若添加到线程池失败,那么主线程会自己去执行该任务
rejected = new ThreadPoolExecutor.CallerRunsPolicy();

另外还有一个CallerRunsPolicy

CallerRunsPolicy

其为“调用者运行”策略,实现了一种调节机制 。它不会抛弃任务,也不会抛出异常。 而是将任务回退到调用者。它不会在线程池中执行任务,而是在一个调用了execute的线程中执行该任务。在线程满后,新任务将交由调用线程池execute方法的主线程执行,而由于主线程在忙碌,所以不会执行accept方法,从而实现了一种平缓的性能降低。  

当工作队列被填满后,没有预定义的饱和策略来阻塞execute(除了抛弃就是中止还有去让调用者去执行)。然而可以通过Semaphore来限制任务的到达率。

线程池的状态

  • RUNNING:运行状态,指可以接受任务并执行队列里的任务。
  • SHUTDOWN:调用了 shutdown() 方法,不再接受新任务,但队列里的任务会执行完毕。
  • STOP:指调用了 shutdownNow() 方法,不再接受新任务,所有任务都变成STOP状态,不管是否正在执行。该操作会抛弃阻塞队列里的所有任务并中断所有正在执行任务。
  • TIDYING:所有任务都执行完毕,程序调用 shutdown()/shutdownNow() 方法都会将线程更新为此状态,若调用shutdown(),则等执行任务全部结束,队列即为空,变成TIDYING状态;调用shutdownNow()方法后,队列任务清空且正在执行的任务中断后,更新为TIDYING状态。
  • TERMINATED:终止状态,当线程执行 terminated() 后会更新为这个状态。 关闭线程池 两种关闭线程池的区别:
  • shutdown(): 执行后停止接受新任务,会把队列的任务执行完毕。
  • shutdownNow(): 执行后停止接受新任务,但会中断所有的任务(不管是否正在执行中),将线程池状态变为 STOP状态。

【通用开发】Desktop系统环境适配等操作记录

【通用开发】Desktop系统环境适配等操作记录

本文介绍了桌面系统上的一些操作记录,环境配置等,供日后查看

重装系统特别多,有时候一些环境配置或者系统设置操作容易忘记,又要重新搜集,在此文作记录。

Windows文件资源管理器六个文件夹删除

文件资源管理器侧边栏的几个文件夹,在选取文件和查看时,占用很多不必要的空间,我希望需要选取资源的文件夹都放在快速访问里就够了。

通过删除注册表(运行regedit打开)把这几个文件夹的显示删除掉:

1、注册表路径:

HKEY_LOCAL_MACHINE
|-SOFTWARE
|-Microsoft
|-Windows
|-CurrentVersion
|-Explorer
|-MyComputer
|-NameSpace

2、找到相应的键值进行删除操作(删除之前先做好备份):

1)删除【下载】文件夹: {088e3905-0323-4b02-9826-5d99428e115f}

2)删除【图片】文件夹: {24ad3ad4-a569-4530-98e1-ab02f9417aa8}

3)删除【音乐】文件夹: {3dfdf296-dbec-4fb4-81d1-6a3438bcf4de}

4)删除【文档】文件夹: {d3162b92-9365-467a-956b-92703aca08af}

5)删除【视频】文件夹: {f86fa3ab-70d2-4fc7-9c99-fcbf05467f3a}

6)删除【桌面】文件夹: {B4BFCC3A-DB2C-424C-B029-7FE99A87C641}

7)删除【3D对象】文件夹: {0DB7E03F-FC29-4DC6-9020-FF41B59E513A}

删除完成之后,通过任务管理器重启文件资源管理器即可生效:


Windows安装ubuntu子系统

  1. 控制面板,最后一个程序模块,启用功能,打开勾选Hyper-V和适用于windows的linux子系统这两个选项
  2. 重启Windows电脑
  3. 微软商店里搜索Ununtu,下载安装
  4. 运行安装好的Ubuntu子系统,等待初始化即可

Windows和Ubuntu的共享文件系统,可以访问 mnt 路径:

stephen@DESKTOP-PA80G1H:~$ cd /mnt/e/Dev/Android
stephen@DESKTOP-PA80G1H:/mnt/e/Dev/Android$ ls
CommonDebugDemo  JniDemo  NetDataDemo  SmolChat-Android  gallery

可以进行文件的复制操作,在mnt下直接操作会有很严重的IO损耗,最好复制到ubuntu内部路径再使用。


pip换依赖源

python和pip环境变量地址

python主程序安装后,地址加入PATH才可以在cmd里随处使用:

E:\Env\python\python3135

pip包管理器其实就在Scripts路径下:

E:\Env\python\python3135\Scripts

要更换 pip 的package软件源,可以按照以下步骤进行:

临时换源

在安装包时使用 -i 参数,例如: 清华源:

pip install 包名 -i https://pypi.tuna.tsinghua.edu.cn/simple

阿里源:

pip install 包名 -i https://mirrors.aliyun.com/pypi/simple

永久换源

使用以下命令设置全局源: 清华源:

pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

阿里源:

pip config set global.index-url https://mirrors.aliyun.com/pypi/simple

恢复默认源:使用命令 pip config unset global.index-url

通过更换源,可以显著提高安装 Python 包的速度。

pip 升级

pip 中升级一个已安装的包,命令格式如下:

pip install --upgrade 包名

或者使用短选项:

pip install -U 包名

示例: 升级 numpy

pip install --upgrade numpy

补充说明:

  1. 只升级一个包,不影响其他包
    pip install --upgrade 包名 只会升级指定的包及其依赖(如果有必要),不会动其他已安装的包。

  2. 如果该包未安装
    这条命令会直接安装最新版本(效果等同于 pip install 包名)。

  3. 指定版本号升级/降级
    例如升级到某个特定版本(比当前新或旧):
    pip install 包名==1.2.3
    
  4. 查看当前已安装的版本
    pip show 包名
    
  5. 批量升级所有包(需额外工具)
    pip 本身没有直接升级全部包的命令,常用 pip-reviewpip list --outdated 配合脚本处理。

Windows文件共享

选取要共享的文件夹,右键查看属性:

点击高级共享,勾选“共享此文件夹”,然后点击“权限”。

确保“Everyone”用户或特定用户的权限已设置为“读取”或“完全控制”。

点击“确定”保存。

回到“属性”窗口,切换到“安全”选项卡。

这里是非常关键的一步:安全设置也必须给予相应的权限。点击“编辑”,然后点击“添加”。

在输入框中输入“Everyone”,然后点击“检查名称”,再点击“确定”。

为“Everyone”用户设置相应的权限,例如“完全控制”。

点击“确定”保存所有更改。

检查网络共享中心的设置,专用,公用,所有的都开启网路共享

最后查看四个关键服务是否启动。某些 Windows 服务必须运行,才能确保网络发现和共享功能正常工作。

  1. Win + R,输入 services.msc,然后按回车。
  2. 在服务列表中,找到以下几项,确保它们的启动类型设置为“自动”,并且状态是“正在运行”:
    • Function Discovery Provider Host
    • Function Discovery Resource Publication
    • Server
    • TCP/IP NetBIOS Helper
  3. 如果某个服务没有运行,双击它,将启动类型改为“自动”,然后点击“启动”按钮。

完成以上步骤后,重启你的电脑,再尝试从另一台电脑访问共享内容。通常,经过这几个步骤,问题都能得到解决。如果问题依旧,你可以在另一台电脑的“文件资源管理器”地址栏中直接输入共享电脑的 IP 地址来尝试连接,例如 \\192.168.1.100


Ubuntu22换源

临时换源

在安装包时使用 -i 参数,例如: 清华源:

sudo apt install 包名 -i https://mirrors.tuna.tsinghua.edu.cn/ubuntu/

永久换源

编辑 /etc/apt/sources.list 文件,将其中的源地址替换为清华源或阿里源。

sudo nano /etc/apt/sources.list

将文件内容替换为以下内容(清华源):

deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ focal main restricted universe multiverse
deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ focal-updates main restricted universe multiverse
deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ focal-backports main restricted universe multiverse
deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ focal-security main restricted universe multiverse
deb https://mirrors.tuna.tsinghua.edu.cn/ubuntu/ focal-proposed main restricted universe multiverse

保存并退出编辑器(在 nano 中,按 Ctrl + O 保存,然后按 Ctrl + X 退出)。 然后更新软件包列表:

sudo apt update
sudo apt upgrade

Ubuntu24换源

24和之前的有些区别。Ubuntu24.04的源地址配置文件发生改变,不再使用以前的sources.list文件,升级24.04之后,而是使用如下文件:

/etc/apt/sources.list.d/ubuntu.sources
  1. 备份源配置文件
sudo cp /etc/apt/sources.list.d/ubuntu.sources  /etc/apt/sources.list.d/ubuntu.sources.bak
  1. 编辑源配置文件
sudo vim /etc/apt/sources.list.d/ubuntu.sources

3.使用dd将原本内容删除,然后添加以下新的源。清华源

Types: deb
URIs: http://mirrors.tuna.tsinghua.edu.cn/ubuntu/
Suites: noble noble-updates noble-security
Components: main restricted universe multiverse
Signed-By: /usr/share/keyrings/ubuntu-archive-keyring.gpg
  1. 更新
sudo apt-get update

三、其它源 1、中科大

Types: deb
URIs: http://mirrors.ustc.edu.cn/ubuntu/
Suites: noble noble-updates noble-security
Components: main restricted universe multiverse
Signed-By: /usr/share/keyrings/ubuntu-archive-keyring.gpg

2、阿里

Types: deb
URIs: http://mirrors.aliyun.com/ubuntu/
Suites: noble noble-updates noble-security
Components: main restricted universe multiverse
Signed-By: /usr/share/keyrings/ubuntu-archive-keyring.gpg

3、网易

Types: deb
URIs: http://mirrors.163.com/ubuntu/
Suites: noble noble-updates noble-security
Components: main restricted universe multiverse
Signed-By: /usr/share/keyrings/ubuntu-archive-keyring.gp

Windows设置一个exe开机自启动

这是最直接、最通用的方法。同时按下 Win + R 键,打开“运行”对话框。

输入 shell:startup 然后点击“确定”。这会打开当前用户的“启动”文件夹。

找到你想要开机自启的应用程序的 快捷方式,然后将它拖拽到这个“启动”文件夹中。

提示: 如果你的应用没有快捷方式,通常可以在开始菜单中找到它,然后右键点击,选择“更多” -> “打开文件位置”,在那里你可以找到快捷方式。如果还是找不到,你也可以自己创建快捷方式。


Windows配置C++开发环境

下载MSYS2

MSYS2官网下载最新版本的安装包。

安装MSYS2

运行下载的安装包,按照提示完成安装。建议安装在默认路径 C:\msys64

更新MSYS2

安装完MSYS2后,发现里面的环境目录都是空白的。

打开MSYS2 MINGW64终端,运行以下命令更新系统和软件包:

pacman -Syu

如果提示需要重启MSYS2终端,请关闭当前终端并重新打开。

安装开发工具

安装常用的开发工具和库,可以使用以下命令:

pacman -S --needed base-devel mingw-w64-ucrt-x86_64-toolchain

这将安装基本的开发工具和mingw-w64编译器。

配置环境变量

在MSYS2安装目录下找到 ucrt64 的路径,通常是:

D:\Program Files\MSYS2\ucrt64\bin

将这个路径添加到系统的PATH环境变量中。

重启VSCODE

重启完毕打开cpp文件编辑界面,按F5,选择C++(GDB/LLDB),会自动生成launch.json文件。

然后点击右上角运行,即可编译运行C++程序。


Python 转换md文件为pdf

使用pypandoc库。

安装pandoc

Pandoc官网下载最新版本的安装包。

安装MikTeX

MikTeX官网下载最新版本的安装包。

可能需要配置环境变量

python脚本

import pypandoc
import os


def markdown_to_pdf(input_md_file, output_pdf_file):
    """
    将 Markdown 文件转换为 PDF 文件。

    参数:
    input_md_file (str): 输入的 Markdown 文件路径。
    output_pdf_file (str): 输出的 PDF 文件路径。
    """
    if not os.path.exists(input_md_file):
        print(f"错误:找不到文件 '{input_md_file}'")
        return

    try:
        # 使用 pypandoc 将 Markdown 转换为 PDF
        pypandoc.convert_file(input_md_file, 'pdf', outputfile=output_pdf_file)
        print(f"成功将 '{input_md_file}' 转换为 '{output_pdf_file}'")
    except Exception as e:
        print(f"转换过程中发生错误: {e}")


if __name__ == "__main__":
    # 指定你的输入和输出文件路径
    input_file = "C:\\Users\\zhanf\\Desktop\\tesettestsetes.md"  # 替换成你的Markdown文件路径
    output_file = "C:\\Users\\zhanf\\Desktop\\OUTPUT_PDF_FILE.pdf"  # 替换成你想要的PDF文件路径

    markdown_to_pdf(input_file, output_file)

运行过程中,可能会自动安装一些宏包,一路点安装即可。最后会生成pdf文件。

目前貌似不兼容中文。


MACOS调整图片大小

命令:

sips --resampleHeightWidth <高度> <宽度> <输入文件路径> --out <输出文件路径>

NPM buildby 查看客户端构建技术栈

专业工具:buildby 这是 NPM 上一个专门用于检测桌面应用技术栈的工具,自动化程度高。

# 安装 (如果已安装Node.js环境)
npm install -g @wavever/buildby

# 检测单个应用
buildby "/Applications/YourApp.app"

# 扫描整个 Applications 文件夹并统计
buildby --scan

##

【通用开发】Java反射

【通用开发】Java反射

本文介绍了Java反射机制的使用和关于其速度的测试结论

简单来说,Java 反射(Reflection)就是一种让 Java 程序在运行时能够“看清”并操作自身的能力。

通常,我们编写 Java 代码时,在编译阶段就已经确定了类、方法、字段等信息。但反射打破了这种限制,它允许你在程序运行时:

  1. 获取类的信息: 比如,一个对象属于哪个类?这个类有哪些字段(属性)?有哪些方法?有哪些构造函数?等等。
  2. 操作类的成员:
    • 创建对象: 不通过 new 关键字,而是动态地创建类的实例。
    • 访问/修改字段: 即使是 private 的字段,也能读取或修改它的值。
    • 调用方法: 即使是 private 的方法,也能动态地调用它。

反射主要用于:

  • 框架和库的开发: 很多流行的 Java 框架(如 Spring、Hibernate、JUnit)都大量使用了反射。它们需要在运行时动态地加载类、注入依赖、调用方法等,而不需要提前知道用户会定义哪些具体的类。
  • 动态代理: 在不修改原有代码的情况下,为对象增加新的功能。
  • 序列化和反序列化: 当对象需要保存到文件或网络传输时,需要知道对象的内部结构。
  • 单元测试工具: 允许测试框架访问私有成员进行测试。

反射也可以叫自省,向内探查。那些外部访问不到的API,可以通过反射强行调用。如果编译时知道类或对象的具体信息,此时直接对类和对象正常初始化操作即可,无需使用反射(reflection)。如果编译不知道类或对象的具体信息,就要用到 反射 来实现。比如类的名称放在XML文件中,属性和属性值放在XML文件中,需要在运行时读取XML文件,动态获取类的信息。Web领域对动态扩展的要求很高,会大量用到反射。

场景

Java反射机制的核心是在程序运行时 动态加载类并获取类的详细信息 ,从而操作类或对象的属性和方法。本质是JVM得到class对象之后,再通过class对象进行反编译,从而获取对象的各种信息。

Java属于先编译再运行的语言,程序中对象的类型在编译期就确定下来了,而当程序在运行时可能需要动态加载某些类,这些类因为之前用不到,所以没有被加载到JVM。通过反射,可以在运行时动态地创建对象并调用其属性,不需要提前在编译期知道运行的对象是谁。

在编译时根本无法知道该对象或类可能属于哪些类,程序只依靠运行时信息来发现该对象和类的真实信息比如:log4j,Servlet、SSM框架技术都用到了反射机制。

Android平台上,LayoutInflator解析xml利用了反射生成view,还有EventBus使用反射进行了解耦处理等。

使用

使用反射创建对象,调用方法举例:


public class ReflectionExample {
    private static final String TAG = "ReflectionExample";
    public static void init() {
        try {
            // 获取类对象
            Class<?> clazz = Class.forName("com.stephen.commondemo.alltest.MyClass");
            // 获取构造函数
            Constructor<?> constructor = clazz.getConstructor();
            // 使用构造函数创建对象
            Object obj = constructor.newInstance();
            // 获取方法
            Method method = clazz.getMethod("myMethod", String.class);
            // 调用方法
            method.invoke(obj, "Hello, Reflection!");
        } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException |
                 InstantiationException | InvocationTargetException e) {
            e.printStackTrace();
        }
    }
}

class MyClass {
    private static final String TAG = "MyClass";
    public MyClass() {
        Log.i(TAG, "MyClass instance created.");
    }
    public void myMethod(String message) {
        Log.i(TAG, "Method called with message: " + message);
    }
}

优缺点

1、优点:

在运行时获得类的各种内容,进行反编译,对于Java这种先编译再运行的语言,能够让我们很方便的创建灵活的代码,这些代码可以在运行时装配,无需在组件之间进行源代码的链接,更加容易实现面向对象。

2、缺点:

(1)反射会消耗一定的系统资源,因此,如果不需要动态地创建一个对象,那么就不需要用反射; (2)反射调用方法时可以忽略权限检查,因此可能会破坏封装性而导致安全问题。

反射获取Class信息

反射的关键实现方法有以下几个:

  • 得到类:Class.forName(“类名”)
  • 得到所有字段:getDeclaredFields()
  • 得到所有方法:getDeclaredMethods()
  • 得到构造方法:getDeclaredConstructor()
  • 得到实例:newInstance()
  • 调用方法:invoke()

例如现在有一个Human类,设置几个参数,构造函数,公共方法。

package com.stephen.commondemo.alltest;

public class Human {
    private static final String TAG = "Human";
    public String gender;
    public String age;

    public Human(String gender, String age) {
        this.gender = gender;
        this.age = age;
    }

    private Human() {
    }

    public Human(String gender) {
        this.gender = gender;
    }

    public String getGender() {
        return gender;
    }

    public String getAge() {
        return age;
    }

    public void setGender(String gender) {
        this.gender = gender;
    }

    public void setAge(String age) {
        this.age = age;
    }

    public void eat() {
        System.out.println("Human is eating.");
    }

    public void speak(String str) {
        System.out.println("Human is speaking" + str);
    }
}

在另一个类,使用反射获取类的信息。

package com.stephen.commondemo.alltest;

import android.util.Log;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
import java.util.Objects;

public class HumanInfoGetter {

    private static final String TAG = "HumanInfoGetter";

    public static void init() throws Exception {
        // 1.获取一个类的结构信息(类对象 Class对象)
        Class<?> clazz = Class.forName("com.stephen.commondemo.alltest.Human");
        // 2.从类对象中获取类的各种结构信息
        // 2.1 获取基本结构信息
        Log.i(TAG, clazz.getName());
        Log.i(TAG, clazz.getSimpleName());
        Log.i(TAG, Objects.requireNonNull(clazz.getSuperclass()).getName());
        Log.i(TAG, Arrays.toString(clazz.getInterfaces()));
        // 2.2 获取构造方法
        // 只能得到public修饰的构造方法
        // Constructor[] constructors = clazz.getConstructors();
        // 可以得到所有的构造方法
        Constructor[] constructors = clazz.getDeclaredConstructors();
        Log.i(TAG, constructors.length + "");
        for (Constructor con : constructors) {
            // System.out.println(con.toString());
            Log.i(TAG, con.getName() + "||" +
                    Modifier.toString(con.getModifiers()) + "  ||"
                    + Arrays.toString(con.getParameterTypes()));
        }
        // Constructor con = clazz.getConstructor();// 获取无参数构造方法
        // Constructor con = clazz.getConstructor(String.class,String.class);
        Constructor<?> con =
                clazz.getDeclaredConstructor(String.class, String.class);
        Log.i(TAG, String.valueOf(con));
        // 2.3 获取属性
        // Field[] fields = clazz.getFields();
        Field[] fields = clazz.getDeclaredFields();
        Log.i(TAG, String.valueOf(fields.length));
        for (Field f : fields) {
            Log.i(TAG, String.valueOf(f));
        }
        // Field f = clazz.getField("color");
        // private 默认 protecte public都可以获取,但不包括父类的
        Field f = clazz.getDeclaredField("age");
        Log.i(TAG, String.valueOf(f));
        // 2.3 获取方法
        // Method[] methods = clazz.getMethods();
        Method[] methods = clazz.getDeclaredMethods();
        for (Method m : methods) {
            Log.i(TAG, String.valueOf(m));
        }

        Method m1 = clazz.getMethod("speak", String.class);
        Method m2 = clazz.getDeclaredMethod("eat");

        Log.i(TAG, String.valueOf(m1));
        Log.i(TAG, String.valueOf(m2));
    }
}

原理

从上述内容可以看出,对于反射来说,操纵类最主要的方法是 invoke,所以搞懂了 invoke 方法的实现,也就搞定了反射的底层实现原理了。

invoke 方法的执行流程如下:

  1. 查找方法:当通过 java.lang.reflect.Method 对象调用 invoke 方法时,Java 虚拟机(JVM)首先确认该方法是否存在并可以访问。这包括检查方法的访问权限、方法签名是否匹配等。
  2. 安全检查:如果方法是私有的或受保护的,还需要进行访问权限的安全检查。如果当前调用者没有足够的权限访问这个方法,将抛出 IllegalAccessException。
  3. 参数转换和适配:invoke 方法接受一个对象实例和一组参数,需要将这些参数转换成对应方法签名所需要的类型,并且进行必要的类型检查和装箱拆箱操作。
  4. 方法调用:对于非私有方法,Java 反射实际上是通过 JNI(Java Native Interface,Java 本地接口)调用到 JVM 内部的 native 方法,例如 java.lang.reflect.Method.invoke0()。这个 native 方法负责完成真正的动态方法调用。对于 Java 方法,JVM 会通过方法表、虚方法表(vtable)进行查找和调用;对于非虚方法或者静态方法,JVM 会直接调用相应的方法实现。
  5. 异常处理:在执行方法的过程中,如果出现任何异常,JVM 会捕获并将异常包装成 InvocationTargetException 抛出,应用程序可以通过这个异常获取到原始异常信息。
  6. 返回结果:如果方法正常执行完毕,invoke 方法会返回方法的执行结果,或者如果方法返回类型是 void,则不返回任何值。

通过这种方式,Java 反射的 invoke 方法能够打破编译时的绑定,实现运行时动态调用对象的方法,提供了极大的灵活性,但也带来了运行时性能损耗和安全隐患(如破坏封装性、违反访问控制等)。

反射为什么比正常加载慢

简单来说,因为 反射需要在运行时动态获取类的信息 ,这比在编译时就获取信息要慢。

反射性能低么?为什么?

  • 反射调用过程中会产生大量的临时对象,这些对象会占用内存,可能会导致频繁 gc,从而影响性能。
  • 反射调用方法时会从方法数组中遍历查找,并且会检查可见性等操作会耗时。
  • 反射在达到一定次数时,会动态编写字节码并加载到内存中,这个字节码没有经过编译器优化,也不能享受JIT优化。

  • 反射一般会涉及自动装箱/拆箱和类型转换,都会带来一定的资源开销。

经过方法调用的测试,反射比正常调用大约慢100倍。反射方法调用耗时大约是 0.0004ms ,而Android屏幕刷新率是60-120hz,每一帧的耗时大概8.3ms到16ms之间,如果要使用户感受到反射带来的卡顿,至少要17000多次调用。

除了循环之外,不会有这么多的反射调用。

所以反射虽然慢,在非高频的场景下,正常使用完全没有问题。

反射慢流传的原因

由于时代原因,在 Android 4.4 及之前的设备上,反射的耗时大约为0.008-0.09ms,大概慢了 20-300 倍。取个100倍。

按照每一帧16ms来算,给每一帧分配10%的时间片留给反射,那只有41次的调用机会了。

在Android5.0推出了ART,性能优化了一大截。

【通用开发】JVM泛型

【通用开发】JVM泛型

本文介绍了JVM的泛型实现原理

根据《Java编程思想》中的描述,泛型出现的动机:

有很多原因促成了泛型的出现,而最引人注意的一个原因,就是为了创建容器类。

泛型的本质就是”参数化类型”。一提到参数,最熟悉的就是定义方法的时候需要形参,调用方法的时候,需要传递实参。那”参数化类型”就是将原来具体的类型参数化。泛型的出现避免了强转的操作,在编译器完成类型转化,也就避免了运行的错误。

现在的程序开发大都是面向对象的,平时会用到各种类型的对象,一组对象通常需要用集合来存储它们,因而就有了一些集合类,比如 List、Map 等。

这些集合类里面都是装的具体类型的对象,如果每个类型都去实现诸如 TextViewList、ActivityList 这样的具体的类型,显然是不可能的。

因此就诞生了「泛型」,它的意思是把具体的类型泛化,编码的时候用符号来指代类型,在使用的时候,再确定它的类型。

实例

Java泛型也是一种语法糖,在编译阶段完成类型的转换的工作,避免在运行时强制类型转换而出现 ClassCastException ,类型转化异常。

不使用泛型:

public static void main(String[] args) {
    List list = new ArrayList();
    list.add(11);
    list.add("ssss");
    for (int i = 0; i < list.size(); i++) {
        System.out.println((String)list.get(i));
    }
}

因为list类型是Object。所以int,String类型的数据都是可以放入的,也是都可以取出的。但是上述的代码,运行的时候就会抛出 类型转化异常 ,这个相信大家都能明白。

使用泛型:

public static void main(String[] args) {
        List<String> list = new ArrayList();
        list.add("hahah");
        list.add("ssss");
        for (int i = 0; i < list.size(); i++) {
            System.out.println((String)list.get(i));
        }
    }

在上述的实例中,我们只能添加String类型的数据,否则编译器会报错。

泛型的使用

泛型的三种使用方式:泛型类,泛型方法,泛型接口

泛型类

即把泛型定义在类上:

public class 类名 <泛型类型1,...> {
    
}

注意事项:泛型类型必须是引用类型(非基本数据类型)

泛型方法

泛型方法概述:把泛型定义在方法上

public <泛型类型> 返回类型 方法名泛型类型 变量名 {
    
}

注意要点:

方法声明中定义的形参只能在该方法里使用,而接口、类声明中定义的类型形参则可以在整个接口、类中使用。当调用 fun() 方法时,根据传入的实际对象,编译器就会判断出类型形参 T 所代表的实际类型。

class Demo{  
   public <T> T fun(T t){   // 可以接收任意类型的数据  
       return t ;     // 直接把参数返回  
  }  
};  
public class GenericsDemo26{  
  public static void main(String args[]){  
    Demo d = new Demo() ; // 实例化Demo对象  
    String str = d.fun("汤姆") ; // 传递字符串  
    int i = d.fun(30) ;  // 传递数字,自动装箱  
    System.out.println(str) ; // 输出内容  
    System.out.println(i) ;  // 输出内容  
  }  
};

泛型接口

泛型接口概述:把泛型定义在接口

public interface 接口名<泛型类型> {
    
}

实例:

public interface Inter<T> {
    public abstract void show(T t) ;
}
/**
 * 子类是泛型类
 */
public class InterImpl<E> implements Inter<E> {
    @Override
    public void show(E t) {
        System.out.println(t);
    }
}

Inter<String> inter = new InterImpl<String>() ;
inter.show("hello") ;

源码中泛型的使用

下面是List接口和ArrayList类的代码片段。

//定义接口时指定了一个类型形参,该形参名为E
public interface List<E> extends Collection<E> {
   //在该接口里,E可以作为类型使用
   public E get(int index) {}
   public void add(E e) {} 
}

//定义类时指定了一个类型形参,该形参名为E
public class ArrayList<E> extends AbstractList<E> implements List<E> {
   //在该类里,E可以作为类型使用
   public void set(E e) {
   .......................
   }
}

泛型类派生子类

父类派生子类的时候不能在包含类型形参,需要传入具体的类型

错误的方式:

public class A extends Container<K, V> {}

正确的方式:

public class A extends Container<Integer, String> {}

也可以不指定具体的类型,系统就会把K,V形参当成Object类型处理

public class A extends Container {}

泛型构造器

构造器也是一种方法,所以也就产生了所谓的泛型构造器。 和使用普通方法一样没有区别,一种是显示指定泛型参数,另一种是隐式推断

class Person {
    public <T> Person(T t) {
        System.out.println(t);
    }
}

使用:

public static void main(String[] args) {
    new Person(22);// 隐式
    new <String> Person("hello");//显示
}

特殊说明:

如果构造器是泛型构造器,同时该类也是一个泛型类的情况下应该如何使用泛型构造器:因为泛型构造器可以显式指定自己的类型参数(需要用到菱形,放在构造器之前),而泛型类自己的类型实参也需要指定(菱形放在构造器之后),这就同时出现了两个菱形了,这就会有一些小问题,具体用法再这里总结一下。

以下面这个例子为代表

public class Person<E> {
    public <T> Person(T t) {
        System.out.println(t);
    }
}

正确用法:

public static void main(String[] args) {
    Person<String> person = new Person("sss");
}

PS:编译器会提醒你怎么做的

高级通配符

<? extends T> 上界通配符

上界通配符顾名思义, <? extends T> 表示的是类型的上界【包含自身】,因此通配的参数化类型可能是 T 或 T 的子类。

正因为无法确定具体的类型是什么,add方法受限(可以添加null,因为null表示任何类型),但可以从列表中获取元素后赋值给父类型。如上图中的第一个例子,第三个add()操作会受限,原因在于 List 和 List 是 List<? extends Animal> 的子类型。

它表示集合中的所有元素都是Animal类型或者其子类

List<? extends Animal>

这就是所谓的上限通配符,使用关键字extends来实现,实例化时,指定类型实参只能是extends后类型的子类或其本身。

例如: 这样就确定集合中元素的类型,虽然不确定具体的类型,但最起码知道其父类。然后进行其他操作。

这种赋值由于类型擦除机制,在编译器就会提示报错。

List<Button> buttons = new ArrayList<Button>();
List<TextView> textViews = buttons;

使用通配符:

List<Button> buttons = new ArrayList<Button>();
List<? extends TextView> textViews = buttons;

上界通配符可以使 Java 泛型具有「协变性 Covariance」,协变就是允许上面的赋值是合法的。

前面说到 List<? extends TextView> 的泛型类型是个未知类型 ?,编译器也不确定它是啥类型,只是有个限制条件。

由于它满足 ? extends TextView 的限制条件,所以 get 出来的对象,肯定是 TextView 的子类型,根据多态的特性,能够赋值给 TextView,啰嗦一句,赋值给 View 也是没问题的。

到了 add 操作的时候,我们可以这么理解:

  • List<? extends TextView> 由于类型未知,它可能是 List<Button>,也可能是 List<TextView>
  • 对于前者,显然我们要添加 TextView 是不可以的。

实际情况是编译器无法确定到底属于哪一种,无法继续执行下去,就报错了。

由于 add 的这个限制,使用了 ? extends 泛型通配符的 List,只能够向外提供数据被消费,从这个角度来讲,向外提供数据的一方称为「生产者 Producer」。对应的还有一个概念叫「消费者 Consumer」,对应 Java 里面另一个泛型通配符 ? super。

<? super T> 下界通配符

下界通配符 <? super T> 表示的是参数化类型是 T 的超类型(包含自身),层层至上,直至Object。下界通配符可以使 Java 泛型具有「逆变性 Contravariance」。

与上界通配符对应,这里 super 限制了通配符 ? 的子类型,所以称之为下界。

它也有两层意思:

  • 通配符 ? 表示 List 的泛型类型是一个未知类型。
  • super 限制了这个未知类型的下界,也就是泛型类型必须满足这个 super 的限制条件。
    • super 我们在类的方法里面经常用到,这里的范围不仅包括 Button 的直接和间接父类,也包括下界 Button 本身。
    • super 同样支持 interface。
List<? super Button> buttons = new ArrayList<Button>();
List<? super Button> buttons = new ArrayList<TextView>();
List<? super Button> buttons = new ArrayList<Object>();

使用下界通配符 ? super 的泛型 List,只能读取到 Object 对象,一般没有什么实际的使用场景,通常也只拿它来添加数据,也就是消费已有的 List<? super Button>,往里面添加 Button,因此这种泛型类型声明称之为「消费者 Consumer」。

<?> 无界通配符

任意类型,如果没有明确,那么就是Object以及任意的Java类了 无界通配符用 <?> 表示,?代表了任何的一种类型,能代表任何一种类型的只有null(Object本身也算是一种类型,但却不能代表任何一种类型,所以List和List的含义是不同的,前者类型是Object,也就是继承树的最上层,而后者的类型完全是未知的)

泛型擦除

Java 泛型擦除(Type Erasure)是 Java 语言实现泛型的一种机制。简单来说,它意味着在编译时期,所有泛型类型信息都会被“擦除”掉,替换成它们的上界类型(如果存在)或 Object 类型(如果不存在上界)。在运行时,JVM 实际上并不知道泛型的具体类型参数。

编译器编译带类型说明的集合时会去掉类型信息

3.2 验证实例:

public class GenericTest {
    public static void main(String[] args) {
        new GenericTest().testType();
    }

    public void testType(){
        ArrayList<Integer> collection1 = new ArrayList<Integer>();
        ArrayList<String> collection2= new ArrayList<String>();
        
        System.out.println(collection1.getClass()==collection2.getClass());
        //两者class类型一样,即字节码一致
        
        System.out.println(collection2.getClass().getName());
        //class均为java.util.ArrayList,并无实际类型参数信息
    }
}

输出结果:

true
java.util.ArrayList

分析:

这是因为不管为泛型的类型形参传入哪一种类型实参,对于Java来说,它们依然被当成同一类处理,在内存中也只占用一块内存空间。从Java泛型这一概念提出的目的来看,其只是作用于代码编译阶段,在编译过程中,对于正确检验泛型结果后,会将泛型的相关信息擦出,也就是说,成功编译过后的class文件中是不包含任何泛型信息的。泛型信息不会进入到运行时阶段。

在静态方法、静态初始化块或者静态变量的声明和初始化中不允许使用类型形参。由于系统中并不会真正生成泛型类,所以instanceof运算符后不能使用泛型类

泛型与反射

把泛型变量当成方法的参数,利用Method类的 getGenericParameterTypes 方法来获取泛型的实际类型参数

例子:

public class GenericTest {

    public static void main(String[] args) throws Exception {
        getParamType();
    }
    
     /*利用反射获取方法参数的实际参数类型*/
    public static void getParamType() throws NoSuchMethodException{
        Method method = GenericTest.class.getMethod("applyMap",Map.class);
        //获取方法的泛型参数的类型
        Type[] types = method.getGenericParameterTypes();
        System.out.println(types[0]);
        //参数化的类型
        ParameterizedType pType  = (ParameterizedType)types[0];
        //原始类型
        System.out.println(pType.getRawType());
        //实际类型参数
        System.out.println(pType.getActualTypeArguments()[0]);
        System.out.println(pType.getActualTypeArguments()[1]);
    }

    /*供测试参数类型的方法*/
    public static void applyMap(Map<Integer,String> map){

    }
}

输出结果:

java.util.Map<java.lang.Integer, java.lang.String>
interface java.util.Map
class java.lang.Integer
class java.lang.String

通过反射绕开编译器对泛型的类型限制

public static void main(String[] args) throws Exception {
		//定义一个包含int的链表
		ArrayList<Integer> al = new ArrayList<Integer>();
		al.add(1);
		al.add(2);
		//获取链表的add方法,注意这里是Object.class,如果写int.class会抛出NoSuchMethodException异常
		Method m = al.getClass().getMethod("add", Object.class);
		//调用反射中的add方法加入一个string类型的元素,因为add方法的实际参数是Object
		m.invoke(al, "hello");
		System.out.println(al.get(2));
	}

泛型的限制

模糊性错误

对于泛型类 User<K,V> 而言,声明了两个泛型类参数。在类中根据不同的类型参数重载show方法。

public class User<K, V> {
    
    public void show(K k) { // 报错信息:'show(K)' clashes with 'show(V)'; both methods have same erasure
        
    }
    public void show(V t) {

    }
}

由于泛型擦除,二者本质上都是Obejct类型。方法是一样的,所以编译器会报错。

换一个方式:

public class User<K, V> {

    public void show(String k) {

    }
    public void show(V t) {

    }
}

可以正常的使用

不能实例化类型参数

编译器也不知道该创建那种类型的对象

public class User<K, V> {

    private K key = new K(); // 报错:Type parameter 'K' cannot be instantiated directly

}

对静态成员的限制

静态方法无法访问类上定义的泛型;如果静态方法操作的类型不确定,必须要将泛型定义在方法上。

如果静态方法要使用泛型的话,必须将静态方法定义成泛型方法。

public class User<T> {

    //错误
    private static T t;

    //错误
    public static T getT() {
        return t;
    }

    //正确
    public static <K> void test(K k) {

    }
}

对泛型数组的限制

不能实例化元素类型为类型参数的数组,但是可以将数组指向类型兼容的数组的引用

public class User<T> {

    private T[] values;

    public User(T[] values) {
        //错误,不能实例化元素类型为类型参数的数组
        this.values = new T[5];
        //正确,可以将values 指向类型兼容的数组的引用
        this.values = values;
    }
}

对泛型异常的限制

泛型类型不能用于 catch 语句,例如

 try { ... } catch (MyGenericException<T> e) { ... } 

是不允许的。

Kotlin中的泛型

本节摘自扔物线的文章:

Kotlin 的泛型

Kotlin 中的 out 和 in

和 Java 泛型一样,Kolin 中的泛型本身也是不可变的。

使用关键字 out 来支持协变,等同于 Java 中的上界通配符 ? extends。 使用关键字 in 来支持逆变,等同于 Java 中的下界通配符 ? super。

var textViews: List<out TextView>
var textViews: List<in TextView>

换了个写法,但作用是完全一样的。out 表示,我这个变量或者参数只用来输出,不用来输入,你只能读我不能写我;in 就反过来,表示它只用来输入,不用来输出,你只能写我不能读我。

* 号

前面讲到了 Java 中单个 ? 号也能作为泛型通配符使用,相当于 ? extends Object。 它在 Kotlin 中有等效的写法:* 号,相当于 out Any。

var list: List<*>

和 Java 不同的地方是,如果你的类型定义里已经有了 out 或者 in,那这个限制在变量声明时也依然在,不会被 * 号去掉。

比如你的类型定义里是 out T : Number 的,那它加上 <*> 之后的效果就不是 out Any,而是 out Number。

where 关键字

Java 中声明类或接口的时候,可以使用 extends 来设置边界,将泛型类型参数限制为某个类型的子集:

//  T 的类型必须是 Animal 的子类型
class Monster<T extends Animal>{
}

注意这个和前面讲的声明变量时的泛型类型声明是不同的东西,这里并没有 ?。

同时这个边界是可以设置多个,用 & 符号连接:

// T 的类型必须同时是 Animal 和 Food 的子类型
class Monster<T extends Animal & Food>{ 
}

Kotlin 只是把 extends 换成了 : 冒号。

class Monster<T : Animal>

设置多个边界可以使用 where 关键字:

class Monster<T> where T : Animal, T : Food

reified 关键字

由于 Java 中的泛型存在类型擦除的情况,任何在运行时需要知道泛型确切类型信息的操作都没法用了。

比如你不能检查一个对象是否为泛型类型 T 的实例:

<T> void printIfTypeMatch(Object item) {
    if (item instanceof T) { // 👈 IDE 会提示错误,illegal generic type for instanceof
        System.out.println(item);
    }
}

Kotlin 里同样也不行:

fun <T> printIfTypeMatch(item: Any) {
    if (item is T) { // 👈 IDE 会提示错误,Cannot check for instance of erased type: T
        println(item)
    }
}

这个问题,在 Java 中的解决方案通常是额外传递一个 Class<T> 类型的参数,然后通过 Class#isInstance 方法来检查:

<T> void check(Object item, Class<T> type) {
    if (type.isInstance(item)) {
        System.out.println(item);
    }
}

Kotlin 中同样可以这么解决,不过还有一个更方便的做法:使用关键字 reified 配合 inline 来解决:

inline fun <reified T> printIfTypeMatch(item: Any) {
    if (item is T) {
        println(item)
    }
}

Pagination