- Published on
第一次學 Kotlin Koog AI 就上手 Day 17:增加使用者體驗:流式處理與即時回應
在實際的用戶互動中,等待 AI 完整回應往往會讓用戶感到焦急。今天我們要學習 Koog 框架的流式處理功能,實現即時的 AI 回應,大幅提升用戶體驗。透過 executeStreaming
API,我們將讓智慧客服助手能夠像真人對話一樣,逐字逐句地即時回應用戶
流式處理的重要性
傳統批量處理 vs 流式處理
在傳統的 AI 應用中,我們習慣等待模型完全生成回應後再顯示給用戶
// 傳統批量處理方式
val response = executor.execute(prompt, model)
println(response.content) // 等待完整回應後一次性顯示
這種方式雖然簡單,但存在明顯的用戶體驗問題
- 長時間等待:複雜問題可能需要等待 10-30 秒
- 無回饋感:用戶不知道系統是否正在處理
- 互動性差:無法提前預覽和打斷回應
流式處理則能夠
- 即時回饋:文字逐字產生,如同真人打字
- 降低感知延遲:用戶能立即看到回應開始
- 更好的互動性:可以提前理解回應內容
Koog 框架中的流式處理架構
Koog 框架通過 Kotlin 的 Flow
API 提供了優雅的流式處理支援
interface LLMClient {
// 傳統同步執行
abstract suspend fun execute(prompt: Prompt, model: LLModel, tools: List<ToolDescriptor>): List<Message.Response>
// 流式執行 - 返回 Flow<String>
abstract fun executeStreaming(prompt: Prompt, model: LLModel): Flow<String>
}
基本流式處理實作
簡單的流式聊天範例
讓我們從最基本的流式處理開始
suspend fun main() {
// 建立執行器
val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)
// 建立提示
val prompt = prompt("streaming") {
system("你是一個友善的 AI 助手,請使用正體中文回答問題")
user("請簡單的說明,什麼是 Kotlin 的協程")
}
// 流式執行
println("AI 正在回應...")
executor.executeStreaming(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
.collect { token ->
// 即時輸出每個文字片段
print(token)
}
println("\n回應完成!")
}
執行 AI 回應內容
直接從結果是看不太出來 streaming 的效果 XD,它會幾個字幾個字地輸出,讓使用者可以及時看到 AI 的回應進度
AI 正在回應...
Kotlin 的協程(Coroutine)是一種用來進行非同步程式設計的輕量級工具。它可以讓你用同步的寫法寫出非同步的程式碼,讓程式更簡潔且易於閱讀。協程可以暫停執行(掛起),等候某些操作完成後再繼續,而不會阻塞整個線程,提升效能並減少資源浪費。簡單來說,協程讓異步任務的撰寫變得像寫同步程式一樣直覺方便。
回應完成!
流式處理的錯誤處理
在網路環境中,流式處理更容易遇到中斷問題,因此需要完善的錯誤處理
class RobustStreamingChat(private val executor: PromptExecutor) {
fun streamWithRetry(
prompt: Prompt,
model: LLModel,
maxRetries: Int = 3
): Flow<String> = flow {
var attempt = 0
var success = false
while (attempt <= maxRetries && !success) {
try {
executor.executeStreaming(prompt, model).collect { token ->
emit(token)
// 模擬出錯的情況
delay(100L)
throw SocketTimeoutException("Dummy timeout")
}
success = true
} catch (e: Exception) {
attempt++
if (attempt < maxRetries) {
emit("\n[連線中斷,正在重新嘗試... ($attempt/$maxRetries)]\n")
// 指數退避
delay(1000L * attempt)
} else {
emit("\n[連線失敗,請稍後再試... ($attempt/$maxRetries)]\n")
throw e
}
}
}
}
}
錯誤處理使用範例
suspend fun main() {
// 建立執行器
val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)
// 建立提示
val prompt = prompt("streaming") {
system("你是一個友善的 AI 助手,請使用正體中文回答問題")
user("請簡單的說明,什麼是 Kotlin 的協程")
}
// 流式執行
println("AI 正在回應...")
val robustStreamingChat = RobustStreamingChat(executor)
robustStreamingChat.streamWithRetry(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
.collect { token ->
// 即時輸出每個文字片段
print(token)
}
println("\n回應完成!")
}
執行 AI 回應內容
AI 正在回應...
[連線中斷,正在重新嘗試... (1/3)]
[連線中斷,正在重新嘗試... (2/3)]
Exception in thread "main" [連線失敗,請稍後再試... (3/3)]
java.lang.IllegalStateException: Error from OpenAI API: 200 OK: Dummy timeout
效能優化技巧
基本效能追蹤
我們可以建立簡單的效能監控來追蹤流式處理的表現
class SimpleStreamingMonitor {
data class StreamingStats(
val totalTokens: Int,
val duration: Duration,
val firstTokenDelay: Duration
)
// 監控流式處理效能
fun Flow<String>.withPerformanceTracking(): Flow<String> = flow {
val startTime = TimeSource.Monotonic.markNow()
var firstTokenTime: Duration? = null
var tokenCount = 0
collect { token ->
tokenCount++
// 記錄第一個 token 的時間
if (firstTokenTime == null) {
firstTokenTime = startTime.elapsedNow()
}
emit(token)
}
// 輸出統計資訊
val totalDuration = startTime.elapsedNow()
val stats = StreamingStats(
totalTokens = tokenCount,
duration = totalDuration,
firstTokenDelay = firstTokenTime ?: Duration.ZERO
)
println("\n=== 效能統計 ===")
println("總 Token 數:${stats.totalTokens}")
println("總耗時:${stats.duration}")
println("首 Token 延遲:${stats.firstTokenDelay}")
}
}
基本效能追蹤使用範例
suspend fun main() {
// 建立執行器
val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)
// 建立提示
val prompt = prompt("streaming") {
system("你是一個友善的 AI 助手,請使用正體中文回答問題")
user("請簡單的說明,什麼是 Kotlin 的協程")
}
// 流式執行
println("AI 正在回應...")
with(SimpleStreamingMonitor()) {
executor.executeStreaming(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
.withPerformanceTracking()
.collect { token ->
// 即時輸出每個文字片段
print(token)
}
}
println("\n回應完成!")
}
執行 AI 回應內容
AI 正在回應...
Kotlin 的協程(Coroutine)是一種輕量級的非同步程式設計工具,讓你可以用類似同步的方式撰寫非同步或並行的程式碼。它可以在不阻塞主執行緒的情況下,進行耗時操作(例如網路請求、檔案讀寫),提升效能和使用者體驗。協程能夠暫停和恢復執行,讓程式更容易閱讀和維護。簡單來說,協程就是用來簡化非同步程式碼寫法的技術。
=== 效能統計 ===
總 Token 數:130
總耗時:3.520779625s
首 Token 延遲:1.451959625s
回應完成!
超時管理
確保流式處理在合理時間內完成
class StreamingWithTimeout(private val executor: PromptExecutor) {
suspend fun execute(
prompt: Prompt,
model: LLModel,
timeoutSeconds: Long = 5
): Flow<String> = flow {
try {
withTimeout(timeoutSeconds * 1000) {
executor.executeStreaming(prompt, model).collect { token ->
// 模擬超過時間
delay(6000)
emit(token)
}
}
} catch (e: TimeoutCancellationException) {
emit("\n[回應超時,請稍後再試]")
} catch (e: IllegalStateException) {
emit("\n[回應超時,請稍後再試]")
}
}
}
超時管理使用範例
suspend fun main() {
// 建立執行器
val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)
// 建立提示
val prompt = prompt("streaming") {
system("你是一個友善的 AI 助手,請使用正體中文回答問題")
user("請簡單的說明,什麼是 Kotlin 的協程")
}
// 流式執行
println("AI 正在回應...")
val streamingWithTimeout = StreamingWithTimeout(executor)
streamingWithTimeout.execute(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
.collect { token ->
// 即時輸出每個文字片段
print(token)
}
println("\n回應完成!")
}
執行 AI 回應內容
6 秒後會呈現超時的訊息
AI 正在回應...
[回應超時,請稍後再試]
回應完成!
總結
今天我們學習了 Koog 框架的流式處理基礎功能
- 掌握 executeStreaming API:學會使用 Flow 處理即時回應
- 基本錯誤處理:處理網路中斷和連線問題
- 流式客服系統:建立支援多輪對話的即時客服助手
- 效能追蹤:監控 token 速率和回應延遲
- 超時管理:確保流式處理的穩定性
流式處理大幅提升了 AI 應用的用戶體驗,讓互動更加自然流暢。透過 Kotlin 的 Flow API,我們可以輕鬆實現複雜的即時回應功能。在下一篇文章中,我們將學習多 LLM 整合與智慧切換,探討如何在不同場景下選擇最適合的語言模型
參考資料
支持創作
如果這篇文章對您有幫助,歡迎透過 贊助連結 支持我持續創作優質內容。您的支持是我前進的動力!
圖片來源:AI 產生