汉中市网站建设_网站建设公司_Bootstrap_seo优化
2026/1/5 9:00:33 网站建设 项目流程

目录
  • 1. 概览
  • 2. 逐块解析
    • 2.1 导入部分
    • 2.2 全局变量
    • 2.3 启动子进程:run_processor(data)
      • 2.3.1 逐行解析
      • 2.3.2 序列化 data
      • 2.3.3 构造命令行参数列表
      • 2.3.4 启动子进程
      • 2.3.5 成功启动后的日志
      • 2.3.6 成功分支的返回值
      • 2.3.7 异常分支
    • 2.4 清理守护线程:cleanup_processes()
    • 2.5 process_image_endpoint()
      • 2.5.1 定义路由和请求方法
      • 2.5.2 函数定义
      • 2.5.3 获取请求数据
      • 2.5.4 提取必要的字段
      • 2.5.5 参数检查
      • 2.5.6 图片 URL 验证
      • 2.5.7 进程锁和并发限制
      • 2.5.8 启动后台进程
      • 2.5.9 立即返回响应
      • 2.5.10 异常处理
      • 函数行为总结
  • 3. 举例说明主进程与子进程的运行过程
    • 3.1 主进程 main.py
    • 3.2 子进程 worker.py
    • 3.3 整体运行流程(清晰版)
      • ① 主进程启动
      • ② 主进程创建子进程
      • ③ 子进程开始工作
      • ④ 子进程完成任务并退出
      • ⑤ 主进程继续执行
    • 3.4 总结(最简单易懂的)
  • 4. subprocess vs multiprocessing 对比图
    • 4.1 什么时候应该用 multiprocessing


1. 概览

这个脚本是一个用 Flask 暴露的 HTTP 接口(/internal/v1/subway-images/full-report),接收一个 JSON 请求后立即返回成功,并在后台以子进程方式运行 process.py 来实际处理任务。用 deque 跟踪当前活跃的子进程(限流),并用一个守护线程周期性清理(检测是否结束、超时并终止)这些子进程。启动时通过 WsgiToAsgi 将 Flask 包装为 ASGI,然后由 uvicorn 运行。

这是一个 HTTP 接口服务,核心特点是:

  • 使用 Flask 提供 API
  • 不在接口进程中做重任务
  • 每个请求会:
    • 启动一个独立的子进程(process.py)
    • 立即返回 HTTP 成功
  • 通过:
    • deque + threading.Lock
    • 限制最大并发子进程数
  • 有一个 后台守护线程
    • 定期检查子进程
    • 超时就 kill
    • 结束就清理

本质上是一个 “HTTP → 进程调度器”


2. 逐块解析

2.1 导入部分

import threading
from flask import Flask, request, jsonify
import uvicorn
import subprocess
import json
from collections import deque
import time
from asgiref.wsgi import WsgiToAsgi
from config.logger_config import app_logger
from config import settings
  • threading:用来启动守护线程(清理进程)。
  • Flask + request + jsonify:HTTP 接收与响应。
  • uvicorn + WsgiToAsgi:把 Flask (WSGI) 包装成 ASGI 并通过 uvicorn 运行(生产环境不建议直接裸用 Flask 自带的 dev server)。
  • subprocess:启动 process.py 的子进程。
  • deque:用于记录活跃进程(带长度上限)。
  • app_logger & settings:分别是日志与配置(诸如 MAX_CONCURRENT, PROCESS_TIMEOUT, PROCESS_CLEAN_TIME 等)。

2.2 全局变量

app = Flask(__name__)
active_processes = deque(maxlen=settings.MAX_CONCURRENT)  # 进程队列
process_lock = threading.Lock()  # 进程操作锁
  • active_processes 存放 (proc, start_time) 的 tuple。maxlen 限制并发数(不过注意 dequemaxlen 会在 append 时自动丢弃最左元素,后面会讨论风险)。
  • process_lock 用于同步访问 active_processes (避免并发竞态)。

2.3 启动子进程:run_processor(data)

json_str = json.dumps(data)
cmd = ["python", "./process.py", "--data", json_str]
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, start_new_session=True)

run_processor(data) 的作用是:

把接口收到的 JSON 数据序列化后,通过命令行参数传给子进程 process.py,并启动该子进程。

函数的返回值是:

  • 成功启动子进程 → 返回 Popen 对象
  • 启动失败 → 返回 None

调用方用它来判断是否成功排队任务。


2.3.1 逐行解析

def run_processor(data):"""启动子进程的函数,启动成功了才会给接口返回成功信息"""
  • 这是一个工具函数,用于启动后台任务。
  • 它不处理业务逻辑,只负责:
    1. 序列化请求数据
    2. 启动子进程
    3. 返回子进程对象

2.3.2 序列化 data

json_str = json.dumps(data)

功能:

  • 将 Python 字典 data 转成 JSON 字符串。

隐含细节:

  • json.dumps() 默认不保证 ASCII,支持 Unicode,但可能包含空格、换行等。
  • 序列化出来的 JSON 将直接放入命令行参数(argv)。

潜在风险(非常重要):

  • JSON 被直接塞进进程命令行参数 → 在系统进程表可见(ps aux)
    如果 data 中含敏感字段(如 token),会暴露。
  • JSON 太长可能超过操作系统命令行长度上限(例如 Linux 通常 128KB 左右)。

2.3.3 构造命令行参数列表

cmd = ["python", "./process.py","--data", json_str
]

设计理由:

  • 使用 列表形式 而不是字符串,可避免 shell=True 带来的命令注入风险。

行为:

最终执行的命令为:

python ./process.py --data '{"requestId": "...", "xxxx": ...}'

process.py 应该会在启动后从命令行解析 "--data" 后的 JSON 字符串。


2.3.4 启动子进程

proc = subprocess.Popen(cmd,stdout=subprocess.PIPE,stderr=subprocess.PIPE,start_new_session=True
)

Popen 参数解析

  • cmd:无 shell,无命令注入风险。

  • stdout=subprocess.PIPE:让父进程捕获子进程的标准输出。

  • stderr=subprocess.PIPE:让父进程捕获子进程的错误输出。

  • 注意
    如果子进程输出特别多,而父进程迟迟不读取 pipe,子进程会因为管道缓冲区满而阻塞。→ 这是一个经典的“管道堵塞”问题。

  • start_new_session=True

    • 将子进程放入新的 session(POSIX setsid)

    • 这样子进程不会被父进程的信号影响

    • 用于让子进程独立运行

    • 后续父进程可安全 terminate/kill 子进程

    • → 用于真正的“后台处理任务”,非常合理。


2.3.5 成功启动后的日志

app_logger.info(f"Started process PID {proc.pid} with requestId: {data.get('requestId')}"
)

写日志记录:

  • 子进程 PID
  • 请求 ID

方便后续排查问题。


2.3.6 成功分支的返回值

return proc

调用方根据返回值:

  • 不为 None → 启动成功 → 将进程加入监控队列
  • 为 None → 返回接口 500 错误

2.3.7 异常分支

except Exception as e:app_logger.exception(f"Process start failed: {e}")return None

典型的异常包括:

  • Python 路径不存在
  • process.py 文件不存在
  • 没权限执行
  • JSON 太长导致 execve 失败
  • 系统资源不足(too many open files)

记录异常并返回 None,调用方知道启动失败。


2.4 清理守护线程:cleanup_processes()

cleanup_processes() 是一个 守护线程 使用的循环任务,它负责:

  1. 定期扫描 active_processes 队列
  2. 判断子进程是否执行完毕
  3. 如果执行完毕 → 记录日志,并把它从队列移除
  4. 如果还在运行但超过超时时间 → 发送 terminate()
  5. 如果还在运行 → 继续保留到下个循环检查

  1. 打印日志

    app_logger.info("Cleanup thread started")
    

  1. 无限循环

    while True:
    

    这是一个常驻后台的任务。
    因为是守护线程(daemon=True),主程序退出时它会自动结束。


  1. 加锁访问 active_processes

    with process_lock:
    

    为什么需要锁?

    因为另一个线程(Web 请求线程)会:

    • 添加新进程到 active_processes

    • 读取进程数来判断是否达到最大并发

    deque 不是线程安全的,所以访问必须用锁。


  1. 遍历 active_processes

    for _ in range(len(active_processes)):proc, start_time = active_processes.popleft()
    

    这一段逻辑有技巧:

    • range(len(active_processes)):按当前长度遍历
  • 每次从左端 pop 一个 (proc, start_time)
    - popleft()collections.deque(双端队列) 提供的方法,用来:从队列左端(头部)移除并返回一个元素。
  • 然后根据情况再 append 回队列(表示这个进程还没结束)

这样能在 不复制列表 的情况下安全迭代并修改队列内容。


  1. 检查子进程状态

    status = proc.poll()
    

    .poll() 行为:

    • 返回 None → 子进程仍在运行
    • 返回 0 → 正常退出
    • 返回非 0 → 异常退出(崩溃 / 错误)

    poll() 是 Python 中 subprocess.Popen 对象的一个方法,用于检查子进程的 退出状态

    • 如果子进程 仍在运行,返回 None

    • 如果子进程 已终止,返回它的 退出码(通常为整数)。

      • 0 表示正常退出。

      • 非0 表示发生错误或异常退出。


  1. 子进程仍然在运行

    if status is None:
    
    • 检查是否超时

      if time.time() - start_time > settings.PROCESS_TIMEOUT:app_logger.info(f"Process PID {proc.pid} timed out, terminating")proc.terminate()
      

      说明:

      • 该子进程运行时间超过允许的最大时间

      • 调用 proc.terminate()(发送 SIGTERM 或 Windows TerminateProcess)

      • 注意:

        • terminate() 只是“请求”终止,不保证立刻退出

        • 如果子进程忽略 SIGTERM 或阻塞,它可能仍不会结束

    • 把进程重新加入队列

      active_processes.append((proc, start_time))
      

      表示“下次循环继续检查它”。


  1. 子进程已退出

    	else:
    
    • 正常退出(exit code == 0)

      if status == 0:app_logger.info(f"Process PID {proc.pid} closed")
      
      • 正常结束 → 写日志即可,不再加入队列。
    • 异常退出(exit code != 0)

      err = proc.stderr.read().decode()
      app_logger.info(f"Process PID {proc.pid} failed with status {status}: {err}")
      
      • 读取 stderr 中全部错误输出
    • 记录日志

      • 这里会一次性把所有 stderr 内容读出来
        • 适用于已退出的进程(管道不会阻塞)

        • 但如果输出太大,会一次性塞进日志

  2. 循环结束 → sleep 再继续下一轮

    time.sleep(settings.PROCESS_CLEAN_TIME)
    

    例如每 60 秒 清理一次。

  3. 函数的整体流程图(思维模型)

    每60s执行一次:上锁遍历所有活跃进程:若运行中:若超时 → terminate继续保留若已退出:记录日志不再保留解锁
    

2.5 process_image_endpoint()

process_image_endpoint() 是一个 Flask 路由处理函数,用于接收来自前端的图片处理请求,验证输入数据并在后台异步处理图片。它通过返回一个 HTTP 响应给客户端,且在后台启动一个子进程来处理图片。

2.5.1 定义路由和请求方法

@app.route('/internal/v1/subway-images/full-report', methods=['POST'])
  • @app.route() 是 Flask 中的装饰器,用来定义一个 HTTP 路由。
  • /internal/v1/subway-images/full-report 是 API 路径,表示这是一个处理地铁图像的接口。
  • methods=['POST'] 指定此路由仅响应 POST 请求,通常用于提交数据。

2.5.2 函数定义

def process_image_endpoint():"""接收图片处理请求,立即返回响应,后台异步处理"""
  • 这个函数会处理来自客户端的请求。
  • 它接收图片处理请求,验证参数后将其交给后台子进程处理。

2.5.3 获取请求数据

data = request.json
  • request.json 是 Flask 中的内建对象,用于解析传入的 JSON 请求体。

  • data 现在是一个 Python 字典,包含了所有的请求数据。

    {"requestId": "48c1e786-e17c-422a-8ebe-4c145dcb7d09","systemId": "0","inspectionStartTime": "20251225010254000","inspectionEndTime": "20251225010254000","trainId": "15052","lineCode": "M15","imageUrls": [{"location": "车顶左","imageUrl": "https://jt-zw.obsv3.dt-bj-1.bjdt.com/clzf/OBCompany4/Picture/normal/xiangjiangbeilu360/20251225010254000_M15_15052_01_%E9%A6%99%E6%B1%9F%E5%8C%97%E8%B7%AF_0_%E8%BD%A6%E9%A1%B6%E5%B7%A6.jpg"}......],"yardName": "香江北路","direction": "0","detectionParty": null
    }
    

2.5.4 提取必要的字段

requestId = data.get("requestId")
systemId = data.get("systemId")
inspectionStartTime = data.get("inspectionStartTime")
inspectionEndTime = data.get("inspectionEndTime")
trainId = data.get("trainId")
lineCode = data.get("lineCode")
imageUrls = data.get("imageUrls")
yardName = data.get("yardName")
direction = data.get("direction")
  • data 中提取请求中发送的字段,像是 requestId, trainId, inspectionStartTime 等。
  • 使用 data.get() 方法从字典中获取对应的值。如果字段不存在,返回 None

2.5.5 参数检查

if None in [requestId, systemId, inspectionStartTime, inspectionEndTime, trainId, lineCode, imageUrls, yardName, direction]:return jsonify({"code": 400,"message": "Missing required parameters."})
  • 这段代码检查请求中的必需参数是否都已提供。如果有任何参数缺失(即其值为 None),则返回一个 HTTP 400 响应,表示客户端请求格式错误。
  • jsonify() 用于将字典转换成 JSON 格式的 HTTP 响应。

2.5.6 图片 URL 验证

for _, imageUrl in enumerate(imageUrls):image_url = imageUrl.get("imageUrl")location = imageUrl.get("location")if None in [image_url, location]:return jsonify({"code": 400,"message": "Missing required parameters."})
  • 遍历 imageUrls 列表,检查每个图片的 imageUrllocation 字段是否都已提供。如果任何一个字段缺失,返回 HTTP 400 错误。
  • enumerate() 用于遍历列表,imageUrl.get() 获取字典中的值。

2.5.7 进程锁和并发限制

with process_lock:if len(active_processes) >= settings.MAX_CONCURRENT:return jsonify({"code": 429,"message": "System busy, try later."})
  • with process_lock::通过加锁,保证访问 active_processes 队列时是线程安全的。多个请求可能同时访问这个队列,所以需要加锁防止并发问题。
  • if len(active_processes) >= settings.MAX_CONCURRENT::检查当前运行的后台进程数是否已达最大并发限制。如果超过限制,返回 HTTP 429 错误,表示系统繁忙。

2.5.8 启动后台进程

proc = run_processor(data)
if not proc:return jsonify({"code": 500,"message": "Internal server error."})
active_processes.append((proc, time.time()))
  • 如果系统没有达到最大并发限制,则调用 run_processor(data) 启动一个后台进程来处理图片数据。
  • run_processor() 函数会启动一个新的子进程,并返回 procPopen 对象)。如果启动失败,则返回 HTTP 500 错误。
  • active_processes.append((proc, time.time())) 将新进程及其开始时间记录到 active_processes 队列中,用于后续的进程管理和超时清理。

2.5.9 立即返回响应

return jsonify({"code": 200,"message": "success"
})
  • 返回一个 HTTP 200 响应,表示请求成功。即使图片处理在后台异步进行,客户端会立即收到成功响应。

2.5.10 异常处理

except Exception as e:app_logger.exception(f"Request processing error: {str(e)}")return jsonify({"code": 500,"message": "Internal server error."})
  • 使用 try-except 语句捕获任何异常,如果在处理请求时发生错误:
    • 记录异常信息(app_logger.exception)。
    • 返回 HTTP 500 错误,表示服务器内部错误。

函数行为总结

  1. 接收请求:从客户端接收包含图片处理信息的 JSON 数据。
  2. 验证数据:检查请求中必需的参数是否完整,确保每个图片的 imageUrllocation 都有效。
  3. 并发控制:通过锁和并发限制,确保系统不会超载,并限制同时处理的进程数量。
  4. 启动处理进程:将图片处理任务交给后台进程异步执行。
  5. 立即返回响应:不等待子进程完成,立刻返回成功的 HTTP 响应。
  6. 异常处理:捕获任何可能的异常并记录日志,返回通用的错误响应。

3. 举例说明主进程与子进程的运行过程

假设你有一个 Python 主程序 main.py,它要启动一个子进程去执行 worker.py 中的耗时任务。


3.1 主进程 main.py

import subprocessprint("主进程:开始运行")# 启动子进程
proc = subprocess.Popen(["python", "worker.py"],stdout=subprocess.PIPE,stderr=subprocess.PIPE,text=True
)print("主进程:子进程已启动,等待结果...")# 等待子进程结束
stdout, stderr = proc.communicate()print("主进程:子进程结束")
print("子进程输出:", stdout)

3.2 子进程 worker.py

import timeprint("子进程:开始执行任务")
time.sleep(2)  # 模拟耗时工作
print("子进程:任务完成")

3.3 整体运行流程(清晰版)

下面逐步描述主进程和子进程的行为:


① 主进程启动

运行 main.py 后,主进程先启动,打印:

主进程:开始运行

② 主进程创建子进程

主进程执行 Popen(...) 时,会:

  • 创建一个新的进程(子进程)
  • 让它执行 worker.py

打印:

主进程:子进程已启动,等待结果...

此时主进程没有退出,只是等待子进程执行完毕。


③ 子进程开始工作

新启动的子进程开始执行 worker.py

打印:

子进程:开始执行任务

然后执行耗时逻辑,例如 time.sleep(2)


④ 子进程完成任务并退出

2 秒后,子进程完成任务并打印:

子进程:任务完成

然后正常退出。


⑤ 主进程继续执行

主进程收到子进程的输出后继续运行:

主进程:子进程结束
子进程输出: 子进程:开始执行任务
子进程:任务完成

3.4 总结(最简单易懂的)

过程 主进程做什么 子进程做什么
1 启动
2 创建子进程
3 等待子进程完成 执行任务
4 获取结果并继续 退出

一句话总结:

主进程负责管理与控制,子进程负责执行具体任务,它们并行运行,但主进程可以选择等待子进程结束。

4. subprocess vs multiprocessing 对比图

subprocess                         multiprocessing
───────────────                   ─────────────────
Web Server                         Web Server│                                  ││  spawn OS process                │  fork Python process▼                                  ▼
┌─────────────┐                  ┌─────────────┐
│  Worker OS  │                  │ Python Proc │
│  Process    │                  │ (shared)    │
└─────────────┘                  └─────────────┘│                                  │kill / timeout OK                kill 不稳定crash 无影响                     crash 可能拖死

4.1 什么时候应该用 multiprocessing

┌───────────────────────────────┐
│ 离线脚本 / 批处理 / 计算任务  │
│ 非 Web 场景                   │
│ 需要结果回传                  │
│ 生命周期短                    │
└───────────────────────────────┘

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询