- Published on
第一次學 Kotlin Koog AI 就上手 Day 18:服務不中斷:建立智慧路由與容錯機制
回顧我們的文章,從 Day 4 學會了基礎的多 LLM 配置,到 Day 7 掌握了錯誤處理技巧。今天我們要將這些知識結合起來,建立一個智慧的容錯機制,確保 AI 應用系統在任何情況下都能持續穩定運行
從基礎到企業級的演進
在 Day 4 中,我們初步接觸了 fallback 的概念,透過簡單的 try-catch 實現基本的容錯。當時我們建立了 FallbackMultiLLMSetup
來處理供應商切換,這為我們奠定了容錯思維的基礎
今天,我們要將這個概念提升到生產環境等級,建立一個標準化、可重用的容錯機制。主要差異在於
Day 4 基礎版本
- 應用層級的簡單容錯處理
- 直接返回 String 結果
- 基本的 try-catch 結構
- 適合學習和概念驗證
Day 18 進階版本
- 框架層級的標準化容錯執行器
- 實作 PromptExecutor 介面,完全相容 AIAgent
- 支援重試機制和指數退避
- 詳細的錯誤記錄和監控
- 直接可用於生產環境
從基礎配置到智慧容錯的進化
Day 4 回顧:基礎多模型設定
在 Day 4 中,我們學習了基本的多 LLM 配置,但仍需要手動選擇模型
// Day 4:基礎的多 LLM 設定
val openAIExecutor = simpleOpenAIExecutor(ApiKeyManager.openAIKey)
val anthropicExecutor = simpleAnthropicExecutor(ApiKeyManager.anthropicKey)
// 手動選擇使用哪個執行器
val result = openAIExecutor.execute(prompt, model, tools)
Day 18 智慧容錯機制
今天我們要實現自動容錯切換,當主要模型失敗時,系統會自動切換到備用模型
// Day 18:智慧容錯機制
val resilientExecutor = ResilientExecutor(
primaryExecutor = simpleOpenAIExecutor(ApiKeyManager.openAIKey),
fallbackExecutor = simpleAnthropicExecutor(ApiKeyManager.anthropicKey)
)
// 自動處理容錯切換,無需手動干預
val result = resilientExecutor.execute(prompt, model, tools)
關鍵差異
- Day 4:手動選擇模型,需要開發者決定使用哪個執行器
- Day 18:自動容錯切換,系統智慧處理故障恢復
容錯機制的重要性
在現實世界中,AI 服務可能會因為各種原因暫時無法使用
- API 服務暫時中斷:OpenAI 服務維護或過載
- 網路連線問題:暫時的網路不穩定
- 額度限制:達到 API 使用上限
- 金鑰問題:API 金鑰過期或設定錯誤
如果我們的 AI 應用只依賴單一模型,任何一個問題都會讓整個服務停擺。這對於企業來說是不可接受的
不同模型的容錯優勢
在實際應用中,主要模型和備用模型使用不同的 LLM 有以下優勢
- 成本優化:主要使用高效但較貴的模型(如 GPT-4.1),備用使用較便宜的模型(如 Gemini Flash)
- 能力互補:不同供應商的模型在不同任務上有各自的強項
- 風險分散:避免單一供應商的服務中斷影響整個系統
- 靈活配置:可以根據業務需求動態調整模型組合
Koog 框架的擴充性展現
在開始實作容錯機制之前,讓我們先認識 Koog 框架設計的巧妙之處。透過今天的實作,我們將看到如何輕鬆擴充原有的 PromptExecutor 介面,建立自己的智慧容錯執行器
擴充介面的簡單性
Koog 框架採用開放式設計,只要實作 PromptExecutor
介面,我們就能建立自己的客製化執行器
class ResilientExecutor(...) : PromptExecutor {
// 只需要實作 4 個核心方法
override suspend fun execute(...)
override suspend fun executeStreaming(...)
override suspend fun moderate(...)
override suspend fun executeMultipleChoices(...)
}
關鍵優勢
- 無縫整合:新的執行器可以直接與
AIAgent
整合使用 - 介面一致性:所有執行器都遵循相同的介面規範
- 組合彈性:可以將不同的執行器組合使用(如容錯 + 快取)
- 擴充簡單:只需實作介面,不需修改框架核心程式碼
這種設計讓我們能夠根據業務需求,輕鬆建立各種專門的執行器,如快取執行器、限流執行器、監控執行器等
ResilientExecutor:核心容錯機制
讓我們實作一個簡單而有效的容錯執行器
class ResilientExecutor(
private val primaryExecutor: PromptExecutor,
private val primaryModel: LLModel,
private val fallbackExecutor: PromptExecutor,
private val fallbackModel: LLModel
) : PromptExecutor {
override suspend fun execute(
prompt: Prompt,
model: LLModel,
tools: List<ToolDescriptor>
): List<Response> {
return try {
println("🎯 嘗試使用主要模型 ${primaryModel.id}...")
// 首先嘗試使用主要執行器(例如 OpenAI)
primaryExecutor.execute(prompt, primaryModel, tools)
} catch (e: Exception) {
println("⚠️ 主要模型失敗:${e.message}")
println("🔄 切換到備用模型 ${fallbackModel.id}...")
try {
// 主要執行器失敗時,切換到備用執行器(例如 Anthropic)
fallbackExecutor.execute(prompt, fallbackModel, tools)
} catch (fallbackException: Exception) {
println("❌ 備用模型也失敗:${fallbackException.message}")
// 如果備用模型也失敗,拋出更詳細的錯誤訊息
throw Exception(
"所有模型都無法處理請求。主要錯誤:${e.message},備用錯誤:${fallbackException.message}"
)
}
}
}
override suspend fun executeStreaming(prompt: Prompt, model: LLModel): Flow<String> {
return try {
println("🎯 嘗試使用主要模型 ${primaryModel.id} 進行串流處理...")
primaryExecutor.executeStreaming(prompt, primaryModel)
} catch (e: Exception) {
println("⚠️ 主要模型串流處理失敗:${e.message}")
println("🔄 切換到備用模型 ${fallbackModel.id} 進行串流處理...")
try {
fallbackExecutor.executeStreaming(prompt, fallbackModel)
} catch (fallbackException: Exception) {
println("❌ 備用模型串流處理也失敗:${fallbackException.message}")
// 返回錯誤訊息作為串流
flowOf(
"所有模型都無法處理串流請求。主要錯誤:${e.message},備用錯誤:${fallbackException.message}"
)
}
}
}
override suspend fun moderate(prompt: Prompt, model: LLModel): ModerationResult {
return try {
println("🎯 嘗試使用主要模型 ${primaryModel.id} 進行內容審核...")
primaryExecutor.moderate(prompt, primaryModel)
} catch (e: Exception) {
println("⚠️ 主要模型內容審核失敗:${e.message}")
println("🔄 切換到備用模型 ${fallbackModel.id} 進行內容審核...")
try {
fallbackExecutor.moderate(prompt, fallbackModel)
} catch (fallbackException: Exception) {
println("❌ 備用模型內容審核也失敗:${fallbackException.message}")
throw Exception(
"所有模型都無法處理內容審核請求。主要錯誤:${e.message},備用錯誤:${fallbackException.message}"
)
}
}
}
override suspend fun executeMultipleChoices(
prompt: Prompt,
model: LLModel,
tools: List<ToolDescriptor>
): List<LLMChoice> {
return try {
println("🎯 嘗試使用主要模型 ${primaryModel.id} 進行多選項處理...")
primaryExecutor.executeMultipleChoices(prompt, primaryModel, tools)
} catch (e: Exception) {
println("⚠️ 主要模型多選項處理失敗:${e.message}")
println("🔄 切換到備用模型 ${fallbackModel.id} 進行多選項處理...")
try {
fallbackExecutor.executeMultipleChoices(prompt, fallbackModel, tools)
} catch (fallbackException: Exception) {
println("❌ 備用模型多選項處理也失敗:${fallbackException.message}")
throw Exception(
"所有模型都無法處理多選項請求。主要錯誤:${e.message},備用錯誤:${fallbackException.message}"
)
}
}
}
}
基本使用範例
讓我們看看如何在實際應用中使用 ResilientExecutor
suspend fun main() {
println("🤖 容錯執行器測試啟動")
// 建立容錯執行器
val resilientExecutor = ResilientExecutor(
primaryExecutor = simpleOpenAIExecutor(ApiKeyManager.openAIKey),
primaryModel = OpenAIModels.Chat.GPT4o,
fallbackExecutor = simpleGoogleAIExecutor(ApiKeyManager.googleApiKey!!),
fallbackModel = GoogleModels.Gemini2_5Flash
)
// 建立 AIAgent 使用容錯執行器
val agent = AIAgent(
executor = resilientExecutor,
systemPrompt = "你是一個 AI 助手,請用正體中文回答問題"
)
try {
val question = "請簡單的說明,什麼是 Kotlin 的協程"
println("📝 問題:$question")
val response = agent.run(question)
println("🤖 回應:$response")
} catch (e: Exception) {
println("❌ 執行失敗:${e.message}")
}
}
執行 AI 回應內容
正常沒有出錯的情況
🤖 容錯執行器測試啟動
📝 問題:請簡單的說明,什麼是 Kotlin 的協程
🎯 嘗試使用主要模型 gpt-4.1-mini...
🤖 回應:Kotlin 的協程(Coroutine)是一種輕量級的非同步程式設計方式,可以用來簡化異步操作和並發處理。它讓你能夠寫出看起來像同步的程式碼,但實際上可以非阻塞地執行長時間任務(例如網路請求、檔案讀寫等),提高程式效率和可讀性。協程透過掛起(suspend)和恢復機制,讓多個任務可以在同一個執行緒中協作運作。
模擬故障測試
為了驗證容錯機制的有效性,我們可以建立一個模擬故障的測試
class FailingExecutor : PromptExecutor {
override suspend fun execute(prompt: Prompt, model: LLModel, tools: List<ToolDescriptor>): List<Message.Response> {
throw Exception("模擬主要執行器故障")
}
override suspend fun executeStreaming(prompt: Prompt, model: LLModel): Flow<String> {
throw Exception("模擬主要執行器串流故障")
}
override suspend fun moderate(prompt: Prompt, model: LLModel): ModerationResult {
throw Exception("模擬主要執行器審核故障")
}
override suspend fun executeMultipleChoices(
prompt: Prompt,
model: LLModel,
tools: List<ToolDescriptor>
): List<LLMChoice> {
throw Exception("模擬主要執行器多選項故障")
}
}
模擬故障使用範例
suspend fun main() {
println("🧪 開始容錯切換測試...")
// 建立容錯執行器
val resilientExecutor = ResilientExecutor(
// 主要執行器使用 FailingExecutor() 來模擬故障
primaryExecutor = FailingExecutor(),
primaryModel = OpenAIModels.CostOptimized.GPT4_1Mini,
fallbackExecutor = simpleGoogleAIExecutor(ApiKeyManager.googleApiKey!!),
fallbackModel = GoogleModels.Gemini2_5Flash
)
// 建立 AIAgent 使用容錯執行器
val agent = AIAgent(
executor = resilientExecutor,
systemPrompt = "你是一個 AI 助手,請用正體中文回答問題",
llmModel = OpenAIModels.CostOptimized.GPT4_1Mini
)
try {
val question = "請簡單的說明,什麼是 Kotlin 的協程"
println("📝 問題:$question")
val response = agent.run(question)
println("🤖 回應:$response")
} catch (e: Exception) {
println("❌ 執行失敗:${e.message}")
}
}
執行 AI 回應內容
🧪 開始容錯切換測試...
📝 問題:請簡單的說明,什麼是 Kotlin 的協程
🎯 嘗試使用主要模型 gpt-4.1-mini...
⚠️ 主要模型失敗:模擬主要執行器故障
🔄 切換到備用模型 gemini-2.5-flash...
🤖 回應:Kotlin 協程 (Coroutines) 是一種讓我們能寫出**非同步 (asynchronous)**、**非阻塞 (non-blocking)** 程式碼的方式。
它的目的是讓處理耗時任務(例如網路請求、資料庫操作)時,不會讓應用程式卡住,同時又讓程式碼看起來像循序執行一樣簡單易懂。
您可以想像成一個**非常有效率的廚師**:
1. 這位廚師有很多菜要做,但他不是每次都等到一道菜完全做好才開始下一道。
2. 他會把水燒上,然後**暫停 (suspend)** 等待水開,同時去切菜、炒其他的東西。
3. 等水開了,他再**恢復 (resume)** 回來煮麵。
整個過程就像是一個人在同時處理多項任務,而不是多個人分別處理。
**核心重點:**
* **輕量 (Lightweight):** 協程非常輕量,一個應用程式可以同時運行成千上萬個協程,而傳統的執行緒 (threads) 則非常耗資源。
* **非阻塞 (Non-blocking):** 當協程執行一個耗時操作時(例如等待網路回應),它不會佔用執行緒,而是將自己「暫停」,讓執行緒去做其他事情。等到操作完成,它再「恢復」執行,這樣就避免了卡頓。
* **易於閱讀和撰寫:** 它讓我們用寫同步程式碼(從上到下依序執行)的方式來寫非同步程式碼,大大減少了傳統回呼地獄 (Callback Hell) 的複雜性。
**總之,Kotlin 協程就是一個聰明、輕量又高效的方式,讓我們在處理需要等待的任務時,能寫出更清晰、更易於維護的非同步程式碼,同時不影響使用者介面或應用程式的流暢度。**
進階版容錯執行器
基礎的 ResilientExecutor
已經提供了良好的容錯能力,但在生產環境中,我們可能需要更智慧的容錯策略 AdvancedResilientExecutor
進階功能對比
功能 | 基礎版 ResilientExecutor | 進階版 AdvancedResilientExecutor |
---|---|---|
容錯切換 | ✅ 主要 → 備用 | ✅ 主要 → 備用 |
重試機制 | ❌ 單次嘗試 | ✅ 可配置重試次數 |
退避策略 | ❌ 立即切換 | ✅ 指數退避延遲 |
程式碼重用 | ❌ 方法重複 | ✅ 泛型重試邏輯 |
錯誤處理 | ✅ 基本錯誤訊息 | ✅ 詳細的重試記錄 |
核心改進說明
重試機制
- 每個執行器都會重試指定次數(預設 2 次)
- 避免因為暫時性網路問題導致的誤切換
指數退避策略
- 重試間隔採用固定延遲(可配置)
- 避免對 API 服務造成過大壓力
泛型重試邏輯
- 使用
executeWithRetry
和executeWithRetryFlow
統一處理重試 - 大幅減少程式碼重複,提高維護性
- 使用
AdvancedResilientExecutor 實作
class AdvancedResilientExecutor(
private val primaryExecutor: PromptExecutor,
private val primaryModel: LLModel,
private val fallbackExecutor: PromptExecutor,
private val fallbackModel: LLModel,
private val maxRetries: Int = 2,
private val retryDelayMs: Long = 1000
) : PromptExecutor {
override suspend fun execute(prompt: Prompt, model: LLModel, tools: List<ToolDescriptor>): List<Message.Response> {
var attempt = 0
var lastException: Exception? = null
// 嘗試主要執行器
while (attempt < maxRetries) {
try {
println("🎯 嘗試使用主要模型 ${primaryModel.id} (第 ${attempt + 1} 次)...")
return primaryExecutor.execute(prompt, primaryModel, tools)
} catch (e: Exception) {
lastException = e
attempt++
println("⚠️ 主要模型第 $attempt 次嘗試失敗:${e.message}")
if (attempt < maxRetries) {
println("⏳ 等待 ${retryDelayMs}ms 後重試...")
delay(retryDelayMs)
}
}
}
// 主要執行器多次重試都失敗,切換到備用執行器
println("🔄 切換到備用模型 ${fallbackModel.id}...")
attempt = 0
while (attempt < maxRetries) {
try {
println("🎯 嘗試使用備用模型 ${fallbackModel.id} (第 ${attempt + 1} 次)...")
return fallbackExecutor.execute(prompt, fallbackModel, tools)
} catch (e: Exception) {
lastException = e
attempt++
println("⚠️ 備用模型第 $attempt 次嘗試失敗:${e.message}")
if (attempt < maxRetries) {
println("⏳ 等待 ${retryDelayMs}ms 後重試...")
delay(retryDelayMs)
}
}
}
// 所有嘗試都失敗了
throw Exception(
"經過 ${maxRetries * 2} 次嘗試後,所有模型都無法處理請求。最後錯誤:${lastException?.message}"
)
}
override suspend fun executeStreaming(prompt: Prompt, model: LLModel): Flow<String> {
return try {
executeWithRetryFlow(
primaryOperation = { executor, llmModel -> executor.executeStreaming(prompt, llmModel) },
fallbackOperation = { executor, llmModel -> executor.executeStreaming(prompt, llmModel) }
)
} catch (e: Exception) {
flowOf("容錯機制:${e.message}")
}
}
override suspend fun moderate(prompt: Prompt, model: LLModel): ModerationResult {
return try {
executeWithRetry(
primaryOperation = { executor, llmModel -> executor.moderate(prompt, llmModel) },
fallbackOperation = { executor, llmModel -> executor.moderate(prompt, llmModel) }
)
} catch (e: Exception) {
throw Exception("容錯機制:無法完成內容審核 ${e.message}")
}
}
override suspend fun executeMultipleChoices(
prompt: Prompt,
model: LLModel,
tools: List<ToolDescriptor>
): List<LLMChoice> {
return executeWithRetry(
primaryOperation = { executor, llmModel -> executor.executeMultipleChoices(prompt, llmModel, tools) },
fallbackOperation = { executor, llmModel -> executor.executeMultipleChoices(prompt, llmModel, tools) }
)
}
private suspend fun <T> executeWithRetry(
primaryOperation: suspend (PromptExecutor, LLModel) -> T,
fallbackOperation: suspend (PromptExecutor, LLModel) -> T
): T {
var attempt = 0
var lastException: Exception? = null
// 嘗試主要執行器
while (attempt < maxRetries) {
try {
return primaryOperation(primaryExecutor, primaryModel)
} catch (e: Exception) {
lastException = e
attempt++
if (attempt < maxRetries) {
delay(retryDelayMs)
}
}
}
// 嘗試備用執行器
attempt = 0
while (attempt < maxRetries) {
try {
return fallbackOperation(fallbackExecutor, fallbackModel)
} catch (e: Exception) {
lastException = e
attempt++
if (attempt < maxRetries) {
delay(retryDelayMs)
}
}
}
throw Exception("所有重試都失敗:${lastException?.message}")
}
private suspend fun executeWithRetryFlow(
primaryOperation: suspend (PromptExecutor, LLModel) -> Flow<String>,
fallbackOperation: suspend (PromptExecutor, LLModel) -> Flow<String>
): Flow<String> {
var attempt = 0
var lastException: Exception? = null
// 嘗試主要執行器
while (attempt < maxRetries) {
try {
return primaryOperation(primaryExecutor, primaryModel)
} catch (e: Exception) {
lastException = e
attempt++
if (attempt < maxRetries) {
delay(retryDelayMs)
}
}
}
// 嘗試備用執行器
attempt = 0
while (attempt < maxRetries) {
try {
return fallbackOperation(fallbackExecutor, fallbackModel)
} catch (e: Exception) {
lastException = e
attempt++
if (attempt < maxRetries) {
delay(retryDelayMs)
}
}
}
throw Exception("所有串流重試都失敗:${lastException?.message}")
}
}
執行 AI 回應內容
只要把原本的 ResilientExecutor
換成 AdvancedResilientExecutor
就好了 可以看到已經有重試機制,超過重試次數就會自動切換到備用模型
🧪 開始容錯切換測試...
📝 問題:請簡單的說明,什麼是 Kotlin 的協程
🎯 嘗試使用主要模型 gpt-4.1-mini (第 1 次)...
⚠️ 主要模型第 1 次嘗試失敗:模擬主要執行器故障
⏳ 等待 1000ms 後重試...
🎯 嘗試使用主要模型 gpt-4.1-mini (第 2 次)...
⚠️ 主要模型第 2 次嘗試失敗:模擬主要執行器故障
🔄 切換到備用模型 gemini-2.5-flash...
🎯 嘗試使用備用模型 gemini-2.5-flash (第 1 次)...
🤖 回應:Kotlin 協程(Coroutines)是一種用於**簡化非同步(Asynchronous)程式設計**的工具。
簡單來說,它就像是:
1. **「超輕量級的執行緒」**:
* 傳統的執行緒(Threads)很「重」,啟動和管理都需要較多系統資源。
* 協程非常「輕」,可以在一個執行緒上運行成千上萬個協程,因此能高效利用資源。
2. **「可以暫停和恢復的函數」**:
* 協程的核心是 `suspend`(暫停)這個關鍵字。當一個標記為 `suspend` 的函數執行到一個耗時操作(例如網路請求、資料庫查詢)時,它會**暫停**自己的執行,**不會阻塞**底層的執行緒。
* 等到耗時操作完成後,這個協程會**自動恢復**從它暫停的地方繼續執行。
**它解決了什麼問題?**
* **「回呼地獄(Callback Hell)」**:傳統非同步程式碼常使用巢狀的回呼函數,導致程式碼難以閱讀和維護。協程讓非同步程式碼看起來像是循序執行的同步程式碼,大大提高了可讀性。
* **「阻塞(Blocking)」問題**:避免因為一個耗時操作而導致整個應用程式(特別是使用者介面)卡住、沒有回應。
**總結來說:**
Kotlin 協程提供了一種更**簡潔、可讀性高且高效**的方式來編寫非同步和並行程式碼,讓你能夠輕鬆處理網路請求、資料庫操作等耗時任務,而不會阻塞主執行緒,確保應用程式保持流暢回應。
商業價值與效益
服務可靠性
- 確保比較高的服務可用性
- 即使單一 LLM 服務中斷,用戶仍能獲得回應
- 大幅降低服務中斷對業務的影響
用戶體驗
- 用戶無感知的容錯切換
- 持續穩定的服務品質
- 提升系統的可靠性和用戶滿意度
總結
今天我們實作了 Koog 框架的智慧容錯機制,主要成果包括
- ResilientExecutor:簡潔而有效的容錯執行器
- 自動容錯切換:當主要模型失敗時自動使用備用模型
- 進階重試機制:指數退避和多次重試策略
- 實用整合:可直接整合到任何 AI 應用中
在下一篇文章,我們將整合 OpenTelemetry 監控系統,為我們的 LLM 系統添加全面的可觀測性,讓我們能夠
- 追蹤每次請求的完整路徑
- 分析系統效能和健康狀態
- 提供運維團隊實用的監控數據
這將幫助我們在生產環境中更好地理解和優化 AI 系統的執行狀況
參考文件
支持創作
如果這篇文章對您有幫助,歡迎透過 贊助連結 支持我持續創作優質內容。您的支持是我前進的動力!
圖片來源:AI 產生