Logo
Published on

第一次學 Kotlin Koog AI 就上手 Day 17:增加使用者體驗:流式處理與即時回應

在實際的用戶互動中,等待 AI 完整回應往往會讓用戶感到焦急。今天我們要學習 Koog 框架的流式處理功能,實現即時的 AI 回應,大幅提升用戶體驗。透過 executeStreaming API,我們將讓智慧客服助手能夠像真人對話一樣,逐字逐句地即時回應用戶

流式處理的重要性

傳統批量處理 vs 流式處理

在傳統的 AI 應用中,我們習慣等待模型完全生成回應後再顯示給用戶

// 傳統批量處理方式
val response = executor.execute(prompt, model)
println(response.content) // 等待完整回應後一次性顯示

這種方式雖然簡單,但存在明顯的用戶體驗問題

  • 長時間等待:複雜問題可能需要等待 10-30 秒
  • 無回饋感:用戶不知道系統是否正在處理
  • 互動性差:無法提前預覽和打斷回應

流式處理則能夠

  • 即時回饋:文字逐字產生,如同真人打字
  • 降低感知延遲:用戶能立即看到回應開始
  • 更好的互動性:可以提前理解回應內容

Koog 框架中的流式處理架構

Koog 框架通過 Kotlin 的 Flow API 提供了優雅的流式處理支援

interface LLMClient {
    // 傳統同步執行
    abstract suspend fun execute(prompt: Prompt, model: LLModel, tools: List<ToolDescriptor>): List<Message.Response>

    // 流式執行 - 返回 Flow<String>
    abstract fun executeStreaming(prompt: Prompt, model: LLModel): Flow<String>
}

基本流式處理實作

簡單的流式聊天範例

讓我們從最基本的流式處理開始

suspend fun main() {

    // 建立執行器
    val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)

    // 建立提示
    val prompt = prompt("streaming") {
        system("你是一個友善的 AI 助手,請使用正體中文回答問題")
        user("請簡單的說明,什麼是 Kotlin 的協程")
    }

    // 流式執行
    println("AI 正在回應...")
    executor.executeStreaming(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
        .collect { token ->
            // 即時輸出每個文字片段
            print(token)
        }

    println("\n回應完成!")
}

執行 AI 回應內容

直接從結果是看不太出來 streaming 的效果 XD,它會幾個字幾個字地輸出,讓使用者可以及時看到 AI 的回應進度

AI 正在回應...
Kotlin 的協程(Coroutine)是一種用來進行非同步程式設計的輕量級工具。它可以讓你用同步的寫法寫出非同步的程式碼,讓程式更簡潔且易於閱讀。協程可以暫停執行(掛起),等候某些操作完成後再繼續,而不會阻塞整個線程,提升效能並減少資源浪費。簡單來說,協程讓異步任務的撰寫變得像寫同步程式一樣直覺方便。
回應完成!

流式處理的錯誤處理

在網路環境中,流式處理更容易遇到中斷問題,因此需要完善的錯誤處理

class RobustStreamingChat(private val executor: PromptExecutor) {

    fun streamWithRetry(
        prompt: Prompt,
        model: LLModel,
        maxRetries: Int = 3
    ): Flow<String> = flow {
        var attempt = 0
        var success = false

        while (attempt <= maxRetries && !success) {
            try {
                executor.executeStreaming(prompt, model).collect { token ->
                    emit(token)

                    // 模擬出錯的情況
                    delay(100L)
                    throw SocketTimeoutException("Dummy timeout")
                }
                success = true
            } catch (e: Exception) {
                attempt++
                if (attempt < maxRetries) {
                    emit("\n[連線中斷,正在重新嘗試... ($attempt/$maxRetries)]\n")
                    // 指數退避
                    delay(1000L * attempt)
                } else {
                    emit("\n[連線失敗,請稍後再試... ($attempt/$maxRetries)]\n")
                    throw e
                }
            }
        }
    }
}

錯誤處理使用範例

suspend fun main() {

    // 建立執行器
    val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)

    // 建立提示
    val prompt = prompt("streaming") {
        system("你是一個友善的 AI 助手,請使用正體中文回答問題")
        user("請簡單的說明,什麼是 Kotlin 的協程")
    }

    // 流式執行
    println("AI 正在回應...")
    val robustStreamingChat = RobustStreamingChat(executor)
    robustStreamingChat.streamWithRetry(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
        .collect { token ->
            // 即時輸出每個文字片段
            print(token)
        }

    println("\n回應完成!")
}

執行 AI 回應內容

AI 正在回應...

[連線中斷,正在重新嘗試... (1/3)]

[連線中斷,正在重新嘗試... (2/3)]

Exception in thread "main" [連線失敗,請稍後再試... (3/3)]
java.lang.IllegalStateException: Error from OpenAI API: 200 OK: Dummy timeout

效能優化技巧

基本效能追蹤

我們可以建立簡單的效能監控來追蹤流式處理的表現

class SimpleStreamingMonitor {

    data class StreamingStats(
        val totalTokens: Int,
        val duration: Duration,
        val firstTokenDelay: Duration
    )

    // 監控流式處理效能
    fun Flow<String>.withPerformanceTracking(): Flow<String> = flow {
        val startTime = TimeSource.Monotonic.markNow()
        var firstTokenTime: Duration? = null
        var tokenCount = 0

        collect { token ->
            tokenCount++

            // 記錄第一個 token 的時間
            if (firstTokenTime == null) {
                firstTokenTime = startTime.elapsedNow()
            }

            emit(token)
        }

        // 輸出統計資訊
        val totalDuration = startTime.elapsedNow()
        val stats = StreamingStats(
            totalTokens = tokenCount,
            duration = totalDuration,
            firstTokenDelay = firstTokenTime ?: Duration.ZERO
        )

        println("\n=== 效能統計 ===")
        println("總 Token 數:${stats.totalTokens}")
        println("總耗時:${stats.duration}")
        println("首 Token 延遲:${stats.firstTokenDelay}")
    }
}

基本效能追蹤使用範例

suspend fun main() {

    // 建立執行器
    val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)

    // 建立提示
    val prompt = prompt("streaming") {
        system("你是一個友善的 AI 助手,請使用正體中文回答問題")
        user("請簡單的說明,什麼是 Kotlin 的協程")
    }

    // 流式執行
    println("AI 正在回應...")

    with(SimpleStreamingMonitor()) {
        executor.executeStreaming(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
            .withPerformanceTracking()
            .collect { token ->
                // 即時輸出每個文字片段
                print(token)
            }
    }

    println("\n回應完成!")
}

執行 AI 回應內容

AI 正在回應...
Kotlin 的協程(Coroutine)是一種輕量級的非同步程式設計工具,讓你可以用類似同步的方式撰寫非同步或並行的程式碼。它可以在不阻塞主執行緒的情況下,進行耗時操作(例如網路請求、檔案讀寫),提升效能和使用者體驗。協程能夠暫停和恢復執行,讓程式更容易閱讀和維護。簡單來說,協程就是用來簡化非同步程式碼寫法的技術。
=== 效能統計 ===
Token 數:130
總耗時:3.520779625s
Token 延遲:1.451959625s

回應完成!

超時管理

確保流式處理在合理時間內完成

class StreamingWithTimeout(private val executor: PromptExecutor) {

    suspend fun execute(
        prompt: Prompt,
        model: LLModel,
        timeoutSeconds: Long = 5
    ): Flow<String> = flow {
        try {
            withTimeout(timeoutSeconds * 1000) {
                executor.executeStreaming(prompt, model).collect { token ->

                    // 模擬超過時間
                    delay(6000)

                    emit(token)
                }
            }
        } catch (e: TimeoutCancellationException) {
            emit("\n[回應超時,請稍後再試]")
        } catch (e: IllegalStateException) {
            emit("\n[回應超時,請稍後再試]")
        }
    }
}

超時管理使用範例

suspend fun main() {

    // 建立執行器
    val executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!)

    // 建立提示
    val prompt = prompt("streaming") {
        system("你是一個友善的 AI 助手,請使用正體中文回答問題")
        user("請簡單的說明,什麼是 Kotlin 的協程")
    }

    // 流式執行
    println("AI 正在回應...")

    val streamingWithTimeout = StreamingWithTimeout(executor)
    streamingWithTimeout.execute(prompt, OpenAIModels.CostOptimized.GPT4_1Mini)
        .collect { token ->
            // 即時輸出每個文字片段
            print(token)
        }

    println("\n回應完成!")
}

執行 AI 回應內容

6 秒後會呈現超時的訊息

AI 正在回應...

[回應超時,請稍後再試]
回應完成!

總結

今天我們學習了 Koog 框架的流式處理基礎功能

  • 掌握 executeStreaming API:學會使用 Flow 處理即時回應
  • 基本錯誤處理:處理網路中斷和連線問題
  • 流式客服系統:建立支援多輪對話的即時客服助手
  • 效能追蹤:監控 token 速率和回應延遲
  • 超時管理:確保流式處理的穩定性

流式處理大幅提升了 AI 應用的用戶體驗,讓互動更加自然流暢。透過 Kotlin 的 Flow API,我們可以輕鬆實現複雜的即時回應功能。在下一篇文章中,我們將學習多 LLM 整合與智慧切換,探討如何在不同場景下選擇最適合的語言模型

參考資料


支持創作

如果這篇文章對您有幫助,歡迎透過 贊助連結 支持我持續創作優質內容。您的支持是我前進的動力!


圖片來源:AI 產生