多 Agent 协作与编排框架AI Agent 的群体智能从单兵作战到团队协作一、单 Agent 的能力瓶颈复杂任务的分解与协调难题单个 AI Agent 在简单任务上表现良好但面对复杂多步骤任务时暴露出两个问题一是上下文窗口限制——长链推理容易丢失关键信息二是能力单一——一个 Agent 难以同时擅长代码生成、数据分析、文档撰写等不同领域。多 Agent 协作的核心思路是分而治之——将复杂任务拆解为子任务分配给专门的 Agent 执行通过编排框架协调 Agent 之间的信息传递和执行顺序。这类似于人类团队中不同角色分工协作的模式。二、多 Agent 协作的架构模式flowchart TB A[用户任务] -- B[编排器 Orchestrator] B -- C{任务分解} C -- D[Researcher: 信息收集] C -- E[Coder: 代码实现] C -- F[Reviewer: 质量审查] D -- G[信息汇总] E -- H[代码产出] G -- I[结果整合] H -- I F -- J{审查通过?} J --|是| K[最终输出] J --|否| L[反馈修正] L -- E subgraph Agent 角色 D E F end subgraph 通信机制 M[共享记忆: 黑板模式] N[消息传递: 事件驱动] end D -- M E -- M F -- N两种主流协作模式顺序编排Pipeline——Agent 按固定顺序依次执行动态编排Orchestrator——编排器根据中间结果动态决定下一步执行哪个 Agent。三、生产级实现多 Agent 编排框架# multi_agent_framework.py — 多 Agent 协作框架 # 设计意图支持顺序和动态编排的多 Agent 协作系统 import asyncio import json from dataclasses import dataclass, field from typing import List, Dict, Optional, Callable from enum import Enum class AgentRole(Enum): RESEARCHER researcher CODER coder REVIEWER reviewer ANALYZER analyzer WRITER writer dataclass class Message: sender: str receiver: str content: str message_type: str # task, result, feedback, control dataclass class Task: task_id: str description: str assigned_agent: Optional[str] None status: str pending # pending, in_progress, completed, failed result: Optional[str] None dependencies: List[str] field(default_factorylist) class Agent: 基础 Agent 类 def __init__(self, name: str, role: AgentRole, system_prompt: str, llm_client): self.name name self.role role self.system_prompt system_prompt self.llm llm_client self.shared_memory: Dict {} async def execute(self, task: Task, context: str ) - str: 执行任务 prompt f{self.system_prompt} ## 当前任务 {task.description} ## 上下文信息 {context} 请完成上述任务输出结果。 response await self.llm.generate(prompt) return response class Orchestrator: 多 Agent 编排器 def __init__(self, llm_client): self.llm llm_client self.agents: Dict[str, Agent] {} self.shared_memory: Dict {} self.message_history: List[Message] [] def register_agent(self, agent: Agent): 注册 Agent self.agents[agent.name] agent async def run(self, task_description: str) - dict: 执行多 Agent 协作任务 # Step 1: 任务分解 subtasks await self._decompose_task(task_description) # Step 2: 动态编排执行 results {} max_iterations 10 iteration 0 while iteration max_iterations: # 找到所有可执行的子任务依赖已满足 executable self._get_executable_subtasks(subtasks, results) if not executable: break # 所有任务完成或无法继续 # 并行执行可执行的子任务 tasks [] for subtask in executable: agent_name subtask.assigned_agent if agent_name and agent_name in self.agents: agent self.agents[agent_name] context self._build_context(subtask, results) tasks.append(self._execute_subtask(agent, subtask, context)) task_results await asyncio.gather(*tasks, return_exceptionsTrue) for subtask, result in zip(executable, task_results): if isinstance(result, Exception): subtask.status failed results[subtask.task_id] f执行失败: {str(result)} else: subtask.status completed results[subtask.task_id] result iteration 1 # Step 3: 结果整合 final_result await self._synthesize_results(task_description, results) return { answer: final_result, subtask_results: results, iterations: iteration, } async def _decompose_task(self, task_description: str) - List[Task]: 任务分解 agent_list \n.join( f- {name}: {agent.role.value} for name, agent in self.agents.items() ) prompt f将以下任务分解为子任务并分配给合适的 Agent。 可用 Agent {agent_list} 原始任务{task_description} 输出 JSON 格式 {{ subtasks: [ {{ task_id: t1, description: 子任务描述, assigned_agent: agent_name, dependencies: [] }} ] }} response await self.llm.generate(prompt) try: parsed json.loads(response) return [ Task( task_idst[task_id], descriptionst[description], assigned_agentst.get(assigned_agent), dependenciesst.get(dependencies, []), ) for st in parsed.get(subtasks, []) ] except json.JSONDecodeError: # 解析失败时创建单任务 return [Task(task_idt1, descriptiontask_description)] def _get_executable_subtasks(self, subtasks: List[Task], results: Dict) - List[Task]: 获取可执行的子任务 executable [] for subtask in subtasks: if subtask.status ! pending: continue # 检查依赖是否已完成 deps_met all( results.get(dep) is not None for dep in subtask.dependencies ) if deps_met: executable.append(subtask) return executable def _build_context(self, subtask: Task, results: Dict) - str: 构建子任务的上下文 context_parts [] for dep_id in subtask.dependencies: if dep_id in results: context_parts.append(f[{dep_id} 的结果]\n{results[dep_id]}) return \n\n.join(context_parts) async def _execute_subtask(self, agent: Agent, subtask: Task, context: str) - str: 执行单个子任务 subtask.status in_progress result await agent.execute(subtask, context) # 记录消息 self.message_history.append(Message( senderagent.name, receiverorchestrator, contentresult, message_typeresult, )) return result async def _synthesize_results(self, original_task: str, results: Dict) - str: 整合所有子任务结果 results_text \n\n.join( f[{task_id}]\n{result} for task_id, result in results.items() ) prompt f基于以下子任务执行结果生成对原始任务的完整回答。 原始任务{original_task} 子任务结果 {results_text} 请整合以上结果生成连贯、完整的最终回答。 return await self.llm.generate(prompt)四、Trade-offs多 Agent 协作的效率与成本权衡Token 消耗的倍增效应。每个 Agent 的执行都需要调用 LLM任务分解和结果整合也需要额外调用。一个 3 Agent 协作任务的 Token 消耗约为单 Agent 的 5—8 倍。建议对简单任务使用单 Agent仅在任务复杂度确实需要分工时使用多 Agent。编排器的可靠性。任务分解的质量直接影响协作效果——如果分解不合理如子任务粒度过细或分配错误多 Agent 可能比单 Agent 效果更差。建议对编排器的分解结果做人工审核或使用固定模板处理常见任务类型。Agent 间的信息丢失。Agent 之间通过共享记忆或消息传递交换信息但 LLM 的上下文窗口限制可能导致信息压缩和丢失。建议使用结构化的信息格式如 JSON而非自由文本传递中间结果。调试的复杂性。多 Agent 系统的执行路径不确定同一个任务可能产生不同的分解和执行顺序。建议记录完整的消息历史和决策日志便于事后分析和调试。五、总结多 Agent 协作是解决复杂任务的有效范式通过角色分工和编排协调实现群体智能。落地路径第一步定义 Agent 角色和系统提示确保每个 Agent 有明确的职责边界第二步实现顺序编排模式处理可预见的固定流程任务第三步引入动态编排器根据中间结果灵活调整执行路径第四步建立消息追踪和调试机制确保协作过程可观测。核心原则多 Agent 不是越多越好——Agent 数量应与任务复杂度匹配简单任务用单 Agent 更高效。