Logo
Published on

Koog Kotlin AI 框架實戰 Day 17:OpenTelemetry 監控整合:為智能客服系統添加全面可觀測性

在前面 16 天的學習中,我們逐步建立了一個功能完整的智能客服系統,整合了多 LLM 提供商、智能路由、快取機制、記憶體系統等核心功能。今天我們要進入企業級功能的第一篇:為我們的 SmartCustomerService 系統添加 OpenTelemetry 監控功能,實現全面的可觀測性。

我們的技術演進歷程回顧

讓我們先回顧一下到目前為止建立的技術堆疊:

  • Day 2: 建立 ApiKeyManager 基礎配置管理
  • Day 3: 實作提示系統和 DSL 語法
  • Day 4: 整合多 LLM 提供商(OpenAI、Google、Anthropic)
  • Day 5+9: 從基礎到進階的工具系統
  • Day 6: Agent 配置參數調校
  • Day 7: 錯誤處理和除錯機制
  • Day 8: 建立 SmartCustomerService 主線專案
  • Day 11: 快取系統提升回應效能
  • Day 12: 記憶體系統實現個人化學習
  • Day 14: 策略模式智能選擇執行方式
  • Day 16: 智能 LLM 路由與容錯機制

現在,我們要為這個完整的系統添加企業級的監控能力。

OpenTelemetry 在企業 AI 系統中的價值

為什麼 AI 應用特別需要監控?

AI 應用與傳統應用有著不同的監控需求:

  1. 不可預測性:LLM 的回應時間和品質會因查詢複雜度而變化
  2. 多層依賴:涉及多個 LLM 提供商、快取系統、外部 API
  3. 成本控制:Token 使用量直接影響營運成本
  4. 品質保證:需要追蹤 AI 回應的準確性和用戶滿意度

OpenTelemetry 核心概念

  • Traces:完整的請求處理路徑
  • Spans:單一操作的時間追蹤
  • Metrics:系統效能指標
  • Logs:結構化事件記錄

為 SmartCustomerService 整合監控功能

建立監控配置管理器

首先,讓我們擴展 Day 2 的配置管理系統:

// 檔案:src/main/kotlin/config/MonitoringConfig.kt
import ai.koog.agents.features.opentelemetry.OpenTelemetry
import io.opentelemetry.exporter.logging.LoggingSpanExporter
import io.opentelemetry.exporter.otlp.grpc.OtlpGrpcSpanExporter
import io.opentelemetry.api.common.AttributeKey
import io.opentelemetry.sdk.trace.samplers.Sampler

object MonitoringConfig {
    // 監控環境配置
    val environment: String = System.getenv("ENVIRONMENT") ?: "development"
    val jaegerEndpoint: String = System.getenv("JAEGER_ENDPOINT") ?: "http://localhost:4317"
    val serviceName: String = "smart-customer-service"
    val serviceVersion: String = "1.0.0"

    // 建立監控配置
    fun createMonitoringFeature(): OpenTelemetry.() -> Unit = {
        setServiceInfo(serviceName, serviceVersion)

        // 根據環境選擇導出器
        when (environment) {
            "production" -> {
                // 生產環境:使用 Jaeger + 適度取樣
                addSpanExporter(
                    OtlpGrpcSpanExporter.builder()
                        .setEndpoint(jaegerEndpoint)
                        .build()
                )
                setSampler(Sampler.parentBased(Sampler.traceIdRatioBased(0.1))) // 10% 取樣
            }
            "staging" -> {
                // 測試環境:Jaeger + 高取樣率
                addSpanExporter(
                    OtlpGrpcSpanExporter.builder()
                        .setEndpoint(jaegerEndpoint)
                        .build()
                )
                setSampler(Sampler.alwaysOn())
            }
            else -> {
                // 開發環境:日誌導出器
                addSpanExporter(LoggingSpanExporter.create())
                setSampler(Sampler.alwaysOn())
                setVerbose(true)
            }
        }

        // 添加豐富的資源屬性
        addResourceAttributes(mapOf(
            AttributeKey.stringKey("deployment.environment") to environment,
            AttributeKey.stringKey("service.namespace") to "ai-platform",
            AttributeKey.stringKey("service.instance.id") to "instance-${System.currentTimeMillis()}",
            AttributeKey.stringKey("business.unit") to "customer-support",
            AttributeKey.stringKey("cost.center") to "ai-operations",
            AttributeKey.stringKey("koog.version") to "1.0.0"
        ))
    }
}

MonitoredCustomerService:整合所有技術

現在讓我們建立一個完整整合監控功能的客服系統:

// 檔案:src/main/kotlin/service/MonitoredCustomerService.kt
import ai.koog.agents.AIAgent
import ai.koog.agents.executors.simpleOpenAIExecutor
import ai.koog.agents.features.opentelemetry.OpenTelemetry
import ai.koog.agents.features.memory.AgentMemory
import ai.koog.agents.features.events.EventHandler
import ai.koog.llms.OpenAIModels
import ai.koog.tools.*
import kotlinx.coroutines.withTimeout
import kotlin.time.Duration.Companion.seconds
import kotlin.time.Duration.Companion.minutes

class MonitoredCustomerService {
    // Day 11: 快取系統(短期效能優化)
    private val cache by lazy { createCacheStrategy() }

    // Day 12: 記憶體系統(長期學習能力)
    private val memoryProvider by lazy { createMemoryProvider() }

    // Day 16: 智能路由執行器
    private val intelligentExecutor by lazy {
        val baseExecutor = simpleOpenAIExecutor(ApiKeyManager.openAIKey) // Day 2: 統一配置
        AutoFailoverExecutor(
            executors = mapOf(
                "openai" to baseExecutor,
                "anthropic" to simpleAnthropicExecutor(ApiKeyManager.anthropicKey),
                "google" to simpleGoogleExecutor(ApiKeyManager.googleKey)
            ),
            routingStrategy = SmartRoutingStrategy(),      // Day 16: 智能路由
            healthMonitor = ModelHealthMonitor()           // Day 16: 健康監控
        )
    }

    // Day 17: 新增 - 監控執行器包裝
    private val monitoredExecutor by lazy {
        CachedPromptExecutor(                              // Day 11: 快取層
            delegate = intelligentExecutor,                // Day 16: 智能路由層
            cache = cache
        )
    }

    // Day 5+9: 完整工具註冊
    private val toolRegistry = ToolRegistry {
        // Day 5: 基礎工具
        tool(SayToUser)
        tool(AskUser)
        tool(ExitTool)

        // Day 9: 進階異步工具
        tool(WeatherTool)
        tool(OrderStatusTool)
        tool(CustomerInfoTool)

        // Day 17: 新增 - 監控相關工具
        tool(SystemStatusTool)
        tool(PerformanceReportTool)
    }

    // Day 17: 新增 - 監控事件處理器
    private val monitoringEventHandler = object : EventHandler {
        override suspend fun onAgentStarted(strategyName: String) {
            println("📊 [監控] Agent 開始執行策略: $strategyName")
        }

        override suspend fun onAgentFinished(strategyName: String, result: String) {
            println("📊 [監控] Agent 執行完成: $strategyName")
            println("📊 [監控] 回應長度: ${result.length} 字元")
        }

        override suspend fun onToolCallStarted(toolName: String, args: Any) {
            println("🔧 [監控] 工具調用開始: $toolName")
        }

        override suspend fun onToolCallFinished(toolName: String, result: String) {
            println("🔧 [監控] 工具調用完成: $toolName")
        }
    }

    // Day 17: 整合監控功能的主要 Agent
    private val monitoredAgent by lazy {
        AIAgent(
            executor = monitoredExecutor,                      // Day 11+16+17: 多層執行器
            systemPrompt = createEnhancedSystemPrompt(),       // Day 3+12: 增強提示
            toolRegistry = toolRegistry,                       // Day 5+9+17: 完整工具集
            temperature = 0.7,                                 // Day 6: 配置參數
            maxIterations = 15,                                // Day 6: 進階配置
            eventHandler = monitoringEventHandler              // Day 17: 新增監控
        ) {
            // Day 17: 新增 - OpenTelemetry 監控功能
            install(OpenTelemetry, MonitoringConfig.createMonitoringFeature())

            // Day 12: 記憶體功能
            install(AgentMemory) {
                memoryProvider = this@MonitoredCustomerService.memoryProvider
                featureName = "monitored-customer-service"
                organizationName = "tech-support"
            }
        }
    }

    // Day 17: 新增 - 帶監控的客戶查詢處理
    suspend fun handleCustomerQueryWithMonitoring(
        query: String,
        customerId: String? = null
    ): CustomerServiceResponse {
        return try {
            withTimeout(30.seconds) {
                val startTime = System.currentTimeMillis()

                // Day 17: 新增 - 查詢複雜度分析(用於監控標籤)
                val queryComplexity = analyzeQueryComplexity(query)
                println("📊 [監控] 查詢複雜度: ${queryComplexity.complexity}")
                println("📊 [監控] 預期處理時間: ${queryComplexity.estimatedDuration}ms")

                // 執行查詢(所有 Day 2-16 技術都會被自動監控)
                val result = monitoredAgent.run(query)

                val actualDuration = System.currentTimeMillis() - startTime

                // Day 17: 新增 - 監控統計
                recordQueryMetrics(
                    query = query,
                    customerId = customerId,
                    complexity = queryComplexity.complexity,
                    duration = actualDuration,
                    cacheHit = cache.containsKey(query),
                    success = true
                )

                CustomerServiceResponse(
                    response = result,
                    customerId = customerId,
                    processingTime = actualDuration,
                    cacheHit = cache.containsKey(query),
                    complexity = queryComplexity.complexity,
                    timestamp = System.currentTimeMillis()
                )
            }
        } catch (e: Exception) {
            // Day 7+17: 錯誤處理 + 監控
            println("❌ [監控] 處理錯誤: ${e.message}")
            recordQueryMetrics(
                query = query,
                customerId = customerId,
                complexity = QueryComplexity.UNKNOWN,
                duration = -1,
                cacheHit = false,
                success = false,
                error = e.message
            )

            CustomerServiceResponse(
                response = "很抱歉,系統暫時無法處理您的請求。我們已記錄此問題並會盡快解決。",
                customerId = customerId,
                processingTime = -1,
                cacheHit = false,
                complexity = QueryComplexity.UNKNOWN,
                timestamp = System.currentTimeMillis(),
                error = e.message
            )
        }
    }

    // Day 17: 新增 - 系統健康檢查
    suspend fun performHealthCheck(): SystemHealthReport {
        val healthCheck = SystemHealthReport(
            timestamp = System.currentTimeMillis(),
            cacheSize = cache.size(),
            cacheHitRate = calculateCacheHitRate(),
            memoryUsage = getMemoryUsage(),
            llmHealthStatus = checkLLMHealth(),
            toolsStatus = checkToolsHealth()
        )

        println("🏥 [監控] 系統健康檢查完成")
        println("🏥 [監控] 快取命中率: ${healthCheck.cacheHitRate}%")
        println("🏥 [監控] 記憶體使用量: ${healthCheck.memoryUsage}MB")

        return healthCheck
    }
}

// Day 17: 新增 - 客戶服務回應資料結構
data class CustomerServiceResponse(
    val response: String,
    val customerId: String?,
    val processingTime: Long,
    val cacheHit: Boolean,
    val complexity: QueryComplexity,
    val timestamp: Long,
    val error: String? = null
)

// Day 17: 新增 - 系統健康報告
data class SystemHealthReport(
    val timestamp: Long,
    val cacheSize: Int,
    val cacheHitRate: Double,
    val memoryUsage: Long,
    val llmHealthStatus: Map<String, Boolean>,
    val toolsStatus: Map<String, Boolean>
)

// Day 17: 新增 - 查詢複雜度分析
enum class QueryComplexity(val estimatedDuration: Long) {
    SIMPLE(1000),     // 簡單問題,如問候
    MODERATE(3000),   // 中等問題,需要工具調用
    COMPLEX(8000),    // 複雜問題,需要多步推理
    UNKNOWN(-1)       // 無法分析
}

data class QueryAnalysis(
    val complexity: QueryComplexity,
    val estimatedDuration: Long,
    val requiresTools: Boolean,
    val requiresMemory: Boolean
)

監控工具和輔助功能實作

現在讓我們實作支持監控功能的輔助工具:

// 檔案:src/main/kotlin/monitoring/MonitoringTools.kt
import ai.koog.tools.*
import kotlinx.serialization.Serializable

// Day 17: 新增 - 系統狀態查詢工具
object SystemStatusTool : SimpleTool<SystemStatusTool.Args>() {
    @Serializable
    data class Args(val component: String = "all") : ToolArgs

    override val argsSerializer = Args.serializer()

    override val descriptor = ToolDescriptor(
        name = "check_system_status",
        description = "檢查系統各組件的狀態和效能指標",
        optionalParameters = listOf(
            ToolParameterDescriptor(
                name = "component",
                description = "要檢查的組件 (cache, memory, llm, tools, all)",
                type = ToolParameterType.String
            )
        )
    )

    override suspend fun doExecute(args: Args): String {
        return when (args.component.lowercase()) {
            "cache" -> "快取系統:運行正常,命中率 75%,大小 234MB"
            "memory" -> "記憶體系統:運行良好,已儲存 156 個客戶偏好"
            "llm" -> "LLM 服務:OpenAI ✅、Google ✅、Anthropic ✅"
            "tools" -> "工具系統:9 個工具運行正常"
            else -> "整體系統狀態:所有組件運行正常,平均回應時間 1.2 秒"
        }
    }
}

// Day 17: 新增 - 效能報告工具
object PerformanceReportTool : SimpleTool<PerformanceReportTool.Args>() {
    @Serializable
    data class Args(val timeRange: String = "1h") : ToolArgs

    override val argsSerializer = Args.serializer()

    override val descriptor = ToolDescriptor(
        name = "generate_performance_report",
        description = "生成系統效能報告",
        optionalParameters = listOf(
            ToolParameterDescriptor(
                name = "timeRange",
                description = "時間範圍 (1h, 6h, 24h, 7d)",
                type = ToolParameterType.String
            )
        )
    )

    override suspend fun doExecute(args: Args): String {
        val timeRange = args.timeRange
        return """
            📊 系統效能報告 (最近 $timeRange)

            🚀 處理統計:
            - 總查詢數:1,247 次
            - 平均回應時間:1.23 秒
            - 成功率:99.2%

            💾 快取效能:
            - 命中率:75.3%
            - 節省的 API 呼叫:936 次
            - 預估節省成本:NT$ 280

            🧠 記憶體使用:
            - 學習到的偏好:42 個
            - 記憶體準確性:94%

            🎯 熱門查詢類型:
            1. 訂單查詢 (35%)
            2. 產品諮詢 (28%)
            3. 技術支援 (22%)
            4. 退換貨 (15%)
        """.trimIndent()
    }
}

實戰範例:完整監控演示

讓我們建立一個完整的示範程式:

// 檔案:src/main/kotlin/demo/MonitoringDemo.kt
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay

suspend fun main() {
    println("🚀 啟動帶監控功能的智能客服系統")
    println("=" * 60)

    // 初始化監控客服系統
    val customerService = MonitoredCustomerService()

    // 測試查詢集合
    val testQueries = listOf(
        TestQuery("你好,我想查詢我的訂單狀態", "CUST001", QueryComplexity.MODERATE),
        TestQuery("今天台北的天氣如何?", "CUST002", QueryComplexity.SIMPLE),
        TestQuery("我想了解你們的退貨政策,我上個月買的產品有問題", "CUST003", QueryComplexity.COMPLEX),
        TestQuery("系統目前的狀態如何?", "CUST004", QueryComplexity.SIMPLE),
        TestQuery("能否生成最近一小時的效能報告?", "CUST005", QueryComplexity.MODERATE)
    )

    // 執行測試查詢並收集監控數據
    println("📋 開始執行測試查詢...")
    val results = mutableListOf<CustomerServiceResponse>()

    for ((index, testQuery) in testQueries.withIndex()) {
        println("\n📞 客戶查詢 ${index + 1}/5")
        println("👤 客戶ID: ${testQuery.customerId}")
        println("❓ 查詢: ${testQuery.query}")
        println("🎯 預期複雜度: ${testQuery.expectedComplexity}")

        val result = customerService.handleCustomerQueryWithMonitoring(
            query = testQuery.query,
            customerId = testQuery.customerId
        )

        results.add(result)

        // 顯示監控結果
        println("✅ 回應: ${result.response.take(100)}${if (result.response.length > 100) "..." else ""}")
        println("⏱️  處理時間: ${result.processingTime}ms")
        println("💾 快取命中: ${if (result.cacheHit) "是" else "否"}")
        println("📊 實際複雜度: ${result.complexity}")

        if (result.error != null) {
            println("❌ 錯誤: ${result.error}")
        }

        // 模擬請求間隔
        delay(1000)
    }

    // 生成監控摘要報告
    println("\n" + "=" * 60)
    println("📊 監控摘要報告")
    println("=" * 60)

    val avgProcessingTime = results.filter { it.processingTime > 0 }
        .map { it.processingTime }.average()
    val cacheHitRate = results.count { it.cacheHit }.toDouble() / results.size * 100
    val successRate = results.count { it.error == null }.toDouble() / results.size * 100

    println("📈 整體統計:")
    println("   • 總查詢數: ${results.size}")
    println("   • 平均處理時間: ${String.format("%.1f", avgProcessingTime)}ms")
    println("   • 快取命中率: ${String.format("%.1f", cacheHitRate)}%")
    println("   • 成功率: ${String.format("%.1f", successRate)}%")

    // 複雜度分布統計
    println("\n📊 查詢複雜度分布:")
    val complexityStats = results.groupBy { it.complexity }
        .mapValues { it.value.size }
    complexityStats.forEach { (complexity, count) ->
        println("   • $complexity: $count 次")
    }

    // 執行系統健康檢查
    println("\n🏥 執行系統健康檢查...")
    val healthReport = customerService.performHealthCheck()

    println("\n💡 監控洞察:")
    println("   • Koog OpenTelemetry 自動追蹤了所有 LLM 調用")
    println("   • 快取系統有效減少了 ${String.format("%.0f", cacheHitRate)}% 的重複請求")
    println("   • 記憶體系統正在學習客戶偏好模式")
    println("   • 智能路由確保了高可用性")

    println("\n🎯 下一步建議:")
    println("   • 使用 Jaeger UI 查看完整的追蹤視圖")
    println("   • 設定告警閾值監控關鍵指標")
    println("   • 建立儀表板可視化效能數據")

    println("\n✨ 監控演示完成!")
}

data class TestQuery(
    val query: String,
    val customerId: String,
    val expectedComplexity: QueryComplexity
)

Jaeger 整合和視覺化

Docker Compose 設定

# 檔案:docker-compose.monitoring.yml
version: '3.8'

services:
  jaeger-all-in-one:
    image: jaegertracing/all-in-one:1.39
    container_name: koog-jaeger
    environment:
      - COLLECTOR_OTLP_ENABLED=true
      - JAEGER_DISABLED=false
    ports:
      - '4317:4317' # OTLP gRPC
      - '4318:4318' # OTLP HTTP
      - '16686:16686' # Jaeger UI
    networks:
      - koog-monitoring

  koog-customer-service:
    build:
      context: .
      dockerfile: Dockerfile
    container_name: koog-customer-service
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
      - GOOGLE_API_KEY=${GOOGLE_API_KEY}
      - JAEGER_ENDPOINT=http://jaeger-all-in-one:4317
      - ENVIRONMENT=production
    depends_on:
      - jaeger-all-in-one
    networks:
      - koog-monitoring
    ports:
      - '8080:8080'

networks:
  koog-monitoring:
    driver: bridge

生產環境配置

# 啟動監控環境
docker-compose -f docker-compose.monitoring.yml up -d

# 訪問 Jaeger UI
open http://localhost:16686

# 檢查服務狀態
curl http://localhost:8080/health

監控最佳實踐與 Day 29 的區別

Day 17 vs Day 29 的明確分工

Day 17(今天)- 技術整合焦點

  • OpenTelemetry 功能安裝和配置
  • Span、Metrics 的基礎收集
  • Jaeger 整合和視覺化設定
  • AI 應用特有的監控指標

Day 29(未來)- 維運實踐焦點

  • 基於監控數據的告警策略
  • 生產環境故障診斷流程
  • 自動化運維腳本
  • 容量伸縮決策支援

關鍵監控指標建議

  1. 效能指標

    • LLM 推理延遲
    • 工具執行時間
    • 端到端回應時間
  2. 業務指標

    • 客戶滿意度
    • 問題解決率
    • 服務升級次數
  3. 成本指標

    • Token 使用量
    • API 調用費用
    • 快取節省金額

總結

今天我們成功為 SmartCustomerService 系統整合了 OpenTelemetry 監控功能,實現了從 Day 2 到 Day 17 的完整技術演進:

技術整合成果

  • 完美繼承:整合了 Day 2-16 所有學習技術
  • 配置一致:統一使用 ApiKeyManager 進行配置管理
  • 監控全面:覆蓋快取、記憶體、工具、LLM 所有層面
  • 實際應用:提供具體的商業價值和監控洞察

新增核心功能

  • OpenTelemetry 整合:自動追蹤所有 AI 操作
  • 多環境支援:開發、測試、生產環境自適應
  • 智能分析:查詢複雜度預測和效能統計
  • 健康監控:系統狀態即時檢查

商業價值

  • 效能可見:即時了解系統執行狀況
  • 成本控制:精確追蹤 API 使用和費用
  • 品質保證:快速識別和定位問題
  • 決策支援:基於數據優化系統配置

下一篇文章,我們將學習如何將這個完整的監控系統整合到 Spring Boot 框架中,建立企業級的 AI 微服務架構。

參考文件

本文涵蓋的 Koog 框架功能詳細文檔:

更多資訊請參考 Koog 完整文檔

支持創作

如果這篇文章對您有幫助,歡迎透過 贊助連結 支持我持續創作優質內容。您的支持是我前進的動力!


圖片來源:AI 產生