kotlin-coroutines-flows
Kotlin Coroutines and Flow patterns for Android and KMP: structured concurrency, Flow operators, StateFlow, error handling, and testing.
About this skill
This skill equips an AI agent with deep knowledge and best practices for working with Kotlin Coroutines and Flows across Android and Kotlin Multiplatform (KMP) development. It covers essential topics such as structured concurrency, proper scope management (e.g., `viewModelScope`, `LaunchedEffect`), reactive data streaming with Flow, StateFlow, and SharedFlow, common Flow operators, robust error handling mechanisms, and effective testing strategies for asynchronous code. Designed as part of the 'everything-claude-code' repository, this skill enables AI agents to generate high-quality, production-ready Kotlin code that adheres to modern concurrency and reactive programming paradigms, avoiding common pitfalls like `GlobalScope`.
Best use case
An AI agent needs to generate, review, or debug Kotlin code that involves asynchronous operations, reactive programming, or concurrent tasks in Android or KMP environments, ensuring best practices are followed.
The AI agent will produce more accurate, idiomatic, and robust Kotlin code utilizing Coroutines and Flows. It will adhere to best practices like structured concurrency, proper scope management, and effective error handling, leading to maintainable and performant solutions for asynchronous and reactive programming tasks.
Practical example
Example input
Generate Kotlin code for an Android ViewModel that fetches user data from a repository and exposes it as a `StateFlow`, handling loading states and potential network errors. Ensure proper structured concurrency and provide testing considerations.
Example output
```kotlin
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch

class UserViewModel(private val userRepository: UserRepository) : ViewModel() {
    private val _uiState = MutableStateFlow<UiState>(UiState.Loading)
    val uiState: StateFlow<UiState> = _uiState.asStateFlow()

    init {
        fetchUserData()
    }

    fun fetchUserData() {
        viewModelScope.launch { // Uses viewModelScope for structured concurrency
            _uiState.value = UiState.Loading
            try {
                // Collects from a Flow and takes the first item
                val user = userRepository.getUser().first()
                _uiState.value = UiState.Success(user)
            } catch (e: CancellationException) {
                // Rethrow so cancelling viewModelScope still cancels this coroutine
                throw e
            } catch (e: Exception) {
                // Error handling for network or other exceptions
                _uiState.value = UiState.Error(e.localizedMessage ?: "Unknown error")
            }
        }
    }
}

sealed class UiState {
    object Loading : UiState()
    data class Success(val user: User) : UiState()
    data class Error(val message: String) : UiState()
}

// Testing considerations:
// - Use `runTest` from `kotlinx-coroutines-test` for predictable coroutine execution.
// - Inject a test dispatcher (e.g., `StandardTestDispatcher`) into your ViewModel.
// - Verify `uiState` emissions using `Turbine` for Flow assertions.
// - Mock `userRepository` to control test data and error scenarios.
```
(Explanation: This code demonstrates a ViewModel using `viewModelScope` for structured concurrency, `StateFlow` to expose UI state, and a `try-catch` block for error handling, following the patterns described in the skill. It also includes guidance on how to test such a component effectively.)
When to use this skill
- When using Kotlin Coroutines for writing asynchronous code.
- When implementing reactive data streams with Flow, StateFlow, or SharedFlow.
- When handling concurrent operations such as parallel loading, debouncing, or retries.
- When testing coroutines and Flows effectively.
When not to use this skill
- When working with programming languages other than Kotlin, or when the task does not involve asynchronous operations, reactive streams, or concurrent programming.
Installation
Claude Code / Cursor / Codex
Manual Installation
- Download SKILL.md from GitHub
- Place it in `.claude/skills/kotlin-coroutines-flows/SKILL.md` inside your project
- Restart your AI agent; it will auto-discover the skill
How kotlin-coroutines-flows Compares
| Feature / Agent | kotlin-coroutines-flows | Standard Approach |
|---|---|---|
| Platform Support | Claude | Limited / Varies |
| Context Awareness | High | Baseline |
| Installation Complexity | easy | N/A |
Frequently Asked Questions
What does this skill do?
It provides Kotlin Coroutines and Flow patterns for Android and KMP: structured concurrency, Flow operators, StateFlow, error handling, and testing.
Which AI agents support this skill?
This skill is designed for Claude.
How difficult is it to install?
The installation complexity is rated as easy. You can find the installation instructions above.
Where can I find the source code?
You can find the source code on GitHub using the link provided at the top of the page.
SKILL.md Source
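# Kotlin Coroutines and Flow
Structured concurrency patterns, Flow-based reactive streams, and coroutine testing for Android and Kotlin Multiplatform projects.
## When to Activate
* Writing asynchronous code with Kotlin coroutines
* Implementing reactive data with Flow, StateFlow, or SharedFlow
* Handling concurrent operations (parallel loading, debouncing, retries)
* Testing coroutines and Flows
* Managing coroutine scopes and cancellation
## Structured Concurrency
### Scope Hierarchy
```
Application
 └── viewModelScope (ViewModel)
      └── coroutineScope { } (structured child scope)
           ├── async { } (concurrent task)
           └── async { } (concurrent task)
```
Always use structured concurrency and never use `GlobalScope`: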
```kotlin
// BAD
GlobalScope.launch { fetchData() }
// GOOD — scoped to ViewModel lifecycle
viewModelScope.launch { fetchData() }
// GOOD — scoped to composable lifecycle
LaunchedEffect(key) { fetchData() }
```
### Parallel Decomposition
Use `coroutineScope` + `async` for parallel work:
```kotlin
suspend fun loadDashboard(): Dashboard = coroutineScope {
    // All three requests start immediately and run concurrently
    val items = async { itemRepository.getRecent() }
    val stats = async { statsRepository.getToday() }
    val profile = async { userRepository.getCurrent() }
    Dashboard(
        items = items.await(),
        stats = stats.await(),
        profile = profile.await()
    )
}
```
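
A minimal, self-contained sketch of the same pattern that runs outside Android. The functions `loadItems` and `loadStats`, the `Summary` type, and the 200 ms delays are hypothetical stand-ins for the repository calls above. Because both `async` blocks start before either result is awaited, the elapsed time should be close to the slowest call rather than the sum of both.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-ins for repository calls; the delays simulate I/O latency.
suspend fun loadItems(): List<String> { delay(200); return listOf("a", "b") }
suspend fun loadStats(): Int { delay(200); return 42 }

data class Summary(val items: List<String>, val stats: Int)

// Both async blocks start immediately; await() suspends until each result is ready.
// If either child fails, coroutineScope cancels the other and rethrows.
suspend fun loadSummary(): Summary = coroutineScope {
    val items = async { loadItems() }
    val stats = async { loadStats() }
    Summary(items.await(), stats.await())
}

fun main() = runBlocking {
    val elapsed = measureTimeMillis {
        println(loadSummary())
    }
    // Expect roughly 200 ms rather than 400 ms, since the two loads overlap.
    println("elapsed: $elapsed ms")
}
```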
### SupervisorScope
Use `supervisorScope` when a failing child should not cancel its siblings:
```kotlin
suspend fun syncAll() = supervisorScope {
    launch { syncItems() } // failure here won't cancel syncStats
    launch { syncStats() }
    launch { syncSettings() }
}
```
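
The sketch below illustrates that behavior with plain kotlinx.coroutines. The function `runSync` and the `"items"`/`"stats"` labels are hypothetical. A `CoroutineExceptionHandler` is attached to the failing child because, inside `supervisorScope`, an uncaught child failure is not propagated to the parent and would otherwise go to the thread's uncaught exception handler.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.supervisorScope

// Hypothetical demo: one child fails, its sibling still completes.
fun runSync(): List<String> = runBlocking {
    val log = mutableListOf<String>() // safe here: runBlocking runs everything on one thread
    val handler = CoroutineExceptionHandler { _, e -> log += "failed: ${e.message}" }
    supervisorScope {
        // Fails after 50 ms; the handler records the failure.
        launch(handler) { delay(50); throw IllegalStateException("items") }
        // Finishes after 100 ms even though its sibling has already failed.
        launch { delay(100); log += "stats" }
    }
    log
}

fun main() {
    println(runSync()) // [failed: items, stats]
}
```

With `coroutineScope` in place of `supervisorScope`, the first failure would cancel the second child and be rethrown to the caller.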
## Flow Patterns
### Cold Flow: Converting One-Shot Operations to Streams
```kotlin
fun observeItems(): Flow<List<Item>> = flow {
    // Re-emits whenever the database changes
    itemDao.observeAll()
        .map { entities -> entities.map { it.toDomain() } }
        .collect { emit(it) }
}
```
### StateFlow for UI State
```kotlin
class DashboardViewModel(
    observeProgress: ObserveUserProgressUseCase
) : ViewModel() {
    val progress: StateFlow<UserProgress> = observeProgress()
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5_000),
            initialValue = UserProgress.EMPTY
        )
}
```
`WhileSubscribed(5_000)` keeps the upstream active for 5 seconds after the last subscriber leaves, so the stream survives a configuration change without restarting.
### Combining Multiple Flows
```kotlin
val uiState: StateFlow<HomeState> = combine(
    itemRepository.observeItems(),
    settingsRepository.observeTheme(),
    userRepository.observeProfile()
) { items, theme, profile ->
    HomeState(items = items, theme = theme, profile = profile)
}.stateIn(viewModelScope, SharingStarted.WhileSubscribed(5_000), HomeState())
```
### Flow Operators
```kotlin
// Debounce search input
searchQuery
    .debounce(300)
    .distinctUntilChanged()
    .flatMapLatest { query -> repository.search(query) }
    .catch { emit(emptyList()) }
    .collect { results -> _state.update { it.copy(results = results) } }

// Retry with exponential backoff: waits 1 s, 2 s, then 4 s before giving up
fun fetchWithRetry(): Flow<Data> = flow { emit(api.fetch()) }
    .retryWhen { cause, attempt ->
        if (cause is IOException && attempt < 3) {
            delay(1000L * (1 shl attempt.toInt()))
            true
        } else {
            false
        }
    }
```
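
To make the backoff schedule concrete, here is a runnable sketch of the retry pattern. `FlakyApi`, `backoffMillis`, and the `baseMillis` parameter are hypothetical names introduced for illustration; the base delay is shortened in `main` so the example finishes quickly.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.runBlocking
import java.io.IOException

// Backoff schedule: baseMillis * 2^attempt (attempt is zero-based, as in retryWhen).
fun backoffMillis(attempt: Long, baseMillis: Long = 1000L): Long =
    baseMillis * (1L shl attempt.toInt())

// Hypothetical flaky source: fails twice with IOException, then succeeds.
class FlakyApi {
    var calls = 0
    fun fetch(): String {
        calls++
        if (calls <= 2) throw IOException("attempt $calls failed")
        return "payload"
    }
}

// Retries only IOException, at most three times, waiting longer before each retry.
fun fetchWithRetry(api: FlakyApi, baseMillis: Long): Flow<String> =
    flow { emit(api.fetch()) }
        .retryWhen { cause, attempt ->
            if (cause is IOException && attempt < 3) {
                delay(backoffMillis(attempt, baseMillis))
                true
            } else {
                false
            }
        }

fun main() = runBlocking {
    println((0L..2L).map { backoffMillis(it) }) // [1000, 2000, 4000]
    val api = FlakyApi()
    println(fetchWithRetry(api, baseMillis = 10L).first()) // payload
    println(api.calls) // 3
}
```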
### SharedFlow for One-Time Events
```kotlin
class ItemListViewModel : ViewModel() {
    private val _effects = MutableSharedFlow<Effect>()
    val effects: SharedFlow<Effect> = _effects.asSharedFlow()

    sealed interface Effect {
        data class ShowSnackbar(val message: String) : Effect
        data class NavigateTo(val route: String) : Effect
    }

    private fun deleteItem(id: String) {
        viewModelScope.launch {
            repository.delete(id)
            _effects.emit(Effect.ShowSnackbar("Item deleted"))
        }
    }
}

// Collect in Composable
LaunchedEffect(Unit) {
    viewModel.effects.collect { effect ->
        when (effect) {
            is Effect.ShowSnackbar -> snackbarHostState.showSnackbar(effect.message)
            is Effect.NavigateTo -> navController.navigate(effect.route)
        }
    }
}
```
## Dispatchers
```kotlin
// CPU-intensive work
withContext(Dispatchers.Default) { parseJson(largePayload) }
// IO-bound work
withContext(Dispatchers.IO) { database.query() }
// Main thread (UI) — default in viewModelScope
withContext(Dispatchers.Main) { updateUi() }
```
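In KMP, use `Dispatchers.Default` and `Dispatchers.Main`, which are declared for all platforms (`Dispatchers.Main` still needs a platform main-thread implementation at runtime). `Dispatchers.IO` is not part of the common API: it is available on JVM/Android, and on Native in newer kotlinx.coroutines releases. On other platforms, use `Dispatchers.Default` or provide the dispatcher through dependency injection.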
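
One way to provide dispatchers through dependency injection is sketched below. `DispatcherProvider`, `DefaultDispatcherProvider`, and `ItemParser` are hypothetical names, not part of kotlinx.coroutines; a JVM/Android source set could supply an implementation whose `io` property returns `Dispatchers.IO` instead.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical abstraction so shared code never references a platform-only dispatcher.
interface DispatcherProvider {
    val default: CoroutineDispatcher
    val io: CoroutineDispatcher
}

// Fallback for targets without Dispatchers.IO: route IO work to Default.
object DefaultDispatcherProvider : DispatcherProvider {
    override val default: CoroutineDispatcher = Dispatchers.Default
    override val io: CoroutineDispatcher = Dispatchers.Default
}

// Shared code depends on the provider, which also makes it easy to swap in a test dispatcher.
class ItemParser(private val dispatchers: DispatcherProvider) {
    suspend fun parse(raw: String): List<String> = withContext(dispatchers.default) {
        raw.split(",").map { it.trim() }
    }
}

fun main() = runBlocking {
    println(ItemParser(DefaultDispatcherProvider).parse("a, b ,c")) // [a, b, c]
}
```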
## Cancellation
### Cooperative Cancellation
Long-running loops must check for cancellation:
```kotlin
suspend fun processItems(items: List<Item>) = coroutineScope {
    for (item in items) {
        ensureActive() // throws CancellationException if cancelled
        process(item)
    }
}
```
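
The following sketch shows why the check matters for work that never suspends. `runAndCancel` is a hypothetical helper, and the counter increment stands in for real CPU-bound work. Without the `ensureActive()` call, the loop would never observe cancellation and `cancelAndJoin()` would not return.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical demo: a non-suspending loop that cooperates with cancellation.
fun runAndCancel(): Boolean = runBlocking {
    val processed = AtomicInteger(0)
    val job = launch(Dispatchers.Default) {
        while (true) {
            ensureActive()              // throws CancellationException once the job is cancelled
            processed.incrementAndGet() // stand-in for non-suspending work
        }
    }
    delay(50)
    job.cancelAndJoin() // returns only because the loop checks for cancellation
    println("stopped after ${processed.get()} iterations")
    job.isCancelled
}

fun main() {
    println(runAndCancel()) // true
}
```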
### Cleanup with try/finally
```kotlin
viewModelScope.launch {
    try {
        _state.update { it.copy(isLoading = true) }
        val data = repository.fetch()
        _state.update { it.copy(data = data) }
    } finally {
        _state.update { it.copy(isLoading = false) } // always runs, even on cancellation
    }
}
```
## Testing
### Testing StateFlow with Turbine
```kotlin
@Test
fun `search updates item list`() = runTest {
    val fakeRepository = FakeItemRepository().apply { emit(testItems) }
    val viewModel = ItemListViewModel(GetItemsUseCase(fakeRepository))

    viewModel.state.test {
        assertEquals(ItemListState(), awaitItem()) // initial
        viewModel.onSearch("query")
        val loading = awaitItem()
        assertTrue(loading.isLoading)
        val loaded = awaitItem()
        assertFalse(loaded.isLoading)
        assertEquals(1, loaded.items.size)
    }
}
```
### Testing with TestDispatcher
```kotlin
@Test
fun `parallel load completes correctly`() = runTest {
    val viewModel = DashboardViewModel(
        itemRepo = FakeItemRepo(),
        statsRepo = FakeStatsRepo()
    )

    viewModel.load()
    advanceUntilIdle() // run all pending coroutines on the test scheduler

    val state = viewModel.state.value
    assertNotNull(state.items)
    assertNotNull(state.stats)
}
```
### Faking Flows
```kotlin
class FakeItemRepository : ItemRepository {
    private val _items = MutableStateFlow<List<Item>>(emptyList())

    override fun observeItems(): Flow<List<Item>> = _items

    // Test helper: push a new list to every collector
    fun emit(items: List<Item>) { _items.value = items }

    override suspend fun getItemsByCategory(category: String): Result<List<Item>> {
        return Result.success(_items.value.filter { it.category == category })
    }
}
```
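## Anti-Patterns to Avoid
* Using `GlobalScope`: it leaks coroutines and gives up structured cancellation
* Collecting a Flow in `init {}` without a scope: use `viewModelScope.launch` instead
* Using `MutableStateFlow` with mutable collections: always use immutable copies, e.g. `_state.update { it.copy(list = it.list + newItem) }`
* Catching `CancellationException`: let it propagate so cancellation works correctly
* Using `flowOn(Dispatchers.Main)` to control collection: `flowOn` only changes the upstream context, and collection runs on the caller's dispatcher
* Creating a `Flow` in a `@Composable` without `remember`: the Flow is recreated on every recomposition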
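
As an illustration of the `CancellationException` item above, the sketch below wraps a suspending call in a `Result` while still letting cancellation propagate. `runCatchingCancellable` is a hypothetical helper, not a library function; note that the standard `runCatching` catches every `Throwable`, cancellation included.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: captures ordinary failures but never swallows cancellation.
suspend fun <T> runCatchingCancellable(block: suspend () -> T): Result<T> =
    try {
        Result.success(block())
    } catch (e: CancellationException) {
        throw e // let cancellation propagate to the enclosing coroutine
    } catch (e: Exception) {
        Result.failure(e)
    }

fun main() = runBlocking {
    // Ordinary failures are captured as Result.failure.
    val failed = runCatchingCancellable<Int> { error("boom") }
    println(failed.isFailure) // true

    // Cancellation is rethrown, so the code after the call never runs.
    var reachedAfterCall = false
    val job = launch {
        runCatchingCancellable { delay(10_000) }
        reachedAfterCall = true
    }
    delay(50)
    job.cancelAndJoin()
    println(reachedAfterCall) // false
}
```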
## References
For consuming Flows in the UI layer, see the skill `compose-multiplatform-patterns`.
For where coroutines belong in each layer, see the skill `android-clean-architecture`.
Related Skills
kotlin-ktor-patterns
Ktor server patterns, including the routing DSL, plugins, authentication, Koin DI, kotlinx.serialization, WebSockets, and testApplication testing.
kotlin-exposed-patterns
JetBrains Exposed ORM patterns, including DSL queries, the DAO pattern, transactions, HikariCP connection pooling, Flyway migrations, and the repository pattern.
kotlin-testing
Kotlin testing patterns with Kotest, MockK, coroutine testing, property-based testing, and Kover coverage. Follows TDD methodology with idiomatic Kotlin practices.
workspace-surface-audit
Audit the active repo, MCP servers, plugins, connectors, env surfaces, and harness setup, then recommend the highest-value ECC-native skills, hooks, agents, and operator workflows. Use when the user wants help setting up Claude Code or understanding what capabilities are actually available in their environment.
safety-guard
Use this skill to prevent destructive operations when working on production systems or running agents autonomously.
repo-scan
Cross-stack source code asset audit — classifies every file, detects embedded third-party libraries, and delivers actionable four-level verdicts per module with interactive HTML reports.
project-flow-ops
Operate execution flow across GitHub and Linear by triaging issues and pull requests, linking active work, and keeping GitHub public-facing while Linear remains the internal execution layer. Use when the user wants backlog control, PR triage, or GitHub-to-Linear coordination.
manim-video
Build reusable Manim explainers for technical concepts, graphs, system diagrams, and product walkthroughs, then hand off to the wider ECC video stack if needed. Use when the user wants a clean animated explainer rather than a generic talking-head script.
laravel-plugin-discovery
Discover and evaluate Laravel packages via LaraPlugins.io MCP. Use when the user wants to find plugins, check package health, or assess Laravel/PHP compatibility.
design-system
Use this skill to generate or audit design systems, check visual consistency, and review PRs that touch styling.
click-path-audit
Trace every user-facing button/touchpoint through its full state change sequence to find bugs where functions individually work but cancel each other out, produce wrong final state, or leave the UI in an inconsistent state. Use when: systematic debugging found no bugs but users report broken buttons, or after any major refactor touching shared state stores.
ck
Persistent per-project memory for Claude Code. Auto-loads project context on session start, tracks sessions with git activity, and writes to native memory. Commands run deterministic Node.js scripts — behavior is consistent across model versions.