梅州市网站建设_网站建设公司_博客网站_seo优化
2025/12/18 0:19:44 网站建设 项目流程

最近Vibe Code在各种技术社区刷屏,不过说实话,在日常工作中,我更多是用LLM来生成文档、批量修改代码或者排查问题。毕竟业务需求嘛,很少有能一次性描述清楚的(懂的都懂哈哈~)。但在看了最新的SWE-Bench Pro评测后,我决定尝试一下端到端的AI编程体验。

前两章我们讨论了JupyterAgent,当时用的是E2B的代码沙箱。这次我决定自己动手,用字节的TRAE从头构建一个Python代码沙箱,并加入MCP支持。完整代码已经开源在simple_sandbox,Star is Welcomed!

个人体感TRAE在国内Coding IDE里算Very Good,比开源的Cline,Kilo等在Token使用上效率更高,但和cursor还有距离。如何最大化Token使用效率和效果是个系统工程问题~

Vibe Coding实战:从零构建代码沙箱

本来想完整分享整个Vibe Coding过程的,结果TRAE升级把历史对话记录清空了(哭)。那就跟大家分享一下我的操作思路和最终成果吧!

我在使用AI IDE时通常有两种策略:

模型主导:让模型先做整体规划,我人工调整后让模型执行,依赖模型的测试文件和执行报错进行迭代优化

人工主导:我自己拆解任务,让模型逐步实现,每一步都进行人工校验

如何选择这两种模式完全取决于我对于整个项目是否有很强的先验思考,哈哈就是我知道怎么做的我带着模型做,我不知道的模型带着我做。

这次的任务因为有E2B的SDK可以参考,我选择了人工主导的方式,将任务拆解成几个明确的步骤:

沙箱核心功能:沙箱创建、环境初始化、代码执行

功能完善:结构化输出、预装环境、工作目录隔离、中文字体支持

扩展功能:文件上传、文件获取、超时关闭

服务化:搭建FastAPI服务接口

客户端Demo:创建请求示例

文档撰写:完善的README

最终模型实现的沙箱核心类如下,完整代码请移步Github,在整个编码过程中,我只在功能设计层面做了调整,没有发现任何编码错误:

# 沙箱类

class Sandbox:

def __init__(self, sandbox_id: str, work_dir: str, venv_dir: str):

self.sandbox_id = sandbox_id

self.work_dir = work_dir

self.venv_dir = venv_dir

# 配置KernelManager使用虚拟环境中的Python解释器

self.kernel_manager = KernelManager(

kernel_name='python3',

kernel_spec_manager=self._create_custom_kernel_spec_manager()

)

# 设置环境变量

env = os.environ.copy()

env['VIRTUAL_ENV'] = venv_dir

# 设置内核使用虚拟环境中的Python

self.kernel_manager.start_kernel(

cwd=work_dir,

env=env,

)

self.kernel_client = self.kernel_manager.client()

self.kernel_client.start_channels()

self.kernel_client.wait_for_ready()

self.last_execute_id = 0

# 复制字体文件到沙箱工作目录

font_source_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'SimHei.ttf')

font_dest_path = os.path.join(work_dir, 'SimHei.ttf')

if os.path.exists(font_source_path):

try:

shutil.copy2(font_source_path, font_dest_path)

print(f"字体文件已复制到: {font_dest_path}")

except Exception as e:

print(f"复制字体文件失败: {e}")

else:

print(f"未找到字体文件: {font_source_path}")

# 安装基本包

self._install_basic_packages()

# 执行字体注册代码

self.execute_code(font_code)

def _create_custom_kernel_spec_manager(self):

# 创建自定义的内核规范管理器,确保使用正确的Python环境

from jupyter_client.kernelspec import KernelSpecManager

ksm = KernelSpecManager()

return ksm

def _install_basic_packages(self):

# 在虚拟环境中安装基本包

# 如果是从镜像复制的环境,可能已经包含了基本包,这里可以跳过或仅检查

try:

# 直接使用Linux路径设置

pip_path = os.path.join(self.venv_dir, 'bin', 'pip')

python_exe = os.path.join(self.venv_dir, 'bin', 'python')

# 检查ipykernel是否已安装

check_result = subprocess.run(

[python_exe, '-c', 'import ipykernel'],

capture_output=True,

text=True

)

# 如果ipykernel未安装,则安装

if check_result.returncode != 0:

print("ipykernel未安装,开始安装...")

subprocess.check_call([pip_path, 'install', 'ipykernel'])

except Exception as e:

print(f"安装基础包失败: {e}")

def execute_code(self, code: str) -> Dict:

# 生成执行ID

self.last_execute_id += 1

# 执行代码

msg_id = self.kernel_client.execute(code)

stdout = []

stderr = []

error = None

results = []

# 收集执行结果

while True:

try:

msg = self.kernel_client.get_iopub_msg(timeout=3600)

msg_type = msg['header']['msg_type']

if msg['parent_header'].get('msg_id') != msg_id:

continue

elif msg_type == 'stream':

content = msg['content']

if content['name'] == 'stdout':

stdout.append(content['text'])

elif content['name'] == 'stderr':

stderr.append(content['text'])

elif msg_type == 'error':

content = msg['content']

# 合并error字段,包含ename、evalue和traceback

error = {

'name':ansi_escape.sub('', content['ename']),

'value': ansi_escape.sub('',content['evalue']),

'traceback': [ansi_escape.sub('', i) for i in content['traceback']]

}

elif msg_type == 'execute_result':

# 处理执行结果,按照{type,data}格式存储

data = msg['content']['data']

for data_type, data_value in data.items():

results.append({"type": data_type, "data": data_value})

elif msg_type == 'display_data':

# 处理显示数据,按照{type,data}格式存储

data = msg['content']['data']

for data_type, data_value in data.items():

results.append({"type": data_type, "data": data_value})

elif msg_type == 'execute_reply':

break

elif msg_type == 'status':

if msg['content']['execution_state'] == 'idle':

break

except Exception:

break

return {

'stdout': [ansi_escape.sub('', i) for i in stdout],

'stderr': [ansi_escape.sub('', i) for i in stderr],

'error': error,

'results': results

}

def upload_file(self, file: UploadFile, file_path: Optional[str] = None) -> str:

# 确定文件保存路径:默认为工作目录最简化code

if file_path:

save_path = os.path.join(self.work_dir, file_path)

else:

save_path = os.path.join(self.work_dir, file.filename)

# 确保目标目录存在

os.makedirs(os.path.dirname(save_path), exist_ok=True)

# 保存文件

with open(save_path, "wb") as f:

shutil.copyfileobj(file.file, f)

return save_path

def get_files(self) -> List[Dict[str, str]]:

files = []

for root, _, filenames in os.walk(self.work_dir):

for filename in filenames:

file_path = os.path.join(root, filename)

relative_path = os.path.relpath(file_path, self.work_dir)

files.append({

'path': relative_path,

'size': os.path.getsize(file_path)

})

return files

def get_file_path(self, file_path: str) -> str:

full_path = os.path.abspath(os.path.join(self.work_dir, file_path))

# 安全检查,确保文件在工作目录内

if not full_path.startswith(os.path.abspath(self.work_dir)):

raise HTTPException(status_code=403, detail="File access denied")

return full_path

def shutdown(self):

try:

self.kernel_client.stop_channels()

self.kernel_manager.shutdown_kernel()

# 清理工作目录

shutil.rmtree(self.work_dir, ignore_errors=True)

# 清理虚拟环境目录

shutil.rmtree(self.venv_dir, ignore_errors=True)

except Exception:

pass

def install_package(self, package_name: str) -> Dict:

"""在沙箱的虚拟环境中安装Python包"""

try:

# 获取虚拟环境中的pip路径(Linux环境)

pip_path = os.path.join(self.venv_dir, 'bin', 'pip')

# 执行pip安装命令

result = subprocess.run(

[pip_path, 'install', package_name],

capture_output=True,

text=True,

timeout=60 # 设置超时时间

)

# 检查安装是否成功

if result.returncode == 0:

return {

'success': True,

'stdout': result.stdout,

'stderr': result.stderr,

'message': f"成功安装包: {package_name}"

}

else:

return {

'success': False,

'stdout': result.stdout,

'stderr': result.stderr,

'message': f"安装包失败: {package_name}"

}

except Exception as e:

return {

'success': False,

'stdout': '',

'stderr': str(e),

'message': f"安装过程出错: {str(e)}"

}

用FastAPI-MCP打造标准化MCP服务

沙箱服务跑通后,我决定将其打包成标准的MCP服务。这时候就轮到FastAPI-MCP出场了!就像我们在解密prompt系列58. MCP - 工具演变 & MCP基础中提到的MCP本身并不是工具,它只是Adapter,而FastAPI-MCP库完美体现了这一特性——它可以将现有的FastAPI工具直接转换成标准MCP服务。

但这里遇到了一个常见问题:大模型对新的library支持不够好。解决方案是使用上下文管理模块,将API接口文档加入上下文。这种方法特别适用于:

这两年的新Library:MCP etc.

大版本更新的Lirabry: 例如ES 7.XX -> Elastic Search 8.XX

image

这样我们就可以引用对应的API文档让TRAE帮我们进一步把服务借助FastAPI-MCP包装成MCP服务,并使用FastMCP给出请求Demo。而FastAPI-MCP的使用也非常简单,只需要添加三行代码就可以完成MCP服务的适配

from fastapi_mcp import FastApiMCP

# 创建并挂载MCP服务器 - 移到所有端点定义之后

mcp = FastApiMCP(app)

mcp.mount_http()

接下来,我把FastMCP的接口文档加入上下文,让模型生成MCP Client来验证服务。但运行时发现MCP服务可以请求成功,list_tool却显示为空。通过观察输入输出来定位问题,TRAE成功找到了问题所在并进行了修复。

image

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询