陵水黎族自治县网站建设_网站建设公司_定制开发_seo优化
2025/12/28 18:18:07 网站建设 项目流程

10.3 通用模块

在本项目中,“utils ”目录是Agent-MCP项目的通用模块,提供了跨模块复用的辅助功能,为框架其他组件提供基础支撑。其核心作用包括封装 Git 操作(如代码提交、分支管理)以支持代理的版本控制需求,集成 Tmux 终端复用工具来管理多代理的独立会话与实时输出,同时包含各类通用函数(如日志处理、格式转换、路径解析等),通过标准化工具接口减少重复开发,提升各模块协作效率。

10.3.1 Agent操作日志记录

文件agent_mcp/utils/audit_utils.py实现了审计日志记录功能,用于记录代理(Agent)的操作行为。它会将审计条目同时添加到内存中的全局审计日志列表、输出到服务器日志(通过logger),并且在调试模式下还会写入到名为agent_audit.log的持久化文件中。每条审计记录包含时间戳、代理ID、操作动作以及详细信息,能帮助追踪和监控代理的活动。

import datetime import json from typing import Dict, Any # 从中央配置导入日志记录器 from ..core.config import logger # 导入全局审计日志列表 from ..core import globals as g # 审计日志文件的名称,与原来的保持一致。 # 该文件将在服务器运行的目录中创建。 # (原始 main.py 第 868 行:with open("agent_audit.log", "a")) AUDIT_LOG_FILE_NAME = "agent_audit.log" # 原始位置:main.py 第 838-850 行(注意:您提示中的行号是针对 auth.py 的) # main.py 中的实际函数 `log_audit` 大约从第 838 行开始。 def log_audit(agent_id: str, action: str, details: Dict[str, Any]) -> None: """ 为代理操作记录审计条目,同时记录到内存列表(g.audit_log)和持久化文件(agent_audit.log)。 原始 main.py 行:约 838-850 行。 """ timestamp = datetime.datetime.now().isoformat() entry = { "timestamp": timestamp, "agent_id": agent_id, "action": action, "details": details # details 预期是一个字典 } # 添加到内存中的全局审计日志 # (原始 main.py 第 844 行:audit_log.append(entry)) if g.audit_log is not None: # 防御性检查,尽管它已经被初始化了 g.audit_log.append(entry) else: logger.warning("全局 audit_log 列表为 None。无法追加内存中的审计条目。") # 记录到主服务器日志(控制台和 mcp_server.log) # (原始 main.py 第 845 行:logger.info(f"AUDIT: {agent_id} - {action} - {json.dumps(details)}")) try: # 尝试将 details 序列化为日志;使用 str() 作为回退。 details_for_logging = json.dumps(details) except TypeError: details_for_logging = str(details) logger.info(f"AUDIT: {agent_id} - {action} - {details_for_logging}") # 写入到持久化审计日志文件(agent_audit.log)——仅在调试模式下 # (原始 main.py 第 847-849 行:with open("agent_audit.log", "a") as f: ...) import os debug_mode = os.environ.get("MCP_DEBUG", "false").lower() == "true" if debug_mode: try: with open(AUDIT_LOG_FILE_NAME, "a", encoding='utf-8') as f: # 文件中的每一行都应该是一个自包含的 JSON 对象。 json.dump(entry, f) f.write("\n") # 在每个 JSON 条目后换行,以提高可读性和解析性 except IOError as e: logger.error(f"写入审计日志文件 '{AUDIT_LOG_FILE_NAME}' 时发生 IO 错误:{e}") except Exception as e: # 捕获文件写入过程中任何其他意外错误 logger.error(f"写入审计日志文件 '{AUDIT_LOG_FILE_NAME}' 时发生意外错误:{e}", exc_info=True)

10.3.2 JSON输入的清理和解析

文件agent_mcp/utils/json_utils.py主要实现了JSON输入的清理和解析功能。其中sanitize_json_input 函数能对各类输入数据(字符串、字节、字典、列表等)进行一系列处理,包括处理隐藏的 Unicode 字符、不当的空白符和换行符等,以尽可能将其解析为合法的 Python 对象;get_sanitized_json_body 函数则是辅助处理 API 请求体,安全地获取并清理请求体中的 JSON 数据,确保后续能正确解析使用,若处理过程中出现问题会抛出包含相关错误信息的 ValueError。

import json import re from typing import Any, Union, Dict, List # 导入集中配置的日志记录器 from ..core.config import logger # from starlette.requests import Request # 示例 def sanitize_json_input(input_data: Union[str, bytes, Dict, List, Any]) -> Union[Dict, List, Any]: # 为 input_data 添加了 bytes 类型 """ 积极清理 JSON 输入,以处理隐藏的 Unicode 字符、 错位的空白符和换行符。 参数: input_data: 可以是字符串、字节(来自 request.body())、 或 Python 对象(字典、列表)。 返回: 正确解析后的 Python 对象(字典、列表等) """ # 如果已经是 Python 对象(字典/列表),直接返回 if isinstance(input_data, (dict, list)): return input_data # 如果是字节,先解码为字符串 if isinstance(input_data, bytes): try: input_data_str = input_data.decode('utf-8') except UnicodeDecodeError: logger.warning("无法将输入数据解码为 UTF-8,尝试用 latin-1。") try: input_data_str = input_data.decode('latin-1') except UnicodeDecodeError as ude: logger.error(f"无法解码输入字节:{ude}") raise ValueError(f"无效的输入字节编码:{ude}") elif isinstance(input_data, str): input_data_str = input_data else: # 如果不是字符串或字节,尝试转换为字符串 try: input_data_str = str(input_data) except Exception as e: logger.error(f"无法将输入转换为字符串:{e}") raise ValueError(f"输入必须是 JSON 字符串、字节或 Python 对象,得到的是 {type(input_data)}") # 步骤 1:首次直接解析尝试 try: return json.loads(input_data_str) except json.JSONDecodeError: pass # 如果直接解析失败,继续清理 # 步骤 2:积极的空白符移除(处理元素之间的 CR/LF/空格) # 移除左大括号/左方括号后的空白符 cleaned = re.sub(r'([\{\[])\s+', r'\1', input_data_str) \# 移除右大括号/右方括号前的空白符 cleaned = re.sub(r'\s+([\}\]])', r'\1', cleaned) # 移除逗号和冒号后的空白符 cleaned = re.sub(r'([:,])\s+', r'\1', cleaned) # 移除逗号前的空白符 cleaned = re.sub(r'\s+(,)', r'\1', cleaned) # 移除可能分隔元素的换行符 cleaned = cleaned.replace('\r\n', '').replace('\n', '').replace('\r', '') # 步骤 3:移除控制字符(不包括制表符 \t) # 使用 repr() 使它们在正则表达式中可见,然后去掉引号。 cleaned = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F]', '', cleaned) # 步骤 4:移除有问题的 Unicode(零宽度空格、BOM、行/段落分隔符) cleaned = re.sub(r'[\u200B-\u200F\uFEFF\u2028\u2029]', '', cleaned) # 步骤 5:尝试解析积极清理后的字符串 try: return json.loads(cleaned) except json.JSONDecodeError as e_cleaned: # 步骤 6:对可能嵌套/转义的 JSON 或其他异常情况的回退处理 try: # 尝试在字符串中找到主要的 JSON 对象/数组 match = re.search(r'^\s*(\{.*\}|\[.*\])\s*$', cleaned, re.DOTALL) if match: return json.loads(match.group(1)) except json.JSONDecodeError: pass # 如果提取的部分也解析失败,继续 except Exception as inner_e: logger.warning(f"清理过程中内部正则/解析回退失败:{inner_e}") pass # 记录最终的失败状态以用于调试 error_excerpt = cleaned[:100] + ('...' if len(cleaned) > 100 else '') logger.error(f"积极的 JSON 解析失败:{e_cleaned},清理后的数据(摘录):{error_excerpt}") raise ValueError(f"即使经过积极清理也无法解析 JSON:{e_cleaned}") # API 请求处理的辅助函数 # 原始位置:main.py 第 126-143 行 async def get_sanitized_json_body(request: Any) -> Union[Dict, List, Any]: # 如果导入了 Starlette,是 'request: Request' try: # 获取原始的请求体数据 raw_body = await request.body() # 通常是字节类型 # 清理并解析它(sanitize_json_input 现在处理字节解码) return sanitize_json_input(raw_body) except ValueError as ve: # 捕获来自 sanitize_json_input 或请求体解码的 ValueError logger.error(f"获取/清理请求体失败:{ve}") raise ValueError(f"无效的请求体:{ve}") # 附带上下文重新抛出 except Exception as e: # 捕获来自 request.body() 的其他潜在异常或意外问题 logger.error(f"处理请求体时出现意外错误:{e}", exc_info=True) raise ValueError(f"处理请求体时出错:{e}") # --- JSON 清理工具结束 ---

10.3.3 tmux交互处理

文件agent_mcp/utils/tmux_utils.py实现了于与 tmux 终端复用工具交互的工具模块,核心功能是通过tmux 管理代理(Agent)的独立终端会话。它提供了一系列函数,包括检查tmux是否可用、清理和生成安全的会话名称、创建/删除/查询tmux会话、向会话发送命令或提示信息(支持异步发送),以及通过tmux会话自动发现、同步活跃代理等。通过与tmux集成,该模块实现了对多代理运行状态的可视化监控、独立环境隔离和生命周期管理,确保每个代理的操作过程可追踪、可调试,提升了多代理协作时的终端管理效率。

(1)下面代码的功能是创建新的tmux会话,用于为代理提供独立的终端环境。原理是先检查tmux是否可用,对会话名称进行安全清理以符合tmux命名规则,确保工作目录存在,再通过subprocess调用tmux命令创建会话,支持设置环境变量和初始执行命令,最终返回创建结果。

def create_tmux_session(session_name: str, working_dir: str, command: str = None, env_vars: Dict[str, str] = None) -> bool: """ 创建具有指定名称和工作目录的新 tmux 会话。 参数: session_name: tmux 会话的名称(将被清理以符合规范) working_dir: 会话的工作目录 command: 可选,在会话中运行的命令 env_vars: 可选,要设置的环境变量 返回: 如果会话创建成功则返回 True,否则返回 False """ # 检查 tmux 是否可用 if not is_tmux_available(): logger.error("当前系统上没有安装 tmux") return False # 清理会话名称,确保符合 tmux 命名要求 clean_session_name = sanitize_session_name(session_name) # 检查会话是否已存在 if session_exists(clean_session_name): logger.warning(f"tmux 会话 '{clean_session_name}' 已存在") return False # 确保工作目录存在 try: Path(working_dir).mkdir(parents=True, exist_ok=True) except OSError as e: logger.error(f"创建工作目录 {working_dir} 失败: {e}") return False try: # 构建 tmux 命令 tmux_cmd = ['tmux', 'new-session', '-d', '-s', clean_session_name, '-c', working_dir] # 如果提供了环境变量,则准备环境 env = None if env_vars: import os env = os.environ.copy() env.update(env_vars) # 如果提供了命令,则添加到 tmux 命令中 if command: tmux_cmd.append(command) # 执行 tmux 命令 result = subprocess.run(tmux_cmd, capture_output=True, text=True, timeout=10, env=env) if result.returncode == 0: logger.info(f"在 {working_dir} 中创建了 tmux 会话 '{clean_session_name}'") return True else: logger.error(f"创建 tmux 会话失败: {result.stderr}") return False except subprocess.TimeoutExpired: logger.error(f"创建 tmux 会话 '{clean_session_name}' 超时") return False except Exception as e: logger.error(f"创建 tmux 会话 '{clean_session_name}' 时出错: {e}") return False

(2)下面代码的功能是管理代理会话的清理工作,删除与活跃代理不对应的孤立tmux会话。原理是先获取所有tmux会话,筛选出符合代理会话命名模式的会话,再与活跃代理ID对比,删除不匹配的会话,确保终端资源不被浪费。

def cleanup_agent_sessions(active_agent_ids: List[str]) -> int: """ 清理与活跃代理不对应的 tmux 会话。 参数: active_agent_ids: 当前活跃的代理 ID 列表 返回: 清理的会话数量 """ # 检查 tmux 是否可用 if not is_tmux_available(): return 0 # 获取所有 tmux 会话 sessions = list_tmux_sessions() cleaned_count = 0 # 清理以 'agent_' 开头但不在活跃代理 ID 中的会话 for session in sessions: session_name = session['name'] # 检查是否为代理会话 if session_name.startswith('agent_') or any(session_name == sanitize_session_name(agent_id) for agent_id in active_agent_ids): # 提取潜在的代理 ID potential_agent_id = session_name.replace('agent_', '') clean_agent_ids = [sanitize_session_name(aid) for aid in active_agent_ids] # 如果会话不属于活跃代理,则清理 if session_name not in clean_agent_ids and potential_agent_id not in active_agent_ids: logger.info(f"清理孤立的代理会话: {session_name}") if kill_tmux_session(session_name): cleaned_count += 1 return cleaned_count

(3)下面代码的功能是从tmux会话中发现活跃代理并同步代理跟踪状态。原理是通过扫描tmux会话,根据管理员令牌后缀识别代理会话,解析出代理ID,再与全局跟踪的代理信息对比,更新或新增跟踪记录,实现代理状态的自动同步。

def sync_agents_from_tmux(admin_token: str) -> Dict[str, Any]: """ 通过从 tmux 会话中发现活跃代理来同步代理跟踪状态。 参数: admin_token: 用于识别会话的管理员令牌 返回: 包含发现的代理和统计信息的同步操作摘要 """ # 从 tmux 会话中发现活跃代理 discovered_agents = discover_active_agents_from_tmux(admin_token) # 此处导入以避免循环导入 from ..core import globals as g # 同步摘要信息 sync_summary = { 'discovered_count': len(discovered_agents), 'discovered_agents': [], 'already_tracked': [], 'newly_tracked': [] } # 处理每个发现的代理 for agent_info in discovered_agents: agent_id = agent_info['agent_id'] session_name = agent_info['session_name'] # 记录发现的代理信息 sync_summary['discovered_agents'].append({ 'agent_id': agent_id, 'session_name': session_name, 'session_attached': agent_info['session_attached'] }) # 检查是否已跟踪该会话 if agent_id in g.agent_tmux_sessions: if g.agent_tmux_sessions[agent_id] == session_name: # 已跟踪且会话名称未变 sync_summary['already_tracked'].append(agent_id) else: # 更新会话名称 g.agent_tmux_sessions[agent_id] = session_name sync_summary['newly_tracked'].append(agent_id) logger.info(f"更新代理 '{agent_id}' 的会话跟踪: {session_name}") else: # 开始跟踪新代理 g.agent_tmux_sessions[agent_id] = session_name sync_summary['newly_tracked'].append(agent_id) logger.info(f"开始跟踪会话 '{session_name}' 中的代理 '{agent_id}'") return sync_summary

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询