从零部署PaddleOCR-VL-WEB并接入Dify的完整技术指南
1. 引言:为什么我们需要一个私有化OCR能力引擎?
你有没有遇到过这样的场景:客户发来一张模糊的合同截图,你需要从中提取“甲方名称”、“签约金额”和“生效日期”,但手动复制粘贴不仅费时,还容易出错?更别说面对PDF扫描件、手写票据或双栏排版的学术论文时,通用OCR工具往往束手无策。
而商业API虽然方便,却存在三大痛点:数据安全风险、调用成本不可控、无法定制优化。尤其在金融、医疗、法律等敏感行业,把用户文档上传到第三方服务器几乎是不可能接受的。
本文要解决的就是这个问题——教你从零开始,本地部署百度开源的PaddleOCR-VL-WEB模型,并通过MCP协议将其无缝接入Dify,打造一个真正属于你的AI Agent“眼睛”。
这不是简单的功能叠加,而是一次工程思维的升级:
我们不再让Agent被动等待结构化输入,而是让它能主动“看懂”图像与文档,实现感知-决策-执行的闭环。
整个过程无需修改Dify源码,支持热插拔扩展,适合企业级私有化部署。无论你是AI应用开发者、低代码平台使用者,还是想提升办公效率的技术爱好者,都能快速上手。
2. 环境准备与镜像部署
2.1 部署PaddleOCR-VL-WEB镜像
我们使用的镜像是百度官方推出的PaddleOCR-VL-WEB,它集成了SOTA级别的文档解析能力,支持109种语言,特别擅长处理中文复杂文档(如发票、表格、公式)。
部署步骤(以单卡4090D为例)
- 在AI平台选择
PaddleOCR-VL-WEB镜像进行实例创建; - 实例启动后,进入Jupyter Lab环境;
- 激活专属conda环境:
conda activate paddleocrvl- 切换到根目录:
cd /root- 执行一键启动脚本:
./1键启动.sh该脚本会自动拉起Web服务,默认监听6006端口。
- 返回实例列表,点击“网页推理”即可打开PaddleOCR-VL的可视化界面。
此时你已经拥有了一个本地运行的高性能OCR引擎,可以直接上传图片或PDF进行测试。
提示:如果你希望将此服务暴露给外部调用(比如Dify),建议配置Nginx反向代理或将端口映射到公网IP(内网使用更安全)。
2.2 构建MCP通信桥梁:Python环境搭建
为了让Dify能够动态发现并调用这个OCR服务,我们需要构建一个符合MCP(Model Calling Protocol)规范的中间层。
MCP是一种专为AI Agent设计的能力调用协议,它的核心价值在于:解耦Agent逻辑与工具实现,实现“能力即服务”。
我们将使用Python 3.13 + Flask构建一个轻量级MCP Client中转服务。
创建虚拟环境
conda create -n py13 python=3.13 -y conda activate py13安装核心依赖
# 使用uv提升包管理效率 powershell -ExecutionPolicy ByPass -c "irm https://astral.sh/uv/install.ps1 | iex" # 初始化项目 uv init quickmcp # 进入项目目录 cd quickmcp修改.python-version和.project.toml中的版本为3.13。
激活虚拟环境:
uv venv --python="D:\utility\miniconda3\envs\py13\python.exe" .venv .\.venv\Scripts\activate安装所需库:
uv add mcp-server mcp mcp[cli] requests npm install @modelcontextprotocol/inspector@0.8.0 uv add mcp anthropic python-dotenv flask flask-cors至此,MCP Server与Client所需的运行环境已全部就绪。
3. MCP Server开发:封装OCR为标准能力服务
3.1 核心目标
我们的任务是:把本地运行的PaddleOCR-VL Web服务包装成一个可通过MCP协议调用的标准工具。
这意味着:
- Agent可以自动发现这个“OCR能力”
- 能获取参数说明、调用示例
- 支持结构化输入输出
- 可跨网络、跨语言调用
3.2 代码实现:BatchOcr.py
以下是完整的MCP Server实现代码:
import json import sys import os import logging from logging.handlers import RotatingFileHandler from datetime import datetime from typing import Any, Dict, List from pydantic import BaseModel, Field import httpx from mcp.server.fastmcp import FastMCP from mcp.server import Server import uvicorn from starlette.applications import Starlette from mcp.server.sse import SseServerTransport from starlette.requests import Request from starlette.responses import Response from starlette.routing import Mount, Route # 日志初始化 log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, f"BatchOcr_{datetime.now().strftime('%Y%m%d')}.log") file_handler = RotatingFileHandler( log_file, maxBytes=50 * 1024 * 1024, backupCount=30, encoding='utf-8' ) file_handler.setLevel(logging.INFO) file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logging.basicConfig(level=logging.INFO, handlers=[file_handler, console_handler]) logger = logging.getLogger("BatchOcr") logger.info("日志系统初始化完成") # 数据模型定义 class FileData(BaseModel): file: str = Field(..., description="文件URL地址") fileType: int = Field(..., description="文件类型: 0=PDF, 1=图片") class OcrFilesInput(BaseModel): files: List[FileData] = Field(..., description="要处理的文件列表") # 初始化 MCP 服务 mcp = FastMCP("BatchOcr") logger.info("FastMCP初始化完成") @mcp.tool() async def ocr_files(files: List[FileData]) -> str: """使用本地paddleocr-vl提取用户输入中的文件url进行批量或者单个扫描""" logger.info(f"收到OCR请求,文件数量: {len(files)}") OCR_SERVICE_URL = "http://localhost:8080/layout-parsing" all_text_results = [] for idx, file_data in enumerate(files): try: logger.info(f"正在处理第 {idx + 1}/{len(files)} 个文件: {file_data.file}") ocr_payload = { "file": file_data.file, "fileType": file_data.fileType } async with httpx.AsyncClient(timeout=60.0) as client: response = await client.post( OCR_SERVICE_URL, json=ocr_payload, headers={"Content-Type": "application/json"} ) if response.status_code != 200: error_msg = f"OCR服务返回错误状态码 {response.status_code},文件: {file_data.file}" logger.error(error_msg) all_text_results.append(f"错误: {error_msg}") continue ocr_response = response.json() text_blocks = [] if "result" in ocr_response and "layoutParsingResults" in ocr_response["result"]: for layout in ocr_response["result"]["layoutParsingResults"]: if "prunedResult" in layout and "parsing_res_list" in layout["prunedResult"]: blocks = layout["prunedResult"]["parsing_res_list"] for block in blocks: content = block.get("block_content", "") if content: text_blocks.append(content) if text_blocks: file_result = "\n".join(text_blocks) all_text_results.append(file_result) logger.info(f"成功处理文件 {idx + 1}: {file_data.file}") else: logger.warning(f"文件 {file_data.file} 未提取到任何文本内容") all_text_results.append(f"警告: 文件 {file_data.file} 未提取到文本内容") except httpx.RequestError as e: error_msg = f"调用OCR服务时发生网络错误,文件: {file_data.file},错误: {str(e)}" logger.error(error_msg, exc_info=True) all_text_results.append(f"错误: {error_msg}") except Exception as e: error_msg = f"处理文件时发生未知错误,文件: {file_data.file},错误: {str(e)}" logger.error(error_msg, exc_info=True) all_text_results.append(f"错误: {error_msg}") final_result = "\n".join(all_text_results) return json.dumps({"result": final_result}, ensure_ascii=False) def create_starlette_app(mcp_server: Server, *, debug: bool = False) -> Starlette: sse = SseServerTransport("/messages/") async def handle_sse(request: Request): logger.info("收到SSE连接请求") try: async with sse.connect_sse( request.scope, request.receive, request._send, ) as (read_stream, write_stream): await mcp_server.run(read_stream, write_stream, mcp_server.create_initialization_options()) except Exception as e: logger.error(f"SSE处理出错: {str(e)}", exc_info=True) raise return Response() return Starlette( debug=debug, routes=[ Route("/sse", endpoint=handle_sse), Mount("/messages/", app=sse.handle_post_message), ], ) def run_server(): import argparse parser = argparse.ArgumentParser(description='Run MCP SSE-based server') parser.add_argument('--host', default='127.0.0.1', help='Host to bind to') parser.add_argument('--port', type=int, default=8090, help='Port to listen on') args = parser.parse_args() mcp_server = mcp._mcp_server starlette_app = create_starlette_app(mcp_server, debug=True) logger.info(f"Starting SSE server on {args.host}:{args.port}") uvicorn.run(starlette_app, host=args.host, port=args.port) if __name__ == "__main__": run_server()3.3 关键点解析
| 组件 | 说明 |
|---|---|
@mcp.tool() | 注册名为ocr_files的可调用工具 |
| 输入参数 | 接收文件URL列表及类型(0=PDF, 1=图片) |
| 内部调用 | 转发请求至本地http://localhost:8080/layout-parsing接口 |
| 结果处理 | 提取所有block_content字段,合并为纯文本返回 |
| 返回格式 | JSON字符串:{"result": "解析后的全文"} |
这个服务启动后,就会成为一个标准的MCP Server,Agent可以通过/manifest获取其能力描述,并通过SSE长连接发起调用。
4. MCP Client开发:打通Dify通信通道
4.1 为什么要加一层Client?
Dify本身不直接支持MCP协议,但我们又不想改动其源码。解决方案是:构建一个HTTP接口作为MCP Client中转层。
这样Dify只需调用一个标准REST API,剩下的协议转换由我们完成。
4.2 代码实现:QuickMcpClient.py
import logging from logging.handlers import RotatingFileHandler import asyncio import json import os from typing import Optional from contextlib import AsyncExitStack from datetime import datetime import threading from mcp import ClientSession from mcp.client.sse import sse_client from anthropic import Anthropic from dotenv import load_dotenv from flask import Flask, request, jsonify from flask_cors import CORS # 日志设置 log_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") os.makedirs(log_dir, exist_ok=True) log_file = os.path.join(log_dir, f"QuickMcpClient_{datetime.now().strftime('%Y%m%d')}.log") file_handler = RotatingFileHandler(log_file, maxBytes=50*1024*1024, backupCount=30, encoding='utf-8') file_handler.setLevel(logging.INFO) file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) console_handler = logging.StreamHandler() console_handler.setLevel(logging.INFO) console_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')) logging.basicConfig(level=logging.INFO, handlers=[console_handler, file_handler]) logger = logging.getLogger("QuickMcpClient") app = Flask(__name__) CORS(app) class MCPClient: def __init__(self): self.session: Optional[ClientSession] = None self.exit_stack = AsyncExitStack() self.anthropic = Anthropic() self._streams_context = None self._session_context = None self._loop = None self._loop_thread = None async def connect_to_sse_server(self, base_url: str): try: self._streams_context = sse_client(url=base_url) streams = await self._streams_context.__aenter__() self._session_context = ClientSession(*streams) self.session = await self._session_context.__aenter__() await self.session.initialize() logger.info("连接成功,会话已初始化") return True except Exception as e: logger.error(f"连接服务器时出错: {str(e)}", exc_info=True) return False async def get_tools_list(self): try: if not self.session: logger.error("会话未初始化,请先连接到服务器") return None response = await self.session.list_tools() tools = response.tools tools_json = json.dumps( {"tools": [{"name": tool.name, "description": tool.description, "inputSchema": getattr(tool, 'inputSchema', None)} for tool in tools]}, indent=4, ensure_ascii=False ) logger.info(f"获取到 {len(tools)} 个工具") return json.loads(tools_json) except Exception as e: logger.error(f"获取工具列表时出错: {str(e)}", exc_info=True) return None async def call_tool(self, tool_name: str, tool_args: dict): try: if not self.session: logger.error("会话未初始化,请先连接到服务器") return None result = await self.session.call_tool(tool_name, tool_args) logger.info(f"工具 {tool_name} 执行成功") return result except Exception as e: logger.error(f"调用工具 {tool_name} 时出错: {str(e)}", exc_info=True) raise def _start_event_loop(self): asyncio.set_event_loop(self._loop) self._loop.run_forever() def run_async(self, coro): if self._loop is None: self._loop = asyncio.new_event_loop() self._loop_thread = threading.Thread(target=self._start_event_loop, daemon=True) self._loop_thread.start() future = asyncio.run_coroutine_threadsafe(coro, self._loop) return future.result(timeout=30) mcp_client = MCPClient() @app.route('/listTools', methods=['POST']) def list_tools(): data = request.get_json(force=True, silent=True) or {} base_url = data.get('base_url') if base_url and not mcp_client.session: success = mcp_client.run_async(mcp_client.connect_to_sse_server(base_url=base_url)) if not success: return jsonify({"status": "error", "message": "连接失败"}), 500 if not mcp_client.session: return jsonify({"status": "error", "message": "未连接"}), 400 tools_data = mcp_client.run_async(mcp_client.get_tools_list()) if tools_data is None: return jsonify({"status": "error", "message": "获取失败"}), 500 return jsonify({"status": "success", "data": tools_data}), 200 @app.route('/callTool', methods=['POST']) def call_tool(): data = request.get_json(force=True, silent=True) if not data: return jsonify({"status": "error", "message": "请求体不能为空"}), 400 base_url = data.get('base_url', 'http://127.0.0.1:8090/sse') tool_name = data.get('tool_name') tool_args = data.get('tool_args', {}) if not tool_name: return jsonify({"status": "error", "message": "缺少 tool_name"}), 400 if base_url and not mcp_client.session: success = mcp_client.run_async(mcp_client.connect_to_sse_server(base_url=base_url)) if not success: return jsonify({"status": "error", "message": "连接失败"}), 500 if not mcp_client.session: return jsonify({"status": "error", "message": "未连接"}), 400 result = mcp_client.run_async(mcp_client.call_tool(tool_name, tool_args)) if result is None: return jsonify({"status": "error", "message": "调用失败"}), 500 result_data = {} if hasattr(result, 'content'): content = result.content if isinstance(content, list) and len(content) > 0: first_content = content[0] if hasattr(first_content, 'text'): result_text = first_content.text try: result_data = json.loads(result_text) except json.JSONDecodeError: result_data = {"text": result_text} return jsonify({"status": "success", "data": result_data}), 200 @app.route('/', methods=['GET']) def index(): return jsonify({ "message": "QuickMcpClient Flask Server is running", "endpoints": ["/health", "/listTools", "/callTool"] }), 200 @app.route('/health', methods=['GET']) def health_check(): return jsonify({"status": "ok", "connected": mcp_client.session is not None}), 200 if __name__ == "__main__": load_dotenv() logger.info("启动 QuickMcpClient Flask 服务器...") app.run(host='0.0.0.0', port=8500, debug=True)4.3 功能说明
| 接口 | 作用 |
|---|---|
GET /health | 健康检查,确认服务是否正常 |
POST /listTools | 获取当前可用的MCP工具列表 |
POST /callTool | 调用指定工具,转发至MCP Server并返回结果 |
该服务运行在8500端口,Dify只需向/callTool发送POST请求即可完成调用。
5. 启动服务与集成Dify
5.1 启动顺序
先启动MCP Server:
python BatchOcr.py --host 127.0.0.1 --port 8090再启动MCP Client:
python QuickMcpClient.py确保两个服务都正常运行,日志无报错。
5.2 在Dify中配置自定义工具
- 登录Dify,进入“工具”管理页面;
- 创建一个新的“自定义工具”;
- 名称填写
OCR解析,描述可写“调用本地PaddleOCR-VL进行文档识别”; - API URL 设置为:
http://<your-client-ip>:8500/callTool - 请求方式:POST
- 参数配置如下:
{ "tool_name": "ocr_files", "tool_args": { "files": [ { "file": "{{file_url}}", "fileType": "{{file_type}}" } ] } }其中{{file_url}}和{{file_type}}为变量占位符,可在工作流中传入。
- 测试连接,确认返回成功。
6. 实际运行效果
当用户提问:
“请帮我解析 http://localhost/mkcdn/ocrsample/test-1.png 和 test-1.pdf 这两个文件的内容。”
Dify中的Agent会自动判断需要调用OCR工具,构造请求发送至MCP Client,后者转发给MCP Server,最终调用本地PaddleOCR-VL完成解析。
整个过程在2秒内完成,返回的结果包含完整的文本内容,保留了原文的段落结构与语义信息,可用于后续的摘要、问答、分类等任务。
更重要的是:所有数据均停留在内网,无需上传任何第三方平台。
7. 总结:迈向真正的AI Agent能力生态
通过本文的实践,我们完成了三个关键跃迁:
- 从公有云到私有化:将OCR能力部署在本地,保障数据安全;
- 从硬编码到标准化:通过MCP协议实现能力解耦,支持热插拔;
- 从被动响应到主动感知:Agent可根据上下文自主决定是否调用OCR,形成完整的工作流闭环。
这不仅仅是一个技术整合案例,更是未来AI应用架构的缩影。
你可以在此基础上继续扩展:
- 添加更多MCP工具(如TTS、翻译、RPA)
- 支持多语言自动识别
- 对接企业内部知识库
- 实现批量化文档处理流水线
正如文章开头所说:未来的Agent,应该像人一样具备“感官”与“手脚”。而MCP,就是连接这些器官的神经系统。
现在,轮到你动手构建属于自己的AI能力网络了。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。