TCC结合Saga保障任务流水线一致性,AI智能体任务实战教程

张开发
2026/4/9 20:48:41 15 分钟阅读

分享文章

TCC结合Saga保障任务流水线一致性,AI智能体任务实战教程
# Agent任务流水线跨服务状态一致性与幂等性保证机制

在高并发分布式Agent系统中，任务流水线涉及多个服务的协同处理：状态一致性确保数据在分布式环境下保持正确，而幂等性保证重复请求不会产生副作用。这两者是构建可靠分布式系统的基石。

## 一、跨服务状态一致性保证机制

### 1.1 分布式事务管理策略

Agent任务流水线通常采用最终一致性而非强一致性，以平衡性能与正确性。以下是四种主流实现方案：

| 方案 | 原理 | 适用场景 | 优缺点 |
| --- | --- | --- | --- |
| TCC模式 | Try-Confirm-Cancel三阶段 | 对一致性要求高的金融、电商场景 | 业务侵入性强，性能较好，需实现幂等性 |
| Saga模式 | 长事务拆分为本地事务+补偿操作 | 长流程业务（如订单处理、客服工单） | 实现简单，但补偿逻辑复杂 |
| 本地消息表 | 本地事务+异步消息+重试 | 大多数业务场景，特别是写操作为主的系统 | 实现简单，保证最终一致性 |
| 2PC/3PC | 协调者统一管理提交/回滚 | 传统企业系统 | 强一致性，但存在阻塞问题 |

### 1.2 TCC模式在Agent流水线的具体实现

TCC（Try-Confirm-Cancel）模式通过业务层面的补偿机制保证一致性，特别适合Agent流水线中的资源预留场景。

```python
# TCC模式在客服Agent流水线中的实现
import uuid
from datetime import datetime
from enum import Enum
from typing import Dict, Any, Optional
from dataclasses import dataclass
import redis
import json


class TCCState(Enum):
    """TCC事务状态"""
    INITIAL = "initial"
    TRYING = "trying"
    CONFIRMED = "confirmed"
    CANCELLED = "cancelled"
    FAILED = "failed"


@dataclass
class TCCTransaction:
    """TCC事务记录"""
    transaction_id: str
    business_id: str  # 业务ID（如session_id）
    state: TCCState
    created_at: datetime
    updated_at: datetime
    participants: Dict[str, Dict]  # 参与者状态
    context: Dict[str, Any]  # 事务上下文


class TCCCoordinator:
    """TCC协调器"""

    def __init__(self, redis_client):
        self.redis = redis_client
        self.transaction_timeout = 300  # 5分钟超时

    def begin_transaction(self, business_id: str, context: Dict) -> str:
        """开始TCC事务"""
        transaction_id = f"tcc_{uuid.uuid4()}"
        transaction = TCCTransaction(
            transaction_id=transaction_id,
            business_id=business_id,
            state=TCCState.INITIAL,
            created_at=datetime.utcnow(),
            updated_at=datetime.utcnow(),
            participants={},
            context=context
        )

        # 保存事务状态到Redis
        self._save_transaction(transaction)

        # 设置超时监控
        self._setup_timeout_monitor(transaction_id)

        return transaction_id

    def try_phase(self, transaction_id: str, participant: str,
                  try_func, *args, **kwargs) -> bool:
        """Try阶段：预留资源"""
        # 获取事务
        transaction = self._get_transaction(transaction_id)
        if not transaction:
            return False

        # 检查事务状态
        # 注意：第一个参与者Try成功后事务即进入TRYING状态，
        # 因此必须同时允许TRYING，否则多步骤流水线的第二步必然抛异常
        if transaction.state not in (TCCState.INITIAL, TCCState.TRYING):
            raise Exception(f"Transaction {transaction_id} is in {transaction.state} state")

        try:
            # 执行Try操作
            result = try_func(*args, **kwargs)

            # 记录参与者状态
            transaction.participants[participant] = {
                "state": "tried",
                "try_result": result,
                "tried_at": datetime.utcnow().isoformat()
            }
            transaction.state = TCCState.TRYING
            transaction.updated_at = datetime.utcnow()

            # 保存状态
            self._save_transaction(transaction)
            return True

        except Exception as e:
            # Try失败，触发Cancel
            self._cancel_participant(transaction_id, participant, e)
            return False

    def confirm_phase(self, transaction_id: str) -> bool:
        """Confirm阶段：确认执行业务"""
        transaction = self._get_transaction(transaction_id)
        if not transaction:
            return False

        # 检查是否所有参与者都Try成功
        all_tried = all(
            info["state"] == "tried"
            for info in transaction.participants.values()
        )

        if not all_tried:
            # 有参与者Try失败，触发Cancel
            self.cancel_phase(transaction_id)
            return False

        # 执行所有参与者的Confirm操作
        confirm_failures = []
        for participant, info in transaction.participants.items():
            try:
                # 这里应该调用参与者的Confirm接口
                # confirm_func = get_confirm_func(participant)
                # confirm_func(info["try_result"])

                info["state"] = "confirmed"
                info["confirmed_at"] = datetime.utcnow().isoformat()
            except Exception as e:
                confirm_failures.append((participant, str(e)))

        if confirm_failures:
            # Confirm失败，需要人工干预
            transaction.state = TCCState.FAILED
            transaction.context["confirm_failures"] = confirm_failures
        else:
            transaction.state = TCCState.CONFIRMED

        transaction.updated_at = datetime.utcnow()
        self._save_transaction(transaction)

        return len(confirm_failures) == 0

    def cancel_phase(self, transaction_id: str) -> bool:
        """Cancel阶段：取消预留资源"""
        transaction = self._get_transaction(transaction_id)
        if not transaction:
            return False

        # 执行所有参与者的Cancel操作
        cancel_failures = []
        for participant, info in transaction.participants.items():
            try:
                # 这里应该调用参与者的Cancel接口
                # cancel_func = get_cancel_func(participant)
                # cancel_func(info.get("try_result"))

                info["state"] = "cancelled"
                info["cancelled_at"] = datetime.utcnow().isoformat()
            except Exception as e:
                cancel_failures.append((participant, str(e)))

        if cancel_failures:
            transaction.state = TCCState.FAILED
            transaction.context["cancel_failures"] = cancel_failures
        else:
            transaction.state = TCCState.CANCELLED

        transaction.updated_at = datetime.utcnow()
        self._save_transaction(transaction)

        return len(cancel_failures) == 0

    def _cancel_participant(self, transaction_id: str, participant: str, error: Exception):
        """取消单个参与者"""
        transaction = self._get_transaction(transaction_id)
        if not transaction:
            return

        if participant in transaction.participants:
            # 执行Cancel操作
            # cancel_func = get_cancel_func(participant)
            # cancel_func(transaction.participants[participant].get("try_result"))

            transaction.participants[participant]["state"] = "cancelled"
            transaction.participants[participant]["error"] = str(error)
            transaction.participants[participant]["cancelled_at"] = datetime.utcnow().isoformat()

            self._save_transaction(transaction)

    def _save_transaction(self, transaction: TCCTransaction):
        """保存事务状态"""
        key = f"tcc_transaction:{transaction.transaction_id}"
        value = {
            "transaction_id": transaction.transaction_id,
            "business_id": transaction.business_id,
            "state": transaction.state.value,
            "created_at": transaction.created_at.isoformat(),
            "updated_at": transaction.updated_at.isoformat(),
            "participants": transaction.participants,
            "context": transaction.context
        }
        self.redis.setex(key, self.transaction_timeout, json.dumps(value))

    def _get_transaction(self, transaction_id: str) -> Optional[TCCTransaction]:
        """获取事务状态"""
        key = f"tcc_transaction:{transaction_id}"
        data = self.redis.get(key)
        if not data:
            return None

        data_dict = json.loads(data)
        return TCCTransaction(
            transaction_id=data_dict["transaction_id"],
            business_id=data_dict["business_id"],
            state=TCCState(data_dict["state"]),
            created_at=datetime.fromisoformat(data_dict["created_at"]),
            updated_at=datetime.fromisoformat(data_dict["updated_at"]),
            participants=data_dict["participants"],
            context=data_dict["context"]
        )

    def _setup_timeout_monitor(self, transaction_id: str):
        """设置事务超时监控"""
        # 使用Redis的过期机制
        # 当key过期时触发回调进行Cancel操作
        # 注意：Redis过期通知到达时key的值已被删除，回调中无法再读取参与者信息，
        # 实际实现需另存一份事务数据或改用延迟队列
        pass


# Agent流水线中的TCC应用示例
class AgentPipelineWithTCC:
    """使用TCC保证状态一致性的Agent流水线"""

    def __init__(self):
        self.redis = redis.Redis(host="localhost", port=6379, db=0)
        self.tcc_coordinator = TCCCoordinator(self.redis)

        # 定义流水线步骤
        self.pipeline_steps = [
            "intent_recognition",
            "knowledge_retrieval",
            "sentiment_analysis",
            "response_generation"
        ]

    def process_request(self, session_id: str, user_input: str) -> Dict:
        """处理用户请求，使用TCC保证一致性"""
        # 开始TCC事务
        context = {
            "session_id": session_id,
            "user_input": user_input,
            "start_time": datetime.utcnow().isoformat()
        }

        transaction_id = self.tcc_coordinator.begin_transaction(session_id, context)

        try:
            # Try阶段：依次执行流水线步骤
            for step in self.pipeline_steps:
                success = self.tcc_coordinator.try_phase(
                    transaction_id=transaction_id,
                    participant=step,
                    try_func=self._try_step,
                    step_name=step,
                    session_id=session_id,
                    user_input=user_input
                )

                if not success:
                    # 任一步骤Try失败，整个事务回滚
                    self.tcc_coordinator.cancel_phase(transaction_id)
                    return {"status": "failed", "error": f"{step} try failed"}

            # Confirm阶段：确认所有步骤
            confirm_success = self.tcc_coordinator.confirm_phase(transaction_id)

            if confirm_success:
                return {
                    "status": "success",
                    "transaction_id": transaction_id,
                    "result": self._get_final_result(session_id)
                }
            else:
                return {
                    "status": "failed",
                    "transaction_id": transaction_id,
                    "error": "confirm failed, requires manual intervention"
                }

        except Exception as e:
            # 异常情况，执行Cancel
            self.tcc_coordinator.cancel_phase(transaction_id)
            return {"status": "error", "error": str(e)}

    def _try_step(self, step_name: str, session_id: str, user_input: str) -> Dict:
        """Try阶段的具体实现"""
        # 预留资源或执行预处理
        step_handlers = {
            "intent_recognition": self._try_intent_recognition,
            "knowledge_retrieval": self._try_knowledge_retrieval,
            "sentiment_analysis": self._try_sentiment_analysis,
            "response_generation": self._try_response_generation
        }

        handler = step_handlers.get(step_name)
        if handler:
            return handler(session_id, user_input)
        else:
            raise ValueError(f"Unknown step: {step_name}")

    def _try_intent_recognition(self, session_id: str, user_input: str) -> Dict:
        """意图识别Try阶段"""
        # 1. 检查资源（如模型是否可用）
        # 2. 预留计算资源
        # 3. 执行轻量级预处理

        return {
            "step": "intent_recognition",
            "status": "tried",
            "resource_reserved": True,
            "preprocess_result": {
                "tokens": len(user_input.split()),
                "language": "zh-CN",
                "requires_deep_analysis": len(user_input) > 20
            }
        }

    def _try_knowledge_retrieval(self, session_id: str, user_input: str) -> Dict:
        """知识检索Try阶段"""
        # 1. 检查知识库连接
        # 2. 预留查询配额
        # 3. 构建查询索引

        return {
            "step": "knowledge_retrieval",
            "status": "tried",
            "index_built": True,
            "quota_reserved": 5,  # 预留5次查询
            "cache_prepared": True
        }

    # ... 其他步骤的Try实现
    # （需补全 _try_sentiment_analysis 与 _try_response_generation，
    #  否则 _try_step 构建 step_handlers 时会抛出 AttributeError）

    def _get_final_result(self, session_id: str) -> Dict:
        """获取最终结果"""
        # 从各步骤收集结果
        return {
            "response": "处理完成",
            "session_id": session_id,
            "completed_at": datetime.utcnow().isoformat()
        }
```

### 1.3 基于Saga模式的最终一致性方案

对于长流程的Agent任务流水线，Saga模式通过补偿操作保证最终一致性，更适合客服场景中的复杂业务流程。

```python
# Saga模式实现Agent任务流水线
from typing import List, Callable, Dict, Any, Optional
import asyncio
import uuid
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
import json


class SagaStepStatus(Enum):
    PENDING = "pending"
    EXECUTING = "executing"
    COMPLETED = "completed"
    FAILED = "failed"
    COMPENSATED = "compensated"


@dataclass
class SagaStep:
    """Saga步骤定义"""
    name: str
    execute_func: Callable  # 执行函数
    compensate_func: Callable  # 补偿函数
    timeout: int = 30  # 超时时间（秒）
    retry_count: int = 3  # 重试次数


class SagaOrchestrator:
    """Saga编排器"""

    def __init__(self):
        self.steps: List[SagaStep] = []
        self.execution_log = []  # 执行日志
        self.state_store = {}  # 状态存储

    def add_step(self, step: SagaStep):
        """添加Saga步骤"""
        self.steps.append(step)

    async def execute(self, context: Dict) -> Dict:
        """执行Saga事务"""
        execution_id = f"saga_{uuid.uuid4()}"

        # 初始化执行上下文
        execution_context = {
            "execution_id": execution_id,
            "context": context,
            "start_time": datetime.utcnow().isoformat(),
            "steps": []
        }

        completed_steps = []
        failed_step = None

        try:
            # 顺序执行所有步骤
            for i, step in enumerate(self.steps):
                step_context = {
                    "step_name": step.name,
                    "step_index": i,
                    "start_time": datetime.utcnow().isoformat(),
                    "status": SagaStepStatus.EXECUTING.value
                }
                # 记录每个步骤的执行轨迹（否则 execution_context["steps"] 永远为空）
                execution_context["steps"].append(step_context)

                try:
                    # 执行步骤
                    result = await self._execute_step_with_retry(
                        step, context, execution_id
                    )

                    step_context.update({
                        "status": SagaStepStatus.COMPLETED.value,
                        "end_time": datetime.utcnow().isoformat(),
                        "result": result
                    })

                    completed_steps.append({
                        "step": step.name,
                        "result": result
                    })

                    # 更新上下文
                    context.update(result.get("context_updates", {}))

                except Exception as e:
                    # 步骤执行失败
                    step_context.update({
                        "status": SagaStepStatus.FAILED.value,
                        "end_time": datetime.utcnow().isoformat(),
                        "error": str(e)
                    })
                    failed_step = (i, step, e)
                    break

            if failed_step:
                # 执行补偿操作
                await self._compensate(completed_steps, context, execution_id)

                execution_context.update({
                    "status": "failed",
                    "failed_step": failed_step[0],
                    "error": str(failed_step[2]),
                    "compensated": True
                })
            else:
                # 所有步骤成功
                execution_context.update({
                    "status": "completed",
                    "end_time": datetime.utcnow().isoformat(),
                    "results": completed_steps
                })

        except Exception as e:
            # 编排器本身异常
            execution_context.update({
                "status": "error",
                "error": str(e)
            })

        # 记录执行日志
        self.execution_log.append(execution_context)

        return execution_context

    async def _execute_step_with_retry(self, step: SagaStep, context: Dict,
                                       execution_id: str) -> Dict:
        """带重试的步骤执行"""
        last_exception = None

        for attempt in range(step.retry_count):
            try:
                # 设置超时
                result = await asyncio.wait_for(
                    step.execute_func(context, execution_id),
                    timeout=step.timeout
                )
                return result

            except asyncio.TimeoutError:
                last_exception = TimeoutError(f"Step {step.name} timeout on attempt {attempt + 1}")
            except Exception as e:
                last_exception = e

            # 等待后重试
            if attempt < step.retry_count - 1:
                await asyncio.sleep(2 ** attempt)

        # 原文在此处截断。重试耗尽后必须抛出最后一次异常，
        # 否则函数隐式返回 None，调用方的 result.get(...) 会抛出难以理解的 AttributeError
        raise last_exception

    # 注：原文未给出 _compensate 的实现；
    # 它应对 completed_steps 中已完成的步骤调用各自的 compensate_func（通常按逆序执行）
```

---

## 参考来源

- [分布式Agent系统如何保证一致性](https://cloud.tencent.com/developer/techpedia/2493/19525)
- [分布式系统中跨服务事务一致性的实现与解决方案：在分布式系统中，跨服务的事务一致性是系统设计的核心挑战之一。由于服务拆分后数……](https://juejin.cn/post/7558284340574240810)
- [问智能体如何实现跨系统的事务一致性](https://cloud.tencent.com/developer/ask/2186555)

更多文章