Logo
Published on

第一次學 Kotlin Koog AI 就上手 Day 26:增加 AI 執行效率:Parallel Node 並行執行節點

在前一篇文章中,我們學習了檢查點系統來保護長時間執行的 AI 任務。在開發 AI 應用時,我們經常遇到需要比較多個不同做法或模型的情況,例如:產生多種文案版本並選擇最佳的、使用不同模型分析同一問題、或是同時執行多個獨立的任務

傳統的做法是一個一個來,但這樣會耗費很多時間。就像你要問三個朋友對一部電影的意見,但你必須一個一個打電話去問,而不能同時問所有人。今天我們來學習「Parallel Node」,讓我們能夠同時執行多個節點,大幅提升 AI 應用的效率

什麼是 Parallel Node?

生活中的類比

想像你要選擇一間餐廳吃晚餐,但不確定哪間最好。你有兩種做法

  • 順序詢問:一個一個打電話問朋友意見,要等前一人回答完才能問下一人
  • 同時詢問:建立一個群組,同時問所有朋友,等所有人都回答後再做決定

Parallel Node 就像第二種做法,讓我們能同時執行多個 AI 節點,然後根據結果來做決定

Parallel Node 的核心概念

  • 同時執行:多個節點並行處理相同的輸入
  • 結果比較:可以比較不同節點的輸出結果
  • 策略選擇:透過合併策略選擇或組合最終結果
  • 效能提升:相較於順序執行,可大幅節省時間

基本語法與概念

基本結構

Parallel Node 的基本語法非常簡單

val strategy = strategy<String, String>("my-strategy") {
    // 定義要並行執行的節點
    val nodeA by node<String, String>("node_a") { "結果A" }
    val nodeB by node<String, String>("node_b") { "結果B" }
    val nodeC by node<String, String>("node_c") { "結果C" }

    // 使用 parallel 函數並行執行
    val parallelResult by parallel<String, String>(
        nodeA, nodeB, nodeC
    ) {
        // 這裡定義如何合併結果
        // 選擇最長的結果
        selectByMax { it.length }
    }

    // 連接到策略流程
    edge(nodeStart forwardTo parallelResult)
    edge(parallelResult forwardTo nodeFinish)
}

類型定義

// 輸入類型:傳入每個節點的資料型別
typealias Input = String

// 輸出類型:每個節點回傳的資料型別
typealias Output = String

// parallel 函數的完整形式
val parallelNode by parallel<Input, Output>(
    node1, node2, node3, /* 更多節點... */
) {
    // 合併策略邏輯
}

四種合併策略詳解

Koog 提供四種主要的合併策略,讓我們能靈活處理並行節點的結果

selectBy - 條件選擇

根據特定條件選擇符合條件的第一個結果

val selectByCondition by parallel<String, String>(
    nodeA, nodeB, nodeC
) {
    // 選擇包含"重要"關鍵字的結果
    selectBy { it.contains("重要") }
}

使用場景

  • 需要篩選特定內容的結果
  • 檢查是否包含關鍵字或符合格式
  • 驗證輸出是否滿足條件

selectByMax - 最大值選擇

選擇在某個指標上表現最好的結果

val selectLongestText by parallel<String, String>(
    nodeA, nodeB, nodeC
) {
    // 選擇最長的文字
    selectByMax { it.length }
}

val selectHighestScore by parallel<String, Int>(
    scoreNodeA, scoreNodeB, scoreNodeC
) {
    // 直接選擇最高分數
    selectByMax { it }
}

使用場景

  • 選擇最詳細的回答
  • 找出評分最高的結果
  • 比較性能指標

selectByIndex - 智慧選擇

使用另一個 LLM 或自訂邏輯來評估並選擇最佳結果

val selectBestJoke by parallel<String, String>(
    jokeNodeA, jokeNodeB, jokeNodeC
) {
    selectByIndex { jokes ->
        // 使用 LLM 評選最佳笑話
        llm.writeSession {
            model = OpenAIModels.CostOptimized.GPT4_1Mini,
            updatePrompt {
                system("你是一位幽默評審,請從以下笑話中選出最好笑的一個")
                user("笑話列表:\n${jokes.joinToString("\n\n")}\n\n請回傳最好笑笑話的編號(0, 1, 2...)")
            }
            val response = requestLLM()
            response.text.toIntOrNull() ?: 0 // 預設選擇第一個
        }
    }
}

使用場景

  • 需要複雜評估邏輯
  • 讓 AI 評選最佳結果
  • 結合多種評估標準

fold - 結果合併

將所有結果合併成一個最終結果

val combineAllResults by parallel<String, String>(
    nodeA, nodeB, nodeC
) {
    fold("綜合結果:\n") { accumulated, result ->
        "$accumulated\n• $result"
    }
}

使用場景

  • 需要所有結果的完整資訊
  • 計算統計資料(平均、總和)
  • 組合多個回答成報告

AI 笑話內容產生器

讓我們建立一個實際的範例,使用不同的 AI 模型同時產生笑話,然後選出最佳的

class JokeGeneratorAgent {

    private val agent = AIAgent(
        executor = simpleOpenAIExecutor(ApiKeyManager.openAIApiKey!!),
        systemPrompt = "你是一個專業的笑話產生助手",
        llmModel = OpenAIModels.CostOptimized.GPT4_1Mini,
        strategy = createStrategy()
    )

    private fun createStrategy() = strategy<String, String>("joke-generator") {

        // 定義三個不同風格的笑話產生節點
        val formalJokeNode by node<String, String>("formal-joke") { topic ->
            llm.writeSession {
                updatePrompt {
                    system("你是一位優雅的幽默大師,請產生一個有趣但優雅的笑話")
                    user("主題:$topic")
                }
                val response = requestLLM()
                println("0 幽默大師: ${response.content}")
                response.content
            }
        }

        val casualJokeNode by node<String, String>("casual-joke") { topic ->
            llm.writeSession {
                updatePrompt {
                    system("你是一位輕鬆幽默的喜劇演員,請產生一個輕鬆好笑的笑話")
                    user("主題:$topic")
                }
                val response = requestLLM()
                println("1 喜劇演員: ${response.content}")
                response.content
            }
        }

        val creativeJokeNode by node<String, String>("creative-joke") { topic ->
            llm.writeSession {
                updatePrompt {
                    system("你是一位富有創意的幽默作家,請產生一個創意十足的笑話")
                    user("主題:$topic")
                }
                val response = requestLLM()
                println("2 幽默作家: ${response.content}")
                response.content
            }
        }

        // 使用 Parallel Node 同時產生三種笑話
        val bestJoke by parallel<String, String>(
            formalJokeNode, casualJokeNode, creativeJokeNode
        ) {
            selectByIndex { jokes ->
                // 使用另一個 LLM 評選最佳笑話
                llm.writeSession {
                    updatePrompt {
                        system(
                            """
                            你是一位專業的幽默評審。請從以下三個笑話中選出最好笑的一個。
                            評選標準:
                            1. 幽默程度
                            2. 創意性
                            3. 是否容易理解

                            請只回傳選中笑話的編號:0、1 或 2
                        """.trimIndent()
                        )
                        user("笑話選項:\n${jokes.mapIndexed { index, joke -> "$index: $joke" }.joinToString("\n\n")}")
                    }
                    val response = requestLLM()
                    println("評比 - ${response.content}")
                    response.content.trim().toIntOrNull() ?: 0
                }
            }
        }

        // 建立策略流程
        edge(nodeStart forwardTo bestJoke)
        edge(bestJoke forwardTo nodeFinish)
    }

    suspend fun generateJoke(topic: String): String {
        return agent.run(topic)
    }
}

AI 笑話內容產生器使用範例

/**
 * AI 笑話產生器主程式
 * 展示 Parallel Node 的實際應用
 */
suspend fun main() {
    val jokeGenerator = JokeGeneratorAgent()

    println("🎭 AI 笑話產生器啟動!")

    val topics = listOf("程式設計師", "貓咪", "咖啡", "上班族")

    for (topic in topics) {
        println("\n📝 正在為主題「$topic」產生笑話...")
        val result = jokeGenerator.generateJoke(topic)
        println("🎉 最佳笑話:$result")
        println("-".repeat(50))
    }
}

執行 AI 回應內容

🎭 AI 笑話產生器啟動!

📝 正在為主題「程式設計師」產生笑話...
0 幽默大師: 為什麼程式設計師喜歡在森林裡散步?

因為他們愛尋找樹結構,還能享受「遞迴」的樂趣!
2 幽默作家: 為什麼程式設計師總是帶著一瓶水去上班?

因為他們習慣了「除錯」,需要隨時「喝水」才能保持清醒!
1 喜劇演員: 為什麼程式設計師喜歡待在森林裡?
因為那裡有很多「樹」(Tree)可以一直「debug」!
評比 - 0
🎉 最佳笑話:為什麼程式設計師喜歡在森林裡散步?

因為他們愛尋找樹結構,還能享受「遞迴」的樂趣!
--------------------------------------------------

📝 正在為主題「貓咪」產生笑話...
1 喜劇演員: 為什麼貓咪從來不玩撲克牌?

因為它們怕抓到「貓」牌,會被主人「罵」!😹
0 幽默大師: 為什麼貓咪總是喜歡坐在電腦鍵盤上?
因為牠們想成為最“鍵”的辦公室助理!
2 幽默作家: 為什麼貓咪不喜歡上網?

因為它們怕被「網」住,寧願當個自由的「毛」客!
評比 - 2
🎉 最佳笑話:為什麼貓咪不喜歡上網?

因為它們怕被「網」住,寧願當個自由的「毛」客!
--------------------------------------------------

進階應用場景

內容分析並行處理

同時進行不同類型的內容分析

val textAnalysis by parallel<String, Map<String, Any>>(
    tokenCountNode,    // 計算 token 數量
    sentimentNode,     // 情感分析
    keywordNode        // 關鍵字提取
) {
    fold(mutableMapOf<String, Any>()) { combined, result ->
        combined.putAll(result)
        combined
    }
}

多模型比較測試

比較不同 AI 模型的表現

val modelComparison by parallel<String, String>(
    gpt4Node,      // GPT-4 回答
    claudeNode,    // Claude 回答
    llamaNode      // Llama 回答
) {
    fold("模型比較結果:\n") { report, answer ->
        "$report\n• $answer"
    }
}

A/B 測試場景

同時測試不同的提示詞策略

val promptTesting by parallel<String, String>(
    conservativePromptNode,  // 保守型提示詞
    aggressivePromptNode,    // 積極型提示詞
    balancedPromptNode       // 平衡型提示詞
) {
    selectByMax { response ->
        // 根據回應品質評分(這裡簡化為長度)
        response.split('.').size // 以句子數量作為品質指標
    }
}

效能考量與最佳實踐

何時使用 Parallel Node

適合的場景

  • 需要比較多個解決方案
  • 執行時間較長的 AI 操作
  • 需要備援方案的情況
  • A/B 測試或效能比較

不適合的場景

  • 節點之間有依賴關係
  • 系統資源有限
  • 操作非常快速(並行成本大於收益)

資源管理注意事項

// 良好的實踐:控制並行數量
val reasonableParallel by parallel<String, String>(
    nodeA, nodeB, nodeC // 適量的節點數(建議 2-5 個)
) {
    selectByMax { it.length }
}

// 避免:過多的並行節點
val tooManyParallel by parallel<String, String>(
    node1, node2, node3, node4, node5,
    node6, node7, node8, node9, node10 // 可能造成資源競爭
) {
    selectByMax { it.length }
}

總結

今天我們學習了 Parallel Node 功能

  • 並行執行概念:同時執行多個節點,大幅提升效率
  • 四種策略:selectBy、selectByMax、selectByIndex、fold
  • 實際應用:從簡單的內容比較到複雜的 AI 內容產生
  • 最佳實踐:合理控制並行數量,注意資源管理

Parallel Node 是提升 AI 應用效能的強大工具。透過合理運用並行執行和智慧結果選擇,我們可以建立更快速、更可靠的 AI 系統。下一篇我們將學習 AI 應用的安全性設計,探討如何保護用戶資料和隱私,為 AI 系統建立堅實的安全防護

參考資源


支持創作

如果這篇文章對您有幫助,歡迎透過 贊助連結 支持我持續創作優質內容。您的支持是我前進的動力!


圖片來源:AI 產生