淄博市网站建设_网站建设公司_留言板_seo优化
2026/1/21 12:16:26 网站建设 项目流程

AIGC 异步回调系统实现


目录

  • 一、系统概述
  • 二、核心文件说明
  • 三、数据流详解
  • 四、配置说明
  • 五、开发指南
  • 六、运维指南

一、系统概述

1.1 背景与目标

本系统实现了一套通用的异步回调架构,用于处理 AIGC 服务(视频生成、图片生成等)的长耗时任务。

核心目标

  • 快速响应:接收请求后立即返回 task_id,不阻塞
  • 状态管理:通过数据库追踪任务状态
  • 回调通知:完成后自动回调后端
  • 统一管理:一套架构支持多种业务类型
  • 可观测性:完整的时间戳链路追踪

1.2 两种回调地址(核心概念)

回调地址说明示例配置方式
algorithm_callback_urlAIGC 供应商回调我们的地址https://algorithm.com/api/callback方式1:提交时发送
方式2:供应商平台配置
backend_callback_url我们完成后端通知的地址https://backend.com/api/notify后端通过请求参数传递

关键理解

后端 → 算法端 → AIGC供应商 ↓ ↓ ↓ 发起 提交任务 异步处理 任务 +回调地址 (1-5分钟) ↓ ↓ ↓ ←─────────收到回调─┘ ↓ 下载+上传OSS ↓ ──────→通知后端

二、核心文件说明

2.1 典型项目结构

异步回调系统通常采用分层架构,典型结构如下:

project/ ├── api/ # API 层 - HTTP 接口定义 │ ├── routes/ # 路由模块 │ │ └── callback.py # 回调统一入口 ⭐⭐⭐ │ └── schemas/ # 请求/响应模型 │ ├── core/ # 核心业务层 │ ├── dispatcher.py # 处理器分发器 ⭐⭐⭐ │ ├── handlers/ # 回调处理器 │ │ ├── base.py # 处理器基类 ⭐⭐⭐ │ │ └── video.py # 视频处理器示例 │ └── services/ # 业务逻辑 │ ├── models/ # 数据层 │ ├── task.py # 任务模型 ⭐⭐⭐ │ └── database.py # 数据库操作 ⭐⭐⭐ │ ├── integrations/ # 外部服务集成 │ ├── factory.py # 模型 工厂 │ └── providers/ # 各厂商 模型封装对象 │ ├── config/ # 配置 │ └── settings.py # 配置管理 ⭐⭐ │ └── utils/ # 工具模块 ├── storage.py # 存储/OSS 工具 └── logger/ # 日志系统

架构分层

┌─────────────────────────────────────────────┐ │ API 层 接收请求、参数验证 │ ├─────────────────────────────────────────────┤ │ 业务层 回调处理、业务编排 │ ├─────────────────────────────────────────────┤ │ 数据层 模型定义、数据持久化 │ ├─────────────────────────────────────────────┤ │ 集成层 外部 API 调用 │ ├─────────────────────────────────────────────┤ │ 工具层 通用工具函数 │ └─────────────────────────────────────────────┘

2.2 核心文件详解

2.2.1 API 层
回调统一入口 ⭐⭐⭐

职责:统一回调入口,接收所有供应商的回调请求

核心功能

  1. 解析回调数据,提取 TaskId(支持多种格式)
  2. 根据 TaskId 查询数据库,获取任务信息
  3. 根据 business_type 路由到对应处理器
  4. 更新回调接收时间戳

核心接口

@router.post("/callback")asyncdefhandle_callback(request:Request)->JSONResponse

支持的回调格式

  • 供应商 A:Event.TaskId(事件嵌套格式)
  • 供应商 A:TaskId(扁平格式)
  • 供应商 B:req_id
  • 通用:task_id

关键代码片段

# 提取 TaskId(支持多种格式)model_task_id=(data.get("TaskId")ordata.get("task_id")ordata.get("req_id")ordata.get("Event",{}).get("TaskId"))# 查询数据库db=get_db()task=awaitdb.get_by_model_task_id(model_task_id)# 路由到处理器handler=Dispatcher.get_handler(task.business_type)awaithandler.process_callback(task,data)

依赖关系

callback.py → models/database.py (数据库查询) → core/dispatcher.py (处理器路由) → core/handlers/*.py (具体处理器)

业务接口路由

职责:业务接口的路由定义

核心接口

  • POST /api/v1/video/async- 视频异步生成
  • GET /api/v1/video/status/{task_id}- 查询任务状态

职责范围

  • 接收 HTTP 请求
  • 参数验证(Pydantic Schema)
  • 调用业务层处理
  • 返回统一格式响应

2.2.2 业务层
处理器分发器 ⭐⭐⭐

职责:处理器分发器,根据 business_type 路由到对应处理器

核心功能

  1. 维护处理器注册表
  2. 提供register()方法注册处理器
  3. 提供get_handler()方法获取处理器实例

设计模式:策略模式 + 工厂模式

关键代码

classCallbackDispatcher:"""回调处理器分发器"""# 处理器注册表_handlers:dict[str,Type]={}@classmethoddefregister(cls,business_type:str,handler_class:Type):"""注册处理器"""cls._handlers[business_type]=handler_class logger.info(f"注册处理器:{business_type}->{handler_class.__name__}")@classmethoddefget_handler(cls,business_type:str):"""获取对应的处理器实例"""handler_class=cls._handlers.get(business_type)ifnothandler_class:raiseValueError(f"未知的业务类型:{business_type}")returnhandler_class(business_type)

已注册的处理器示例

  • video_generation→ VideoGenerationCallbackHandler
  • image_generation→ ImageGenerationCallbackHandler
  • (可扩展)

处理器基类 ⭐⭐⭐

职责:回调处理器基类,定义标准处理流程

核心功能

  1. 定义process_callback()抽象方法(子类必须实现)
  2. 提供通用的 OSS 上传方法download_and_upload_storage()
  3. 提供通用的后端通知方法check_and_notify()
  4. 提供通用的 URL 提取方法extract_result_url()

标准处理流程

asyncdefprocess_callback(task,callback_data):# 1. 提取结果 URLresult_url=self.extract_result_url(callback_data)# 2. 下载并上传存储storage_url=awaitself.download_and_upload_storage(result_url,...)# 3. 更新数据库状态awaitdb.update_storage_uploaded(task.model_task_id,storage_url)# 4. 检查是否所有子任务完成awaitself.check_and_notify(task.task_id)

关键方法

# 抽象方法(子类必须实现)@abstractmethodasyncdefprocess_callback(self,task:Task,callback_data:dict)->None:"""处理回调的核心逻辑"""pass# 通用方法asyncdefdownload_and_upload_storage(self,url:str,task_id:str,...)->str:"""从URL流式上传到存储(不落地)"""asyncdefcheck_and_notify(self,task_id:str,results:list=None)->None:"""检查是否所有子任务完成,通知后端"""defextract_result_url(self,callback_data:dict)->str:"""从回调数据提取结果URL(支持多种格式)"""

继承关系

BaseCallbackHandler (抽象基类) ├── VideoGenerationCallbackHandler ├── ImageGenerationCallbackHandler └── ... (其他业务处理器)

处理器自动注册 ⭐⭐

职责:自动注册所有回调处理器

实现方式:模块导入时自动注册

# core/handlers/__init__.pyfrom.video_handlerimportVideoGenerationCallbackHandlerfrom.image_handlerimportImageGenerationCallbackHandler# 导入时自动注册到 Dispatcherfromcore.dispatcherimportCallbackDispatcher# 注册处理器CallbackDispatcher.register("video_generation",VideoGenerationCallbackHandler)CallbackDispatcher.register("image_generation",ImageGenerationCallbackHandler)

注意

  • ⚠️ 新增处理器后,必须在此文件中导入和注册
  • ⚠️ business_type 必须唯一,否则会覆盖

具体业务处理器示例 ⭐⭐

职责:视频生成回调处理器(示例实现)

核心功能

  1. 实现process_callback()方法
  2. 处理视频生成回调
  3. 调用基类的通用方法

关键代码

classVideoGenerationCallbackHandler(BaseCallbackHandler):"""视频生成回调处理器"""asyncdefprocess_callback(self,task:Task,callback_data:Dict[str,Any])->None:# 1. 提取视频URLvideo_url=self.extract_result_url(callback_data)# 2. 流式上传到存储storage_url=awaitself.download_and_upload_storage(url=video_url,task_id=task.task_id,item_index=task.item_index)# 3. 更新数据库状态awaitdb.update_storage_uploaded(task.model_task_id,storage_url)# 4. 检查是否所有子任务完成并通知后端awaitself.check_and_notify(task.task_id)def_build_result_item(self,task:Task)->Dict[str,Any]:"""构建单个结果项(业务格式)"""request_data=task.get_request_data()return{"index":task.item_index,"model_task_id":task.model_task_id,"custom_field":request_data.get("custom_field",""),"storage_url":task.storage_url,"status":task.status.value,}

业务逻辑服务 ⭐⭐

职责:视频生成业务逻辑

核心功能

  1. 接收 API 层请求
  2. 获取回调地址配置
  3. 循环提交任务到外部服务
  4. 保存任务映射关系到数据库
  5. 返回统一格式响应

关键方法

asyncdefsubmit_video_generation_task(request:Dict[str,Any])->Dict[str,Any]:"""提交视频生成任务"""# 1. 获取算法端回调地址(配置)algorithm_callback_url=_get_callback_url()# 2. 获取后端回调地址(请求参数)backend_callback_url=request.get("notify_url")# 3. 循环提交任务foridx,iteminenumerate(request.get("items",[])):result=awaitclient.create_task(...)model_task_id=result["TaskId"]# 4. 保存到数据库task_record=Task(task_id=task_id,model_task_id=model_task_id,business_type="video_generation",backend_callback_url=backend_callback_url,algorithm_callback_url=algorithm_callback_url,...)awaitdb.create_task(task_record)# 5. 返回响应return{"status":"accepted","task_id":task_id}

依赖关系

video_service.py → integrations/factory.py (获取 Client) → models/database.py (数据库操作) → models/task.py (任务模型)

2.2.3 数据层
任务数据模型 ⭐⭐⭐

职责:任务数据模型定义

核心字段

@dataclassclassTask:# 核心关联字段task_id:str# 业务ID(多个子任务共享)model_task_id:str# 外部服务返回的TaskId(唯一)# 业务类型(用于路由到对应处理器)business_type:str# 如:video_generation, image_generationprovider:str# 如:provider_a, provider_b# 任务信息status:TaskStatus=TaskStatus.PENDING# 任务状态item_index:int=0# 序号(方便排序和聚合)# 请求数据request_payload:Optional[str]=None# 完整的请求数据(JSON字符串)prompt:Optional[str]=None# 提取的prompt(方便查询)# 结果数据result_url:Optional[str]=None# 外部服务返回的URLstorage_url:Optional[str]=None# 上传到存储的URLcallback_payload:Optional[str]=None# 完整回调数据(JSON字符串)processed_data:Optional[str]=None# 业务处理后的数据(JSON字符串)# 错误信息error_message:Optional[str]=None# 回调地址(重要!区分两个)backend_callback_url:Optional[str]=None# 后端的回调地址(后端传来)algorithm_callback_url:Optional[str]=None# 算法端的回调地址(配置,提交给外部服务)# 后端通知backend_notified:bool=False# 是否已通知后端# 时间戳(链路追踪)request_time:Optional[datetime]=None# HTTP请求到达时间(API层记录)created_at:Optional[datetime]=None# 数据库记录首次创建时间callback_received_at:Optional[datetime]=None# 收到外部服务回调时间task_completed_time:Optional[datetime]=None# 外部服务任务完成时间

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询