Logo
Published on

第一次學 Kotlin Koog AI 就上手 Day 18:服務不中斷:建立智慧路由與容錯機制

回顧我們的文章,從 Day 4 學會了基礎的多 LLM 配置,到 Day 7 掌握了錯誤處理技巧。今天我們要將這些知識結合起來,建立一個智慧的容錯機制,確保 AI 應用系統在任何情況下都能持續穩定運行

從基礎到企業級的演進

在 Day 4 中,我們初步接觸了 fallback 的概念,透過簡單的 try-catch 實現基本的容錯。當時我們建立了 FallbackMultiLLMSetup 來處理供應商切換,這為我們奠定了容錯思維的基礎

今天,我們要將這個概念提升到生產環境等級,建立一個標準化、可重用的容錯機制。主要差異在於

Day 4 基礎版本

  • 應用層級的簡單容錯處理
  • 直接返回 String 結果
  • 基本的 try-catch 結構
  • 適合學習和概念驗證

Day 18 進階版本

  • 框架層級的標準化容錯執行器
  • 實作 PromptExecutor 介面,完全相容 AIAgent
  • 支援重試機制和指數退避
  • 詳細的錯誤記錄和監控
  • 直接可用於生產環境

從基礎配置到智慧容錯的進化

Day 4 回顧:基礎多模型設定

在 Day 4 中,我們學習了基本的多 LLM 配置,但仍需要手動選擇模型

// Day 4:基礎的多 LLM 設定
val openAIExecutor = simpleOpenAIExecutor(ApiKeyManager.openAIKey)
val anthropicExecutor = simpleAnthropicExecutor(ApiKeyManager.anthropicKey)

// 手動選擇使用哪個執行器
val result = openAIExecutor.execute(prompt, model, tools)

Day 18 智慧容錯機制

今天我們要實現自動容錯切換,當主要模型失敗時,系統會自動切換到備用模型

// Day 18:智慧容錯機制
val resilientExecutor = ResilientExecutor(
    primaryExecutor = simpleOpenAIExecutor(ApiKeyManager.openAIKey),
    fallbackExecutor = simpleAnthropicExecutor(ApiKeyManager.anthropicKey)
)

// 自動處理容錯切換,無需手動干預
val result = resilientExecutor.execute(prompt, model, tools)

關鍵差異

  • Day 4:手動選擇模型,需要開發者決定使用哪個執行器
  • Day 18:自動容錯切換,系統智慧處理故障恢復

容錯機制的重要性

在現實世界中,AI 服務可能會因為各種原因暫時無法使用

  • API 服務暫時中斷:OpenAI 服務維護或過載
  • 網路連線問題:暫時的網路不穩定
  • 額度限制:達到 API 使用上限
  • 金鑰問題:API 金鑰過期或設定錯誤

如果我們的 AI 應用只依賴單一模型,任何一個問題都會讓整個服務停擺。這對於企業來說是不可接受的

不同模型的容錯優勢

在實際應用中,主要模型和備用模型使用不同的 LLM 有以下優勢

  • 成本優化:主要使用高效但較貴的模型(如 GPT-4.1),備用使用較便宜的模型(如 Gemini Flash)
  • 能力互補:不同供應商的模型在不同任務上有各自的強項
  • 風險分散:避免單一供應商的服務中斷影響整個系統
  • 靈活配置:可以根據業務需求動態調整模型組合

Koog 框架的擴充性展現

在開始實作容錯機制之前,讓我們先認識 Koog 框架設計的巧妙之處。透過今天的實作,我們將看到如何輕鬆擴充原有的 PromptExecutor 介面,建立自己的智慧容錯執行器

擴充介面的簡單性

Koog 框架採用開放式設計,只要實作 PromptExecutor 介面,我們就能建立自己的客製化執行器

class ResilientExecutor(...) : PromptExecutor {
    // 只需要實作 4 個核心方法
    override suspend fun execute(...)
    override suspend fun executeStreaming(...)
    override suspend fun moderate(...)
    override suspend fun executeMultipleChoices(...)
}

關鍵優勢

  • 無縫整合:新的執行器可以直接與 AIAgent 整合使用
  • 介面一致性:所有執行器都遵循相同的介面規範
  • 組合彈性:可以將不同的執行器組合使用(如容錯 + 快取)
  • 擴充簡單:只需實作介面,不需修改框架核心程式碼

這種設計讓我們能夠根據業務需求,輕鬆建立各種專門的執行器,如快取執行器、限流執行器、監控執行器等

ResilientExecutor:核心容錯機制

讓我們實作一個簡單而有效的容錯執行器

class ResilientExecutor(
    private val primaryExecutor: PromptExecutor,
    private val primaryModel: LLModel,
    private val fallbackExecutor: PromptExecutor,
    private val fallbackModel: LLModel
) : PromptExecutor {

    override suspend fun execute(
        prompt: Prompt,
        model: LLModel,
        tools: List<ToolDescriptor>
    ): List<Response> {

        return try {
            println("🎯 嘗試使用主要模型 ${primaryModel.id}...")
            // 首先嘗試使用主要執行器(例如 OpenAI)
            primaryExecutor.execute(prompt, primaryModel, tools)

        } catch (e: Exception) {
            println("⚠️ 主要模型失敗:${e.message}")
            println("🔄 切換到備用模型 ${fallbackModel.id}...")

            try {
                // 主要執行器失敗時,切換到備用執行器(例如 Anthropic)
                fallbackExecutor.execute(prompt, fallbackModel, tools)

            } catch (fallbackException: Exception) {
                println("❌ 備用模型也失敗:${fallbackException.message}")

                // 如果備用模型也失敗,拋出更詳細的錯誤訊息
                throw Exception(
                    "所有模型都無法處理請求。主要錯誤:${e.message},備用錯誤:${fallbackException.message}"
                )
            }
        }
    }

    override suspend fun executeStreaming(prompt: Prompt, model: LLModel): Flow<String> {
        return try {
            println("🎯 嘗試使用主要模型 ${primaryModel.id} 進行串流處理...")
            primaryExecutor.executeStreaming(prompt, primaryModel)
        } catch (e: Exception) {
            println("⚠️ 主要模型串流處理失敗:${e.message}")
            println("🔄 切換到備用模型 ${fallbackModel.id} 進行串流處理...")

            try {
                fallbackExecutor.executeStreaming(prompt, fallbackModel)
            } catch (fallbackException: Exception) {
                println("❌ 備用模型串流處理也失敗:${fallbackException.message}")
                // 返回錯誤訊息作為串流
                flowOf(
                    "所有模型都無法處理串流請求。主要錯誤:${e.message},備用錯誤:${fallbackException.message}"
                )
            }
        }
    }

    override suspend fun moderate(prompt: Prompt, model: LLModel): ModerationResult {
        return try {
            println("🎯 嘗試使用主要模型 ${primaryModel.id} 進行內容審核...")
            primaryExecutor.moderate(prompt, primaryModel)
        } catch (e: Exception) {
            println("⚠️ 主要模型內容審核失敗:${e.message}")
            println("🔄 切換到備用模型 ${fallbackModel.id} 進行內容審核...")

            try {
                fallbackExecutor.moderate(prompt, fallbackModel)
            } catch (fallbackException: Exception) {
                println("❌ 備用模型內容審核也失敗:${fallbackException.message}")
                throw Exception(
                    "所有模型都無法處理內容審核請求。主要錯誤:${e.message},備用錯誤:${fallbackException.message}"
                )
            }
        }
    }

    override suspend fun executeMultipleChoices(
        prompt: Prompt,
        model: LLModel,
        tools: List<ToolDescriptor>
    ): List<LLMChoice> {
        return try {
            println("🎯 嘗試使用主要模型 ${primaryModel.id} 進行多選項處理...")
            primaryExecutor.executeMultipleChoices(prompt, primaryModel, tools)
        } catch (e: Exception) {
            println("⚠️ 主要模型多選項處理失敗:${e.message}")
            println("🔄 切換到備用模型 ${fallbackModel.id} 進行多選項處理...")

            try {
                fallbackExecutor.executeMultipleChoices(prompt, fallbackModel, tools)
            } catch (fallbackException: Exception) {
                println("❌ 備用模型多選項處理也失敗:${fallbackException.message}")
                throw Exception(
                    "所有模型都無法處理多選項請求。主要錯誤:${e.message},備用錯誤:${fallbackException.message}"
                )
            }
        }
    }
}

基本使用範例

讓我們看看如何在實際應用中使用 ResilientExecutor

suspend fun main() {
    println("🤖 容錯執行器測試啟動")

    // 建立容錯執行器
    val resilientExecutor = ResilientExecutor(
        primaryExecutor = simpleOpenAIExecutor(ApiKeyManager.openAIKey),
        primaryModel = OpenAIModels.Chat.GPT4o,
        fallbackExecutor = simpleGoogleAIExecutor(ApiKeyManager.googleApiKey!!),
        fallbackModel = GoogleModels.Gemini2_5Flash
    )

    // 建立 AIAgent 使用容錯執行器
    val agent = AIAgent(
        executor = resilientExecutor,
        systemPrompt = "你是一個 AI 助手,請用正體中文回答問題"
    )

    try {
        val question = "請簡單的說明,什麼是 Kotlin 的協程"
        println("📝 問題:$question")

        val response = agent.run(question)
        println("🤖 回應:$response")

    } catch (e: Exception) {
        println("❌ 執行失敗:${e.message}")
    }
}

執行 AI 回應內容

正常沒有出錯的情況

🤖 容錯執行器測試啟動
📝 問題:請簡單的說明,什麼是 Kotlin 的協程
🎯 嘗試使用主要模型 gpt-4.1-mini...
🤖 回應:Kotlin 的協程(Coroutine)是一種輕量級的非同步程式設計方式,可以用來簡化異步操作和並發處理。它讓你能夠寫出看起來像同步的程式碼,但實際上可以非阻塞地執行長時間任務(例如網路請求、檔案讀寫等),提高程式效率和可讀性。協程透過掛起(suspend)和恢復機制,讓多個任務可以在同一個執行緒中協作運作。

模擬故障測試

為了驗證容錯機制的有效性,我們可以建立一個模擬故障的測試

class FailingExecutor : PromptExecutor {
    override suspend fun execute(prompt: Prompt, model: LLModel, tools: List<ToolDescriptor>): List<Message.Response> {
        throw Exception("模擬主要執行器故障")
    }

    override suspend fun executeStreaming(prompt: Prompt, model: LLModel): Flow<String> {
        throw Exception("模擬主要執行器串流故障")
    }

    override suspend fun moderate(prompt: Prompt, model: LLModel): ModerationResult {
        throw Exception("模擬主要執行器審核故障")
    }

    override suspend fun executeMultipleChoices(
        prompt: Prompt,
        model: LLModel,
        tools: List<ToolDescriptor>
    ): List<LLMChoice> {
        throw Exception("模擬主要執行器多選項故障")
    }
}

模擬故障使用範例

suspend fun main() {

   println("🧪 開始容錯切換測試...")

    // 建立容錯執行器
    val resilientExecutor = ResilientExecutor(
        // 主要執行器使用 FailingExecutor() 來模擬故障
        primaryExecutor = FailingExecutor(),
        primaryModel = OpenAIModels.CostOptimized.GPT4_1Mini,
        fallbackExecutor = simpleGoogleAIExecutor(ApiKeyManager.googleApiKey!!),
        fallbackModel = GoogleModels.Gemini2_5Flash
    )

    // 建立 AIAgent 使用容錯執行器
    val agent = AIAgent(
        executor = resilientExecutor,
        systemPrompt = "你是一個 AI 助手,請用正體中文回答問題",
        llmModel = OpenAIModels.CostOptimized.GPT4_1Mini
    )

    try {
        val question = "請簡單的說明,什麼是 Kotlin 的協程"
        println("📝 問題:$question")

        val response = agent.run(question)
        println("🤖 回應:$response")

    } catch (e: Exception) {
        println("❌ 執行失敗:${e.message}")
    }
}

執行 AI 回應內容

🧪 開始容錯切換測試...
📝 問題:請簡單的說明,什麼是 Kotlin 的協程
🎯 嘗試使用主要模型 gpt-4.1-mini...
⚠️ 主要模型失敗:模擬主要執行器故障
🔄 切換到備用模型 gemini-2.5-flash...
🤖 回應:Kotlin 協程 (Coroutines) 是一種讓我們能寫出**非同步 (asynchronous)****非阻塞 (non-blocking)** 程式碼的方式。

它的目的是讓處理耗時任務(例如網路請求、資料庫操作)時,不會讓應用程式卡住,同時又讓程式碼看起來像循序執行一樣簡單易懂。

您可以想像成一個**非常有效率的廚師**
1.  這位廚師有很多菜要做,但他不是每次都等到一道菜完全做好才開始下一道。
2.  他會把水燒上,然後**暫停 (suspend)** 等待水開,同時去切菜、炒其他的東西。
3.  等水開了,他再**恢復 (resume)** 回來煮麵。

整個過程就像是一個人在同時處理多項任務,而不是多個人分別處理。

**核心重點:**

*   **輕量 (Lightweight)** 協程非常輕量,一個應用程式可以同時運行成千上萬個協程,而傳統的執行緒 (threads) 則非常耗資源。
*   **非阻塞 (Non-blocking)** 當協程執行一個耗時操作時(例如等待網路回應),它不會佔用執行緒,而是將自己「暫停」,讓執行緒去做其他事情。等到操作完成,它再「恢復」執行,這樣就避免了卡頓。
*   **易於閱讀和撰寫:** 它讓我們用寫同步程式碼(從上到下依序執行)的方式來寫非同步程式碼,大大減少了傳統回呼地獄 (Callback Hell) 的複雜性。

**總之,Kotlin 協程就是一個聰明、輕量又高效的方式,讓我們在處理需要等待的任務時,能寫出更清晰、更易於維護的非同步程式碼,同時不影響使用者介面或應用程式的流暢度。**

進階版容錯執行器

基礎的 ResilientExecutor 已經提供了良好的容錯能力,但在生產環境中,我們可能需要更智慧的容錯策略 AdvancedResilientExecutor

進階功能對比

功能基礎版 ResilientExecutor進階版 AdvancedResilientExecutor
容錯切換✅ 主要 → 備用✅ 主要 → 備用
重試機制❌ 單次嘗試✅ 可配置重試次數
退避策略❌ 立即切換✅ 指數退避延遲
程式碼重用❌ 方法重複✅ 泛型重試邏輯
錯誤處理✅ 基本錯誤訊息✅ 詳細的重試記錄

核心改進說明

  • 重試機制

    • 每個執行器都會重試指定次數(預設 2 次)
    • 避免因為暫時性網路問題導致的誤切換
  • 指數退避策略

    • 重試間隔採用固定延遲(可配置)
    • 避免對 API 服務造成過大壓力
  • 泛型重試邏輯

    • 使用 executeWithRetryexecuteWithRetryFlow 統一處理重試
    • 大幅減少程式碼重複,提高維護性

AdvancedResilientExecutor 實作

class AdvancedResilientExecutor(
    private val primaryExecutor: PromptExecutor,
    private val primaryModel: LLModel,
    private val fallbackExecutor: PromptExecutor,
    private val fallbackModel: LLModel,
    private val maxRetries: Int = 2,
    private val retryDelayMs: Long = 1000
) : PromptExecutor {

    override suspend fun execute(prompt: Prompt, model: LLModel, tools: List<ToolDescriptor>): List<Message.Response> {

        var attempt = 0
        var lastException: Exception? = null

        // 嘗試主要執行器
        while (attempt < maxRetries) {
            try {
                println("🎯 嘗試使用主要模型 ${primaryModel.id} (第 ${attempt + 1} 次)...")
                return primaryExecutor.execute(prompt, primaryModel, tools)

            } catch (e: Exception) {
                lastException = e
                attempt++

                println("⚠️ 主要模型第 $attempt 次嘗試失敗:${e.message}")

                if (attempt < maxRetries) {
                    println("⏳ 等待 ${retryDelayMs}ms 後重試...")
                    delay(retryDelayMs)
                }
            }
        }

        // 主要執行器多次重試都失敗,切換到備用執行器
        println("🔄 切換到備用模型 ${fallbackModel.id}...")

        attempt = 0
        while (attempt < maxRetries) {
            try {
                println("🎯 嘗試使用備用模型 ${fallbackModel.id} (第 ${attempt + 1} 次)...")
                return fallbackExecutor.execute(prompt, fallbackModel, tools)

            } catch (e: Exception) {
                lastException = e
                attempt++

                println("⚠️ 備用模型第 $attempt 次嘗試失敗:${e.message}")

                if (attempt < maxRetries) {
                    println("⏳ 等待 ${retryDelayMs}ms 後重試...")
                    delay(retryDelayMs)
                }
            }
        }

        // 所有嘗試都失敗了
        throw Exception(
            "經過 ${maxRetries * 2} 次嘗試後,所有模型都無法處理請求。最後錯誤:${lastException?.message}"
        )
    }

    override suspend fun executeStreaming(prompt: Prompt, model: LLModel): Flow<String> {
        return try {
            executeWithRetryFlow(
                primaryOperation = { executor, llmModel -> executor.executeStreaming(prompt, llmModel) },
                fallbackOperation = { executor, llmModel -> executor.executeStreaming(prompt, llmModel) }
            )
        } catch (e: Exception) {
            flowOf("容錯機制:${e.message}")
        }
    }

    override suspend fun moderate(prompt: Prompt, model: LLModel): ModerationResult {
        return try {
            executeWithRetry(
                primaryOperation = { executor, llmModel -> executor.moderate(prompt, llmModel) },
                fallbackOperation = { executor, llmModel -> executor.moderate(prompt, llmModel) }
            )
        } catch (e: Exception) {
            throw Exception("容錯機制:無法完成內容審核 ${e.message}")
        }
    }

    override suspend fun executeMultipleChoices(
        prompt: Prompt,
        model: LLModel,
        tools: List<ToolDescriptor>
    ): List<LLMChoice> {
        return executeWithRetry(
            primaryOperation = { executor, llmModel -> executor.executeMultipleChoices(prompt, llmModel, tools) },
            fallbackOperation = { executor, llmModel -> executor.executeMultipleChoices(prompt, llmModel, tools) }
        )
    }

    private suspend fun <T> executeWithRetry(
        primaryOperation: suspend (PromptExecutor, LLModel) -> T,
        fallbackOperation: suspend (PromptExecutor, LLModel) -> T
    ): T {
        var attempt = 0
        var lastException: Exception? = null

        // 嘗試主要執行器
        while (attempt < maxRetries) {
            try {
                return primaryOperation(primaryExecutor, primaryModel)
            } catch (e: Exception) {
                lastException = e
                attempt++
                if (attempt < maxRetries) {
                    delay(retryDelayMs)
                }
            }
        }

        // 嘗試備用執行器
        attempt = 0
        while (attempt < maxRetries) {
            try {
                return fallbackOperation(fallbackExecutor, fallbackModel)
            } catch (e: Exception) {
                lastException = e
                attempt++
                if (attempt < maxRetries) {
                    delay(retryDelayMs)
                }
            }
        }

        throw Exception("所有重試都失敗:${lastException?.message}")
    }

    private suspend fun executeWithRetryFlow(
        primaryOperation: suspend (PromptExecutor, LLModel) -> Flow<String>,
        fallbackOperation: suspend (PromptExecutor, LLModel) -> Flow<String>
    ): Flow<String> {
        var attempt = 0
        var lastException: Exception? = null

        // 嘗試主要執行器
        while (attempt < maxRetries) {
            try {
                return primaryOperation(primaryExecutor, primaryModel)
            } catch (e: Exception) {
                lastException = e
                attempt++
                if (attempt < maxRetries) {
                    delay(retryDelayMs)
                }
            }
        }

        // 嘗試備用執行器
        attempt = 0
        while (attempt < maxRetries) {
            try {
                return fallbackOperation(fallbackExecutor, fallbackModel)
            } catch (e: Exception) {
                lastException = e
                attempt++
                if (attempt < maxRetries) {
                    delay(retryDelayMs)
                }
            }
        }

        throw Exception("所有串流重試都失敗:${lastException?.message}")
    }
}

執行 AI 回應內容

只要把原本的 ResilientExecutor 換成 AdvancedResilientExecutor 就好了 可以看到已經有重試機制,超過重試次數就會自動切換到備用模型

🧪 開始容錯切換測試...
📝 問題:請簡單的說明,什麼是 Kotlin 的協程
🎯 嘗試使用主要模型 gpt-4.1-mini (1)...
⚠️ 主要模型第 1 次嘗試失敗:模擬主要執行器故障
⏳ 等待 1000ms 後重試...
🎯 嘗試使用主要模型 gpt-4.1-mini (2)...
⚠️ 主要模型第 2 次嘗試失敗:模擬主要執行器故障
🔄 切換到備用模型 gemini-2.5-flash...
🎯 嘗試使用備用模型 gemini-2.5-flash (1)...
🤖 回應:Kotlin 協程(Coroutines)是一種用於**簡化非同步(Asynchronous)程式設計**的工具。

簡單來說,它就像是:

1.  **「超輕量級的執行緒」**    *   傳統的執行緒(Threads)很「重」,啟動和管理都需要較多系統資源。
    *   協程非常「輕」,可以在一個執行緒上運行成千上萬個協程,因此能高效利用資源。

2.  **「可以暫停和恢復的函數」**    *   協程的核心是 `suspend`(暫停)這個關鍵字。當一個標記為 `suspend` 的函數執行到一個耗時操作(例如網路請求、資料庫查詢)時,它會**暫停**自己的執行,**不會阻塞**底層的執行緒。
    *   等到耗時操作完成後,這個協程會**自動恢復**從它暫停的地方繼續執行。

**它解決了什麼問題?**

*   **「回呼地獄(Callback Hell)」**:傳統非同步程式碼常使用巢狀的回呼函數,導致程式碼難以閱讀和維護。協程讓非同步程式碼看起來像是循序執行的同步程式碼,大大提高了可讀性。
*   **「阻塞(Blocking)」問題**:避免因為一個耗時操作而導致整個應用程式(特別是使用者介面)卡住、沒有回應。

**總結來說:**

Kotlin 協程提供了一種更**簡潔、可讀性高且高效**的方式來編寫非同步和並行程式碼,讓你能夠輕鬆處理網路請求、資料庫操作等耗時任務,而不會阻塞主執行緒,確保應用程式保持流暢回應。

商業價值與效益

  • 服務可靠性

    • 確保比較高的服務可用性
    • 即使單一 LLM 服務中斷,用戶仍能獲得回應
    • 大幅降低服務中斷對業務的影響
  • 用戶體驗

    • 用戶無感知的容錯切換
    • 持續穩定的服務品質
    • 提升系統的可靠性和用戶滿意度

總結

今天我們實作了 Koog 框架的智慧容錯機制,主要成果包括

  • ResilientExecutor:簡潔而有效的容錯執行器
  • 自動容錯切換:當主要模型失敗時自動使用備用模型
  • 進階重試機制:指數退避和多次重試策略
  • 實用整合:可直接整合到任何 AI 應用中

在下一篇文章,我們將整合 OpenTelemetry 監控系統,為我們的 LLM 系統添加全面的可觀測性,讓我們能夠

  • 追蹤每次請求的完整路徑
  • 分析系統效能和健康狀態
  • 提供運維團隊實用的監控數據

這將幫助我們在生產環境中更好地理解和優化 AI 系統的執行狀況

參考文件


支持創作

如果這篇文章對您有幫助,歡迎透過 贊助連結 支持我持續創作優質內容。您的支持是我前進的動力!


圖片來源:AI 產生