- 1. 概览
- 2. 逐块解析
- 2.1 导入部分
- 2.2 全局变量
- 2.3 启动子进程:
run_processor(data)- 2.3.1 逐行解析
- 2.3.2 序列化 data
- 2.3.3 构造命令行参数列表
- 2.3.4 启动子进程
- 2.3.5 成功启动后的日志
- 2.3.6 成功分支的返回值
- 2.3.7 异常分支
- 2.4 清理守护线程:
cleanup_processes() - 2.5
process_image_endpoint()- 2.5.1 定义路由和请求方法
- 2.5.2 函数定义
- 2.5.3 获取请求数据
- 2.5.4 提取必要的字段
- 2.5.5 参数检查
- 2.5.6 图片 URL 验证
- 2.5.7 进程锁和并发限制
- 2.5.8 启动后台进程
- 2.5.9 立即返回响应
- 2.5.10 异常处理
- 函数行为总结
- 3. 举例说明主进程与子进程的运行过程
- 3.1 主进程
main.py - 3.2 子进程
worker.py - 3.3 整体运行流程(清晰版)
- ① 主进程启动
- ② 主进程创建子进程
- ③ 子进程开始工作
- ④ 子进程完成任务并退出
- ⑤ 主进程继续执行
- 3.4 总结(最简单易懂的)
- 3.1 主进程
- 4. subprocess vs multiprocessing 对比图
- 4.1 什么时候应该用 multiprocessing
1. 概览
这个脚本是一个用 Flask 暴露的 HTTP 接口(/internal/v1/subway-images/full-report),接收一个 JSON 请求后立即返回成功,并在后台以子进程方式运行 process.py 来实际处理任务。用 deque 跟踪当前活跃的子进程(限流),并用一个守护线程周期性清理(检测是否结束、超时并终止)这些子进程。启动时通过 WsgiToAsgi 将 Flask 包装为 ASGI,然后由 uvicorn 运行。
这是一个 HTTP 接口服务,核心特点是:
- 使用 Flask 提供 API
- 不在接口进程中做重任务
- 每个请求会:
- 启动一个独立的子进程(process.py)
- 立即返回 HTTP 成功
- 通过:
deque + threading.Lock- 限制最大并发子进程数
- 有一个 后台守护线程:
- 定期检查子进程
- 超时就 kill
- 结束就清理
本质上是一个 “HTTP → 进程调度器”。
2. 逐块解析
2.1 导入部分
import threading
from flask import Flask, request, jsonify
import uvicorn
import subprocess
import json
from collections import deque
import time
from asgiref.wsgi import WsgiToAsgi
from config.logger_config import app_logger
from config import settings
threading:用来启动守护线程(清理进程)。Flask+request+jsonify:HTTP 接收与响应。uvicorn+WsgiToAsgi:把 Flask (WSGI) 包装成 ASGI 并通过 uvicorn 运行(生产环境不建议直接裸用 Flask 自带的 dev server)。subprocess:启动process.py的子进程。deque:用于记录活跃进程(带长度上限)。app_logger&settings:分别是日志与配置(诸如MAX_CONCURRENT,PROCESS_TIMEOUT,PROCESS_CLEAN_TIME等)。
2.2 全局变量
app = Flask(__name__)
active_processes = deque(maxlen=settings.MAX_CONCURRENT) # 进程队列
process_lock = threading.Lock() # 进程操作锁
active_processes存放(proc, start_time)的 tuple。maxlen限制并发数(不过注意deque的maxlen会在 append 时自动丢弃最左元素,后面会讨论风险)。process_lock用于同步访问active_processes(避免并发竞态)。
2.3 启动子进程:run_processor(data)
json_str = json.dumps(data)
cmd = ["python", "./process.py", "--data", json_str]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True)
run_processor(data) 的作用是:
把接口收到的 JSON 数据序列化后,通过命令行参数传给子进程 process.py,并启动该子进程。
函数的返回值是:
- 成功启动子进程 → 返回
Popen对象 - 启动失败 → 返回 None
调用方用它来判断是否成功排队任务。
2.3.1 逐行解析
def run_processor(data):"""启动子进程的函数,启动成功了才会给接口返回成功信息"""
- 这是一个工具函数,用于启动后台任务。
- 它不处理业务逻辑,只负责:
- 序列化请求数据
- 启动子进程
- 返回子进程对象
2.3.2 序列化 data
json_str = json.dumps(data)
功能:
- 将 Python 字典
data转成 JSON 字符串。
隐含细节:
json.dumps()默认不保证 ASCII,支持 Unicode,但可能包含空格、换行等。- 序列化出来的 JSON 将直接放入命令行参数(argv)。
潜在风险(非常重要):
- JSON 被直接塞进进程命令行参数 → 在系统进程表可见(ps aux)
如果 data 中含敏感字段(如 token),会暴露。 - JSON 太长可能超过操作系统命令行长度上限(例如 Linux 通常 128KB 左右)。
2.3.3 构造命令行参数列表
cmd = ["python", "./process.py","--data", json_str
]
设计理由:
- 使用 列表形式 而不是字符串,可避免
shell=True带来的命令注入风险。
行为:
最终执行的命令为:
python ./process.py --data '{"requestId": "...", "xxxx": ...}'
process.py 应该会在启动后从命令行解析 "--data" 后的 JSON 字符串。
2.3.4 启动子进程
proc = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,start_new_session=True
)
Popen 参数解析
-
cmd:无 shell,无命令注入风险。 -
stdout=subprocess.PIPE:让父进程捕获子进程的标准输出。 -
stderr=subprocess.PIPE:让父进程捕获子进程的错误输出。 -
注意:
如果子进程输出特别多,而父进程迟迟不读取 pipe,子进程会因为管道缓冲区满而阻塞。→ 这是一个经典的“管道堵塞”问题。 -
start_new_session=True-
将子进程放入新的 session(POSIX setsid)
-
这样子进程不会被父进程的信号影响
-
用于让子进程独立运行
-
后续父进程可安全 terminate/kill 子进程
-
→ 用于真正的“后台处理任务”,非常合理。
-
2.3.5 成功启动后的日志
app_logger.info(f"Started process PID {proc.pid} with requestId: {data.get('requestId')}"
)
写日志记录:
- 子进程 PID
- 请求 ID
方便后续排查问题。
2.3.6 成功分支的返回值
return proc
调用方根据返回值:
- 不为 None → 启动成功 → 将进程加入监控队列
- 为 None → 返回接口 500 错误
2.3.7 异常分支
except Exception as e:app_logger.exception(f"Process start failed: {e}")return None
典型的异常包括:
- Python 路径不存在
- process.py 文件不存在
- 没权限执行
- JSON 太长导致 execve 失败
- 系统资源不足(too many open files)
记录异常并返回 None,调用方知道启动失败。
2.4 清理守护线程:cleanup_processes()
cleanup_processes() 是一个 守护线程 使用的循环任务,它负责:
- 定期扫描
active_processes队列 - 判断子进程是否执行完毕
- 如果执行完毕 → 记录日志,并把它从队列移除
- 如果还在运行但超过超时时间 → 发送
terminate() - 如果还在运行 → 继续保留到下个循环检查
-
打印日志
app_logger.info("Cleanup thread started")
-
无限循环
while True:这是一个常驻后台的任务。
因为是守护线程(daemon=True),主程序退出时它会自动结束。
-
加锁访问 active_processes
with process_lock:为什么需要锁?
因为另一个线程(Web 请求线程)会:
-
添加新进程到 active_processes
-
读取进程数来判断是否达到最大并发
deque不是线程安全的,所以访问必须用锁。 -
-
遍历 active_processes
for _ in range(len(active_processes)):proc, start_time = active_processes.popleft()这一段逻辑有技巧:
range(len(active_processes)):按当前长度遍历
- 每次从左端 pop 一个
(proc, start_time)
-popleft()是collections.deque(双端队列) 提供的方法,用来:从队列左端(头部)移除并返回一个元素。 - 然后根据情况再 append 回队列(表示这个进程还没结束)
这样能在 不复制列表 的情况下安全迭代并修改队列内容。
-
检查子进程状态
status = proc.poll().poll()行为:- 返回 None → 子进程仍在运行
- 返回 0 → 正常退出
- 返回非 0 → 异常退出(崩溃 / 错误)
poll()是 Python 中subprocess.Popen对象的一个方法,用于检查子进程的 退出状态。-
如果子进程 仍在运行,返回
None。 -
如果子进程 已终止,返回它的 退出码(通常为整数)。
-
0 表示正常退出。
-
非0 表示发生错误或异常退出。
-
-
子进程仍然在运行
if status is None:-
检查是否超时
if time.time() - start_time > settings.PROCESS_TIMEOUT:app_logger.info(f"Process PID {proc.pid} timed out, terminating")proc.terminate()说明:
-
该子进程运行时间超过允许的最大时间
-
调用
proc.terminate()(发送 SIGTERM 或 Windows TerminateProcess) -
注意:
-
terminate()只是“请求”终止,不保证立刻退出 -
如果子进程忽略 SIGTERM 或阻塞,它可能仍不会结束
-
-
-
把进程重新加入队列
active_processes.append((proc, start_time))表示“下次循环继续检查它”。
-
-
子进程已退出
else:-
正常退出(exit code == 0)
if status == 0:app_logger.info(f"Process PID {proc.pid} closed")- 正常结束 → 写日志即可,不再加入队列。
-
异常退出(exit code != 0)
err = proc.stderr.read().decode() app_logger.info(f"Process PID {proc.pid} failed with status {status}: {err}")- 读取 stderr 中全部错误输出
-
记录日志
- 这里会一次性把所有 stderr 内容读出来
-
适用于已退出的进程(管道不会阻塞)
-
但如果输出太大,会一次性塞进日志
-
- 这里会一次性把所有 stderr 内容读出来
-
-
循环结束 → sleep 再继续下一轮
time.sleep(settings.PROCESS_CLEAN_TIME)例如每 60 秒 清理一次。
-
函数的整体流程图(思维模型)
每60s执行一次:上锁遍历所有活跃进程:若运行中:若超时 → terminate继续保留若已退出:记录日志不再保留解锁
2.5 process_image_endpoint()
process_image_endpoint() 是一个 Flask 路由处理函数,用于接收来自前端的图片处理请求,验证输入数据并在后台异步处理图片。它通过返回一个 HTTP 响应给客户端,且在后台启动一个子进程来处理图片。
2.5.1 定义路由和请求方法
@app.route('/internal/v1/subway-images/full-report', methods=['POST'])
@app.route()是 Flask 中的装饰器,用来定义一个 HTTP 路由。/internal/v1/subway-images/full-report是 API 路径,表示这是一个处理地铁图像的接口。methods=['POST']指定此路由仅响应POST请求,通常用于提交数据。
2.5.2 函数定义
def process_image_endpoint():"""接收图片处理请求,立即返回响应,后台异步处理"""
- 这个函数会处理来自客户端的请求。
- 它接收图片处理请求,验证参数后将其交给后台子进程处理。
2.5.3 获取请求数据
data = request.json
-
request.json是 Flask 中的内建对象,用于解析传入的 JSON 请求体。 -
data现在是一个 Python 字典,包含了所有的请求数据。{"requestId": "48c1e786-e17c-422a-8ebe-4c145dcb7d09","systemId": "0","inspectionStartTime": "20251225010254000","inspectionEndTime": "20251225010254000","trainId": "15052","lineCode": "M15","imageUrls": [{"location": "车顶左","imageUrl": "https://jt-zw.obsv3.dt-bj-1.bjdt.com/clzf/OBCompany4/Picture/normal/xiangjiangbeilu360/20251225010254000_M15_15052_01_%E9%A6%99%E6%B1%9F%E5%8C%97%E8%B7%AF_0_%E8%BD%A6%E9%A1%B6%E5%B7%A6.jpg"}......],"yardName": "香江北路","direction": "0","detectionParty": null }
2.5.4 提取必要的字段
requestId = data.get("requestId")
systemId = data.get("systemId")
inspectionStartTime = data.get("inspectionStartTime")
inspectionEndTime = data.get("inspectionEndTime")
trainId = data.get("trainId")
lineCode = data.get("lineCode")
imageUrls = data.get("imageUrls")
yardName = data.get("yardName")
direction = data.get("direction")
- 从
data中提取请求中发送的字段,像是requestId,trainId,inspectionStartTime等。 - 使用
data.get()方法从字典中获取对应的值。如果字段不存在,返回None。
2.5.5 参数检查
if None in [requestId, systemId, inspectionStartTime, inspectionEndTime, trainId, lineCode, imageUrls, yardName, direction]:return jsonify({"code": 400,"message": "Missing required parameters."})
- 这段代码检查请求中的必需参数是否都已提供。如果有任何参数缺失(即其值为
None),则返回一个 HTTP 400 响应,表示客户端请求格式错误。 jsonify()用于将字典转换成 JSON 格式的 HTTP 响应。
2.5.6 图片 URL 验证
for _, imageUrl in enumerate(imageUrls):image_url = imageUrl.get("imageUrl")location = imageUrl.get("location")if None in [image_url, location]:return jsonify({"code": 400,"message": "Missing required parameters."})
- 遍历
imageUrls列表,检查每个图片的imageUrl和location字段是否都已提供。如果任何一个字段缺失,返回 HTTP 400 错误。 enumerate()用于遍历列表,imageUrl.get()获取字典中的值。
2.5.7 进程锁和并发限制
with process_lock:if len(active_processes) >= settings.MAX_CONCURRENT:return jsonify({"code": 429,"message": "System busy, try later."})
with process_lock::通过加锁,保证访问active_processes队列时是线程安全的。多个请求可能同时访问这个队列,所以需要加锁防止并发问题。if len(active_processes) >= settings.MAX_CONCURRENT::检查当前运行的后台进程数是否已达最大并发限制。如果超过限制,返回 HTTP 429 错误,表示系统繁忙。
2.5.8 启动后台进程
proc = run_processor(data)
if not proc:return jsonify({"code": 500,"message": "Internal server error."})
active_processes.append((proc, time.time()))
- 如果系统没有达到最大并发限制,则调用
run_processor(data)启动一个后台进程来处理图片数据。 run_processor()函数会启动一个新的子进程,并返回proc(Popen对象)。如果启动失败,则返回 HTTP 500 错误。active_processes.append((proc, time.time()))将新进程及其开始时间记录到active_processes队列中,用于后续的进程管理和超时清理。
2.5.9 立即返回响应
return jsonify({"code": 200,"message": "success"
})
- 返回一个 HTTP 200 响应,表示请求成功。即使图片处理在后台异步进行,客户端会立即收到成功响应。
2.5.10 异常处理
except Exception as e:app_logger.exception(f"Request processing error: {str(e)}")return jsonify({"code": 500,"message": "Internal server error."})
- 使用
try-except语句捕获任何异常,如果在处理请求时发生错误:- 记录异常信息(
app_logger.exception)。 - 返回 HTTP 500 错误,表示服务器内部错误。
- 记录异常信息(
函数行为总结
- 接收请求:从客户端接收包含图片处理信息的 JSON 数据。
- 验证数据:检查请求中必需的参数是否完整,确保每个图片的
imageUrl和location都有效。 - 并发控制:通过锁和并发限制,确保系统不会超载,并限制同时处理的进程数量。
- 启动处理进程:将图片处理任务交给后台进程异步执行。
- 立即返回响应:不等待子进程完成,立刻返回成功的 HTTP 响应。
- 异常处理:捕获任何可能的异常并记录日志,返回通用的错误响应。
3. 举例说明主进程与子进程的运行过程
假设你有一个 Python 主程序 main.py,它要启动一个子进程去执行 worker.py 中的耗时任务。
3.1 主进程 main.py
import subprocessprint("主进程:开始运行")# 启动子进程
proc = subprocess.Popen(["python", "worker.py"],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True
)print("主进程:子进程已启动,等待结果...")# 等待子进程结束
stdout, stderr = proc.communicate()print("主进程:子进程结束")
print("子进程输出:", stdout)
3.2 子进程 worker.py
import timeprint("子进程:开始执行任务")
time.sleep(2) # 模拟耗时工作
print("子进程:任务完成")
3.3 整体运行流程(清晰版)
下面逐步描述主进程和子进程的行为:
① 主进程启动
运行 main.py 后,主进程先启动,打印:
主进程:开始运行
② 主进程创建子进程
主进程执行 Popen(...) 时,会:
- 创建一个新的进程(子进程)
- 让它执行
worker.py
打印:
主进程:子进程已启动,等待结果...
此时主进程没有退出,只是等待子进程执行完毕。
③ 子进程开始工作
新启动的子进程开始执行 worker.py:
打印:
子进程:开始执行任务
然后执行耗时逻辑,例如 time.sleep(2)。
④ 子进程完成任务并退出
2 秒后,子进程完成任务并打印:
子进程:任务完成
然后正常退出。
⑤ 主进程继续执行
主进程收到子进程的输出后继续运行:
主进程:子进程结束
子进程输出: 子进程:开始执行任务
子进程:任务完成
3.4 总结(最简单易懂的)
| 过程 | 主进程做什么 | 子进程做什么 |
|---|---|---|
| 1 | 启动 | — |
| 2 | 创建子进程 | — |
| 3 | 等待子进程完成 | 执行任务 |
| 4 | 获取结果并继续 | 退出 |
一句话总结:
主进程负责管理与控制,子进程负责执行具体任务,它们并行运行,但主进程可以选择等待子进程结束。
4. subprocess vs multiprocessing 对比图
subprocess multiprocessing
─────────────── ─────────────────
Web Server Web Server│ ││ spawn OS process │ fork Python process▼ ▼
┌─────────────┐ ┌─────────────┐
│ Worker OS │ │ Python Proc │
│ Process │ │ (shared) │
└─────────────┘ └─────────────┘│ │kill / timeout OK kill 不稳定crash 无影响 crash 可能拖死
4.1 什么时候应该用 multiprocessing
┌───────────────────────────────┐
│ 离线脚本 / 批处理 / 计算任务 │
│ 非 Web 场景 │
│ 需要结果回传 │
│ 生命周期短 │
└───────────────────────────────┘