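# The Fine-Tuning Data Alignment Problem: Orchestrating Data Pipelines with the Agent Topology Pattern

## Preface

Our team labels data for fine-tuning. The data comes from many sources, the labeling standards are inconsistent, and labels get revised again and again. Our old process relied on manual coordination and was slow.

We later rebuilt the data alignment flow with the Agent topology design pattern: one Agent per stage, freely composable. It has worked well for us, so here is a walkthrough.

## 1. Underlying Principles

### 1.1 Applying Agent Topology to Data Alignment

Data alignment is essentially a multi-step pipeline:

```mermaid
graph TD
    A[Raw data] --> B[Parser Agent]
    B --> C[Cleaner Agent]
    C --> D[Labeling Agent]
    D --> E[Validator Agent]
    E --> F{Validation passed?}
    F -->|Yes| G[Alignment Agent]
    F -->|No| H[Fix Agent]
    H --> D
    G --> I[Output]

    J[Agent topology] --> K[Flexible orchestration]
    K --> L[Extensible]
    K --> M[Reusable]
```

Advantages:

- Each Agent does exactly one thing
- The topology can be rearranged
- Agents are pluggable

### 1.2 Comparing the Options

| Approach | Maintainability | Extensibility | Complexity |
|---|---|---|---|
| Hard-coded | Low | Low | Low |
| Script orchestration | Medium | Medium | Medium |
| Agent topology | High | High | Medium |

## 2. Quick Start

### 2.1 Defining the Agent Topology

```python
from typing import Any, Callable, Dict, Optional


class AgentNode:
    def __init__(self, name: str, process: Callable):
        self.name = name
        self.process = process
        # Maps a condition (e.g. "pass" / "fail") to the name of the next node.
        self.next_nodes: Dict[str, str] = {}

    def add_next(self, condition: str, node: "AgentNode"):
        # Store the node's name, because execute() looks nodes up by name.
        self.next_nodes[condition] = node.name


class AgentTopology:
    def __init__(self):
        self.nodes: Dict[str, AgentNode] = {}

    def add_node(self, node: AgentNode):
        self.nodes[node.name] = node

    def execute(self, start: str, data: Any) -> Any:
        current: Optional[str] = start
        result = data
        while current:
            node = self.nodes[current]
            result = node.process(data)
            # Route on the "status" the node reported; stop when nothing matches.
            condition = result.get("status", "default") if isinstance(result, dict) else "default"
            current = node.next_nodes.get(condition)
            data = result
        return result
```

## 3. Core API / Deep Dive

### 3.1 Topology Component Cheat Sheet

| Component | Responsibility | Example |
|---|---|---|
| Parser Agent | Format conversion | JSON/CSV parsing |
| Cleaner Agent | Noise removal | Deduplication, filtering |
| Labeling Agent | Classification | Sentiment labeling |
| Validator Agent | Quality checks | Consistency checks |
| Alignment Agent | Standard unification | Label mapping |

### 3.2 Stateful Data Alignment

```python
class DataPipelineContext:
    """Shared state passed from node to node."""

    def __init__(self):
        self.data = None
        self.metadata = {}
        self.errors = []

    def set(self, key, value):
        self.metadata[key] = value

    def add_error(self, error):
        self.errors.append(error)


class ContextAwareNode:
    def process(self, ctx: DataPipelineContext) -> str:
        # Do the work, then return the condition that selects the next node.
        return "pass"  # or "fail"
```

## 4. Hands-On Practice

A complete data alignment topology:

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class DataRecord:
    text: str
    original_label: str
    source: str


class ParserAgent:
    def process(self, raw: Dict) -> DataRecord:
        return DataRecord(
            text=raw.get("text", ""),
            original_label=raw.get("label", ""),
            source=raw.get("source", "unknown"),
        )


class CleanerAgent:
    def __init__(self):
        # Filler words to strip from the (Chinese) source text.
        self.noise_words = ["嗯", "啊", "这个"]

    def process(self, record: DataRecord) -> DataRecord:
        for w in self.noise_words:
            record.text = record.text.replace(w, "")
        return record


class LabelMapperAgent:
    def __init__(self):
        # Source-specific labels mapped onto one standard label set.
        self.label_map = {
            "正向": "positive", "好评": "positive",
            "负向": "negative", "差评": "negative",
        }

    def process(self, record: DataRecord) -> Dict:
        mapped_label = self.label_map.get(record.original_label, "unknown")
        return {
            "text": record.text,
            "label": mapped_label,
            "source": record.source,
            "status": "pass" if mapped_label != "unknown" else "fail",
        }


class ValidatorAgent:
    def process(self, data: Dict) -> Dict:
        errors = []
        if not data.get("text"):
            errors.append("text is empty")
        if not data.get("label"):
            errors.append("label is empty")
        elif data["label"] == "unknown":
            # Keep the mapper's failure instead of overwriting it with "pass".
            errors.append("label could not be mapped")
        if errors:
            data["status"] = "fail"
            data["errors"] = errors
        else:
            data["status"] = "pass"
        return data


class TopologyOrchestrator:
    def __init__(self, llm=None):
        # llm is not used in this sample; it is kept as a hook for LLM-backed agents.
        self.llm = llm
        self.parser = ParserAgent()
        self.cleaner = CleanerAgent()
        self.mapper = LabelMapperAgent()
        self.validator = ValidatorAgent()

    def process(self, raw_data: Dict) -> Dict:
        record = self.parser.process(raw_data)
        record = self.cleaner.process(record)
        mapped = self.mapper.process(record)
        return self.validator.process(mapped)


orchestrator = TopologyOrchestrator()
cases = [
    {"text": "这个产品很好用", "label": "好评", "source": "电商"},
    {"text": "服务很差", "label": "负向", "source": "客服"},
]
for case in cases:
    result = orchestrator.process(case)
    print(f"Aligned result: {result}")
```

## 5. Pitfalls and Best Practices

**Tip:** Give each Agent a single responsibility. An Agent that does too much is hard to debug.

⚠️ **Warning:** Watch out for cycles in the topology. Cap the validate-fail-then-fix loop at 3 rounds to prevent an infinite loop.

✅ **Recommended:** Add log tracing. Record what every Agent did so problems are easy to track down.

## 6. Full Walkthrough

A production-style data alignment pipeline:

```python
import json
import time
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class ProcessStep:
    agent: str
    input_data: Any
    output_data: Any
    duration: float


class ProductionAlignmentPipeline:
    def __init__(self, llm=None):
        # llm is not used by the deterministic steps below; it is a hook for LLM-backed steps.
        self.llm = llm
        self.steps: List[ProcessStep] = []
        self.max_retries = 3

    def run(self, raw: Dict) -> Dict:
        data = raw
        self.steps = []
        # Retrying only changes the outcome if a step can vary between runs
        # (for example an LLM-backed labeler); the cap prevents endless loops.
        for _ in range(self.max_retries):
            # Step 1: parse
            data = self._step("parser", data, self._parse)
            if data.get("status") == "fail":
                continue
            # Step 2: clean
            data = self._step("cleaner", data, self._clean)
            # Step 3: label
            data = self._step("labeler", data, self._label)
            # Step 4: validate
            data = self._step("validator", data, self._validate)
            if data.get("status") == "pass":
                return data
        data["status"] = "failed_after_retry"
        return data

    def _step(self, name: str, data: Dict, func) -> Dict:
        snapshot = dict(data)  # copy the input, since steps mutate data in place
        start = time.perf_counter()
        result = func(data)
        duration = time.perf_counter() - start
        self.steps.append(ProcessStep(
            agent=name, input_data=snapshot,
            output_data=result, duration=duration,
        ))
        return result

    def _parse(self, data: Dict) -> Dict:
        return {
            "text": data.get("text", ""),
            "label": data.get("label", ""),
            "source": data.get("source", "unknown"),
            "status": "pass",
        }

    def _clean(self, data: Dict) -> Dict:
        noise = ["嗯", "啊", "的", "了"]
        text = data.get("text", "")
        for w in noise:
            text = text.replace(w, "")
        data["text"] = text
        return data

    def _label(self, data: Dict) -> Dict:
        label_map = {"正向": "positive", "负向": "negative", "中性": "neutral"}
        original = data.get("label", "")
        # Labels missing from the map pass through unchanged.
        data["label"] = label_map.get(original, original)
        return data

    def _validate(self, data: Dict) -> Dict:
        if not data.get("text"):
            data["status"] = "fail"
            data["error"] = "text is empty"
        elif not data.get("label"):
            data["status"] = "fail"
            data["error"] = "label is empty"
        else:
            data["status"] = "pass"
        return data

    def get_report(self) -> str:
        return json.dumps([{
            "agent": s.agent,
            "duration_ms": s.duration * 1000,
        } for s in self.steps], ensure_ascii=False, indent=2)


pipeline = ProductionAlignmentPipeline()
result = pipeline.run({"text": "这个产品真的很好用", "label": "正向"})
print(result)
print(pipeline.get_report())
```

## 7. Summary

Using the Agent topology design pattern for fine-tuning data alignment comes down to:

- An independent Agent for each stage
- Flexible topology orchestration
- Validation plus retry
- Log tracing

Set up this way, the data alignment flow stays clear and under control.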
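
As a follow-up to the earlier warning about capping the validate-and-fix cycle at 3 rounds, here is one way that loop might look in isolation. This is only a minimal sketch under stated assumptions, not our production code: `run_with_fix_loop`, `validate`, `fix`, and `max_fix_rounds` are hypothetical names introduced for illustration, and the fix step is a plain label lookup standing in for whatever a real Fix Agent would do.

```python
from typing import Any, Callable, Dict, List, Tuple

Record = Dict[str, Any]


def validate(record: Record) -> Record:
    """Hypothetical validator: pass only with non-empty text and a standard label."""
    known = {"positive", "negative", "neutral"}
    ok = bool(record.get("text")) and record.get("label") in known
    return {**record, "status": "pass" if ok else "fail"}


def fix(record: Record) -> Record:
    """Hypothetical fix step: map a source-specific label onto the standard set."""
    label_map = {"正向": "positive", "好评": "positive",
                 "负向": "negative", "差评": "negative"}
    label = record.get("label", "")
    return {**record, "label": label_map.get(label, label)}


def run_with_fix_loop(
    record: Record,
    validate_fn: Callable[[Record], Record],
    fix_fn: Callable[[Record], Record],
    max_fix_rounds: int = 3,
) -> Tuple[Record, List[str]]:
    """Validate, then alternate fix and validate for at most max_fix_rounds rounds."""
    trace: List[str] = ["validator"]  # log of which step ran, in order
    current = validate_fn(record)
    rounds = 0
    while current["status"] != "pass" and rounds < max_fix_rounds:
        current = validate_fn(fix_fn(current))
        trace.extend(["fixer", "validator"])
        rounds += 1
    if current["status"] != "pass":
        # The cap was reached: give up instead of cycling forever.
        current = {**current, "status": "failed_after_retry"}
    return current, trace


good, good_trace = run_with_fix_loop({"text": "服务很差", "label": "差评"}, validate, fix)
bad, bad_trace = run_with_fix_loop({"text": "", "label": "差评"}, validate, fix)
print(good["status"], good_trace)
print(bad["status"], len(bad_trace))
```

The first record fails validation once, gets its label mapped, and passes on the second check. The second record has empty text that no label fix can repair, so the loop stops after three rounds and marks it `failed_after_retry`. Keeping the trace alongside the result also covers the logging recommendation: each record carries the list of steps it went through.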