Prompt Engineering 与 Agent 工作流多步骤任务编排的设计模式一、单次对话的天花板当任务复杂到一步搞不定大模型在单轮问答中表现出色但真实业务场景中的任务往往需要多个步骤协作完成。比如帮我调研竞品并生成分析报告这个任务包含信息检索、数据提取、对比分析、报告撰写四个子步骤每个步骤的输入依赖上一步的输出。如果让 LLM 一次性完成结果通常是要么遗漏关键步骤要么中间环节出错导致最终结果不可用。更棘手的是步骤间的状态传递问题。第二步需要第一步提取的结构化数据第三步需要第二步的分析结论——如果步骤之间没有明确的接口约定LLM 的自由文本输出很难被下游步骤可靠解析。这就是 Agent 工作流编排要解决的核心问题如何将复杂任务拆解为可组合、可验证、可回溯的子任务链路。二、Agent 工作流的核心机制从 ReAct 到 DAG 编排当前主流的 Agent 工作流模式有三种ReAct 循环思考-行动-观察、Plan-then-Execute先规划后执行、DAG 编排有向无环图调度。三种模式各有适用场景理解它们的差异是正确选型的前提。flowchart TB subgraph ReAct[ReAct 循环模式] R1[Thought: 分析当前状态] -- R2[Action: 调用工具] R2 -- R3[Observation: 观察结果] R3 -- R1 R3 --|任务完成| R4[Final Answer] end subgraph PlanExec[Plan-then-Execute 模式] P1[Planner: 生成步骤计划] -- P2[Step 1: 执行] P2 -- P3[Step 2: 执行] P3 -- P4[Step N: 执行] P4 -- P5[Validator: 校验结果] P5 --|不通过| P1 P5 --|通过| P6[Output] end subgraph DAG[DAG 编排模式] D1[任务 A] -- D3[任务 C: 依赖 AB] D2[任务 B] -- D3 D1 -- D4[任务 D: 依赖 A] D3 -- D5[任务 E: 依赖 CD] D4 -- D5 end ReAct -.-|适用于: 探索性任务| Choice{选型决策} PlanExec -.-|适用于: 流程确定的任务| Choice DAG -.-|适用于: 并行子任务| ChoiceReAct 模式最灵活Agent 在每一步都重新评估状态并决定下一步行动。但灵活性带来的代价是可控性差——Agent 可能陷入循环、偏离目标、或者做出非预期的工具调用。Plan-then-Execute 模式通过预先规划提高了可控性但计划一旦生成就难以调整遇到执行失败时缺乏弹性。DAG 编排模式将任务建模为有向无环图支持并行执行和依赖管理但需要预先定义完整的任务拓扑对动态场景适应性不足。在实际产品中最有效的策略是混合模式用 Plan-then-Execute 生成初始计划用 DAG 编排管理步骤依赖用 ReAct 处理执行中的异常和动态调整。三、生产级代码实现多步骤任务编排框架3.1 任务定义与 DAG 调度器from dataclasses import dataclass, field from typing import Any, Callable, Dict, List, Optional from enum import Enum import asyncio class StepStatus(Enum): PENDING pending RUNNING running SUCCESS success FAILED failed SKIPPED skipped dataclass class StepResult: 步骤执行结果——步骤间状态传递的标准接口 step_id: str status: StepStatus output: Any None error: Optional[str] None metadata: Dict[str, Any] field(default_factorydict) dataclass class WorkflowStep: 工作流步骤定义 step_id: str name: str handler: Callable depends_on: List[str] field(default_factorylist) retry_count: int 2 timeout_seconds: int 60 # 步骤输入映射从上游步骤的输出中提取所需字段 input_mapping: Dict[str, str] field(default_factorydict) class DAGScheduler: 基于有向无环图的工作流调度器 def __init__(self, steps: List[WorkflowStep]): self.steps {s.step_id: s for s in steps} self.results: Dict[str, StepResult] {} self._validate_dag() def _validate_dag(self): 校验 DAG 合法性无环 依赖存在 visited set() path set() def dfs(step_id: str): if step_id in path: raise ValueError(f检测到循环依赖: {step_id}) if step_id in visited: return if step_id not in self.steps: raise ValueError(f依赖的步骤不存在: {step_id}) path.add(step_id) for dep in self.steps[step_id].depends_on: dfs(dep) path.remove(step_id) visited.add(step_id) for step_id in self.steps: dfs(step_id) def _resolve_inputs(self, step: WorkflowStep) - Dict[str, Any]: 从上游步骤输出中解析当前步骤的输入 inputs {} for target_field, source_ref in step.input_mapping.items(): # source_ref 格式: step_id.field_name source_step_id, field_name source_ref.split(., 1) if source_step_id not in self.results: raise ValueError( f步骤 {step.step_id} 的输入依赖 {source_step_id} 尚未执行 ) source_output self.results[source_step_id].output if isinstance(source_output, dict): inputs[target_field] source_output.get(field_name) else: inputs[target_field] source_output return inputs async def _execute_step(self, step: WorkflowStep) - StepResult: 执行单个步骤支持重试和超时 inputs self._resolve_inputs(step) for attempt in range(step.retry_count 1): try: result await asyncio.wait_for( step.handler(inputs), timeoutstep.timeout_seconds, ) return StepResult( step_idstep.step_id, statusStepStatus.SUCCESS, outputresult, ) except asyncio.TimeoutError: if attempt step.retry_count: return StepResult( step_idstep.step_id, statusStepStatus.FAILED, errorf步骤超时 ({step.timeout_seconds}s), ) except Exception as e: if attempt step.retry_count: return StepResult( step_idstep.step_id, statusStepStatus.FAILED, errorstr(e), ) return StepResult( step_idstep.step_id, statusStepStatus.FAILED, error未知错误, ) async def run(self) - Dict[str, StepResult]: 按 DAG 拓扑顺序执行所有步骤 completed set() while len(completed) len(self.steps): # 找出所有依赖已满足的待执行步骤 ready [ s for s in self.steps.values() if s.step_id not in completed and all(d in completed for d in s.depends_on) ] if not ready: # 没有可执行步骤但未全部完成——存在无法满足的依赖 break # 并行执行所有就绪步骤 tasks [self._execute_step(step) for step in ready] step_results await asyncio.gather(*tasks) for result in step_results: self.results[result.step_id] result completed.add(result.step_id) # 如果步骤失败跳过所有依赖它的后续步骤 if result.status StepStatus.FAILED: self._skip_dependents(result.step_id, completed) return self.results def _skip_dependents(self, failed_step_id: str, completed: set): 级联跳过依赖失败步骤的后续步骤 for step in self.steps.values(): if (failed_step_id in step.depends_on and step.step_id not in completed): self.results[step.step_id] StepResult( step_idstep.step_id, statusStepStatus.SKIPPED, errorf上游步骤 {failed_step_id} 失败, ) completed.add(step.step_id) self._skip_dependents(step.step_id, completed)3.2 LLM 步骤处理器与结构化输出from openai import OpenAI import json class LLMStepHandler: 封装 LLM 调用为可编排的步骤处理器 def __init__(self, api_key: str, model: str gpt-4o-mini): self.client OpenAI(api_keyapi_key) self.model model def create_handler( self, system_prompt: str, output_schema: dict, ) - Callable: 工厂方法根据 prompt 和输出 schema 生成步骤处理函数 async def handler(inputs: dict) - dict: user_content \n.join( f【{k}】\n{v} for k, v in inputs.items() ) response self.client.chat.completions.create( modelself.model, messages[ {role: system, content: system_prompt}, {role: user, content: user_content}, ], response_format{type: json_object}, temperature0.3, ) try: result json.loads(response.choices[0].message.content) # 基础 schema 校验检查必需字段是否存在 missing [ k for k in output_schema.get(required, []) if k not in result ] if missing: return { **result, _warning: f缺少字段: {missing}, } return result except json.JSONDecodeError: raise ValueError(LLM 输出无法解析为 JSON) return handler3.3 组装竞品分析工作流async def run_competitive_analysis(): 竞品分析工作流检索 → 提取 → 对比 → 报告 handler_factory LLMStepHandler(api_keyyour-key) steps [ WorkflowStep( step_idsearch, name竞品信息检索, handlerhandler_factory.create_handler( system_prompt根据产品名称列出主要竞品及其核心功能。返回 JSON。, output_schema{required: [competitors, features]}, ), depends_on[], input_mapping{product_name: initial.product_name}, ), WorkflowStep( step_idextract, name功能维度提取, handlerhandler_factory.create_handler( system_prompt从竞品信息中提取可对比的功能维度如价格、用户量、核心功能差异。返回 JSON。, output_schema{required: [dimensions, matrix]}, ), depends_on[search], input_mapping{competitor_data: search.output}, ), WorkflowStep( step_idcompare, name对比分析, handlerhandler_factory.create_handler( system_prompt基于功能维度矩阵生成竞品对比分析突出优劣势。返回 JSON。, output_schema{required: [analysis, recommendation]}, ), depends_on[extract], input_mapping{dimension_data: extract.output}, ), WorkflowStep( step_idreport, name报告生成, handlerhandler_factory.create_handler( system_prompt将对比分析整理为结构化报告包含摘要、详细对比、建议。返回 JSON。, output_schema{required: [summary, details, suggestion]}, ), depends_on[compare], input_mapping{analysis_data: compare.output}, ), ] scheduler DAGScheduler(steps) results await scheduler.run() return results四、编排灵活性与执行可靠性的权衡4.1 DAG 的静态性与任务的动态性DAG 编排要求预先定义步骤拓扑但真实任务中步骤数量和依赖关系往往是动态的。比如信息检索步骤可能发现需要额外的子检索这在初始 DAG 中无法预定义。解决思路是引入动态子图——允许步骤在执行时生成子 DAG 并注入调度器但这会显著增加调试难度和状态管理复杂度。4.2 LLM 输出的不确定性 vs. 步骤接口的确定性步骤间的输入映射依赖上游输出的字段名和结构但 LLM 的 JSON 输出并不完全可靠。即使使用response_format: json_object字段名仍可能偏离预期如返回name而非product_name。工程上的缓解方案是在input_mapping中增加模糊匹配和字段别名映射但这本质上是在用工程手段弥补 LLM 输出的不确定性维护成本随步骤数量增长。4.3 重试策略的副作用自动重试能提高成功率但 LLM 调用的失败原因可能是 prompt 本身的问题——重试只会浪费 Token 和时间。更合理的策略是区分可重试错误网络超时、限流和不可重试错误schema 不匹配、内容违规只对前者自动重试后者直接失败并附带错误详情由上层决策是否调整 prompt 后重试。4.4 并行执行的 Token 成本DAG 中无依赖关系的步骤可以并行执行缩短总耗时。但并行意味着同一时刻发起多个 LLM 调用Token 消耗峰值可能远超串行模式。在 API 限流场景下并行请求还可能触发速率限制导致全部失败。生产环境中需要引入并发控制如信号量限制最大并行数在速度和成本之间取得平衡。五、总结多步骤任务编排是 Agent 从对话工具走向生产力工具的关键能力。DAG 调度器提供了可靠的步骤依赖管理和并行执行能力LLMStepHandler 将大模型调用封装为可组合的标准步骤input_mapping 机制解决了步骤间的状态传递问题。但编排框架只是骨架真正的挑战在于 LLM 输出的不确定性如何与步骤接口的确定性要求共存。当前最务实的策略是严格约束 LLM 输出格式JSON Schema 校验、区分可重试与不可重试错误、控制并行度以平衡成本与速度。落地建议从 Plan-then-Execute 模式起步验证任务拆解的合理性逐步引入 DAG 编排提升执行效率仅在需要动态调整的场景下使用 ReAct 模式。不要一上来就追求最灵活的方案——灵活性越高调试成本越大。