中山市网站建设_网站建设公司_轮播图_seo优化
2025/12/30 21:08:37 网站建设 项目流程

https://www.bilibili.com/video/BV1kWvvB4EQT

feudal/my_python_server

有固定记忆(知识)和短期对话记忆

可以调用工具直接运行py文件

工具是用json定义启用工具功能的时候直接发给llm

模型还是免费的Qwen/Qwen2.5-7B-Instruct

llm_server\memory_llm.py

import tkinter as tk from tkinter import scrolledtext, messagebox, ttk import os import subprocess import re import json import sys from pathlib import Path from llm_server import LLMService def execute_tool(tool_name, *args): """ 执行指定名称的工具 """ # 获取项目根目录,然后定位到 llm_server 目录下的 executor.py current_dir = Path(__file__).parent # llm_server 目录 executor_script = current_dir / "executor.py" if not executor_script.exists(): return "错误: 执行器脚本不存在" cmd = [sys.executable, str(executor_script), tool_name] + list(args) try: result = subprocess.run( cmd, capture_output=True, text=True, timeout=30, encoding='utf-8', # 明确指定UTF-8编码 errors='replace' # 遇到编码错误时替换字符 ) if result.returncode == 0: return result.stdout.strip() else: return f"工具执行失败: {result.stderr.strip()}" except subprocess.TimeoutExpired: return f"工具执行超时: {tool_name}" except Exception as e: return f"执行工具时出错: {str(e)}" def get_available_tools_info(): """ 获取所有可用工具的信息 """ current_dir = Path(__file__).parent # 获取当前文件的目录 config_path = current_dir / "tools_config.json" # 在同级目录中查找配置文件 if not config_path.exists(): return "错误: 工具配置文件不存在" try: with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) return json.dumps(config, ensure_ascii=False, indent=2) except Exception as e: return f"读取工具配置失败: {str(e)}" def chat_with_memory(user_message, knowledge_file=None, memory_file=None, use_knowledge=True, use_memory=True, use_tools=True): """ 支持固定知识和记忆聊天的函数 """ # 如果没有指定文件路径,则使用当前脚本所在目录下的相应文件 current_dir = os.path.dirname(os.path.abspath(__file__)) if knowledge_file is None: knowledge_file = os.path.join(current_dir, "knowledge.txt") if memory_file is None: memory_file = os.path.join(current_dir, "memory.txt") try: # 创建LLM服务实例 llm_service = LLMService() # 准备消息列表 messages = [] # 如果启用固定知识且知识文件存在,读取固定知识 if use_knowledge and os.path.exists(knowledge_file): with open(knowledge_file, 'r', encoding='utf-8') as f: knowledge_content = f.read() if knowledge_content.strip(): # 将固定知识作为系统提示添加到消息中 messages.append({"role": "system", "content": f"重要知识:\n{knowledge_content}"}) # 如果启用记忆功能且记忆文件存在,读取历史对话 if use_memory and os.path.exists(memory_file): with open(memory_file, 'r', encoding='utf-8') as f: content = f.read() # 解析历史对话,将它们转换为messages格式 history_messages = parse_history_content(content) messages.extend(history_messages) # 添加当前用户消息 messages.append({"role": "user", "content": user_message}) # 如果启用工具功能,添加工具使用提示 if use_tools: tools_info = get_available_tools_info() tool_instruction = f"\n\n注意:如果需要执行特定任务(如打开网页、执行系统命令等),请使用以下工具: {tools_info}\n使用格式: [TOOL:工具名称,参数1,参数2,...]\n请直接输出工具调用格式,不要额外解释。" # 将工具指令附加到用户消息 messages[-1]["content"] += tool_instruction # 调用LLM服务 result = llm_service.create(messages) # 获取AI回复 ai_response = result['choices'][0]['message']['content'] # 如果启用了工具功能,检查AI是否需要执行工具 if use_tools: tool_execution_result = process_tool_calls(ai_response) if tool_execution_result: # 如果有工具执行结果,将其添加到对话历史 messages.append({"role": "assistant", "content": ai_response}) messages.append({"role": "user", "content": f"工具执行结果: {tool_execution_result}"}) # 再次调用LLM以获取最终响应 final_result = llm_service.create(messages) ai_response = final_result['choices'][0]['message']['content'] # 如果启用了记忆功能,则保存到记忆文件(不包括固定知识) if use_memory: try: # 保存到记忆文件(追加模式) with open(memory_file, 'a', encoding='utf-8') as f: f.write(f"用户: {user_message}\n") f.write(f"AI: {ai_response}\n\n") print(f"成功写入记忆文件: {memory_file}") except Exception as e: print(f"写入文件时出错: {e}") return ai_response except Exception as e: error_msg = f"连接LLM服务失败: {str(e)}" print(error_msg) return error_msg def process_tool_calls(response_text): """ 解析AI响应中的工具调用指令 支持格式: [TOOL:工具名称,arg1,arg2] """ # 匹配工具调用模式 tool_pattern = r'\[TOOL:([^\],\]]+)(?:,([^\]]+))?\]' matches = re.findall(tool_pattern, response_text) all_results = [] if matches: for match in matches: tool_name = match[0] # 工具名称 tool_args_str = match[1] if match[1] else "" # 解析参数 tool_args = [] if tool_args_str: # 简单的参数分割(可以根据需要扩展) tool_args = [arg.strip() for arg in tool_args_str.split(',')] # 执行工具 result = execute_tool(tool_name, *tool_args) all_results.append(f"工具 '{tool_name}' 执行结果: {result}") return "\n".join(all_results) if all_results else None def parse_history_content(content): """ 解析历史对话内容,转换为messages格式 """ messages = [] lines = content.strip().split('\n') current_role = None current_content = [] for line in lines: line = line.strip() if line.startswith('用户:'): # 保存之前的角色内容 if current_role and current_content: messages.append({ "role": current_role, "content": '\n'.join(current_content).strip() }) # 开始新的用户消息 current_role = "user" current_content = [line[3:].strip()] # 去掉"用户:"前缀 elif line.startswith('AI:'): # 保存之前的角色内容 if current_role and current_content: messages.append({ "role": current_role, "content": '\n'.join(current_content).strip() }) # 开始新的AI消息 current_role = "assistant" current_content = [line[3:].strip()] # 去掉"AI:"前缀 elif line == "" and current_content: # 空行表示对话段结束 if current_role and current_content: messages.append({ "role": current_role, "content": '\n'.join(current_content).strip() }) current_role = None current_content = [] elif current_role: # 继续当前消息的内容 current_content.append(line) # 处理最后一条消息 if current_role and current_content: messages.append({ "role": current_role, "content": '\n'.join(current_content).strip() }) return messages class MemoryChatApp: def __init__(self, root): self.root = root self.root.title("AI记忆聊天") self.root.geometry("800x600") # 固定知识开关变量 self.use_knowledge_var = tk.BooleanVar(value=True) # 临时记忆开关变量 self.use_memory_var = tk.BooleanVar(value=True) # 工具开关变量 self.use_tools_var = tk.BooleanVar(value=False) # 工具开关的上一个状态,用于检测是否切换 self.prev_tools_state = False # 创建界面 self.setup_ui() # 文件路径 current_dir = os.path.dirname(os.path.abspath(__file__)) self.knowledge_file = os.path.join(current_dir, "knowledge.txt") self.memory_file = os.path.join(current_dir, "memory.txt") # 初始化固定知识文件 self.init_knowledge_file() # 加载历史对话 self.load_history() def init_knowledge_file(self): """初始化固定知识文件""" if not os.path.exists(self.knowledge_file): with open(self.knowledge_file, 'w', encoding='utf-8') as f: f.write("# 固定知识库\n") f.write("# 在这里添加固定信息,如用户偏好、重要上下文等\n") f.write("# 此文件不会被清除,只能手动编辑\n\n") f.write("# 示例:\n") f.write("# 用户姓名: 张三\n") f.write("# 工作领域: 软件开发\n") f.write("# 兴趣爱好: Python编程\n") def setup_ui(self): # 顶部框架 - 开关和按钮 top_frame = tk.Frame(self.root) top_frame.pack(fill=tk.X, padx=10, pady=5) # 固定知识开关复选框 knowledge_check = tk.Checkbutton( top_frame, text="启用固定知识", variable=self.use_knowledge_var ) knowledge_check.pack(side=tk.LEFT, padx=(0, 10)) # 临时记忆开关复选框 memory_check = tk.Checkbutton( top_frame, text="启用临时记忆", variable=self.use_memory_var ) memory_check.pack(side=tk.LEFT, padx=(0, 10)) # 工具开关复选框 tools_check = tk.Checkbutton( top_frame, text="启用工具功能", variable=self.use_tools_var, command=self.on_tools_toggle # 添加切换事件 ) tools_check.pack(side=tk.LEFT, padx=(0, 10)) # 编辑固定知识按钮 edit_knowledge_btn = tk.Button( top_frame, text="编辑固定知识", command=self.edit_knowledge ) edit_knowledge_btn.pack(side=tk.LEFT, padx=(0, 10)) # 清除对话按钮 clear_btn = tk.Button( top_frame, text="清除临时记忆", command=self.clear_memory ) clear_btn.pack(side=tk.RIGHT) # 聊天历史显示区域 self.chat_history = scrolledtext.ScrolledText( self.root, wrap=tk.WORD, state='disabled', height=25 ) self.chat_history.pack(padx=10, pady=5, fill=tk.BOTH, expand=True) # 输入区域框架 input_frame = tk.Frame(self.root) input_frame.pack(padx=10, pady=5, fill=tk.X) # 消息输入框 self.input_text = tk.Text(input_frame, height=5) self.input_text.pack(fill=tk.X, pady=5) # 底部按钮框架 button_frame = tk.Frame(input_frame) button_frame.pack(fill=tk.X) # 发送按钮 send_button = tk.Button( button_frame, text="发送 (Enter)", command=self.send_message ) send_button.pack(side=tk.RIGHT, padx=(5, 0)) # 绑定回车键发送消息(Shift+Enter 换行) self.input_text.bind('<Return>', self.on_enter_key) def on_tools_toggle(self): """工具开关切换事件处理""" current_state = self.use_tools_var.get() # 如果从关闭变为开启,显示工具列表 if current_state and not self.prev_tools_state: tools_info = get_available_tools_info() messagebox.showinfo("可用工具列表", tools_info) # 更新上一个状态 self.prev_tools_state = current_state def on_enter_key(self, event): # 检测是否同时按下Shift键,如果是则换行,否则发送消息 if event.state & 0x1: # Shift键被按下 return # 允许正常的换行操作 else: self.send_message() return "break" # 阻止换行操作 def send_message(self): user_message = self.input_text.get("1.0", tk.END).strip() if not user_message: messagebox.showwarning("警告", "请输入消息内容") return # 清空输入框 self.input_text.delete("1.0", tk.END) # 显示用户消息 self.display_message("用户", user_message) # 获取开关状态 use_knowledge = self.use_knowledge_var.get() use_memory = self.use_memory_var.get() use_tools = self.use_tools_var.get() try: ai_response = chat_with_memory( user_message, self.knowledge_file, self.memory_file, use_knowledge, use_memory, use_tools ) # 显示AI回复 self.display_message("AI", ai_response) except Exception as e: error_msg = f"获取AI回复时出错: {str(e)}" self.display_message("系统", error_msg) def display_message(self, sender, message): self.chat_history.config(state='normal') self.chat_history.insert(tk.END, f"{sender}: {message}\n\n") self.chat_history.config(state='disabled') # 滚动到底部 self.chat_history.see(tk.END) def load_history(self): """加载历史对话到界面""" if os.path.exists(self.memory_file): with open(self.memory_file, 'r', encoding='utf-8') as f: content = f.read() # 解析历史内容并显示 history_messages = parse_history_content(content) for msg in history_messages: role = "用户" if msg["role"] == "user" else "AI" self.display_message(role, msg["content"]) def clear_memory(self): """清除临时记忆""" if messagebox.askyesno("确认", "确定要清除临时记忆吗?\n(固定知识将保留)"): try: # 清空记忆文件 with open(self.memory_file, 'w', encoding='utf-8') as f: f.write("") # 清空界面显示 self.chat_history.config(state='normal') self.chat_history.delete(1.0, tk.END) self.chat_history.config(state='disabled') # 重新加载固定知识相关的对话 self.load_history() messagebox.showinfo("提示", "临时记忆已清除") except Exception as e: messagebox.showerror("错误", f"清除记录时出错: {str(e)}") def edit_knowledge(self): """编辑固定知识""" # 创建新窗口 knowledge_window = tk.Toplevel(self.root) knowledge_window.title("编辑固定知识") knowledge_window.geometry("600x400") # 文本框 text_area = scrolledtext.ScrolledText(knowledge_window, wrap=tk.WORD) text_area.pack(padx=10, pady=10, fill=tk.BOTH, expand=True) # 加载现有知识 if os.path.exists(self.knowledge_file): with open(self.knowledge_file, 'r', encoding='utf-8') as f: content = f.read() text_area.insert(tk.END, content) # 保存按钮 def save_knowledge(): try: with open(self.knowledge_file, 'w', encoding='utf-8') as f: f.write(text_area.get("1.0", tk.END)) messagebox.showinfo("提示", "固定知识已保存") knowledge_window.destroy() except Exception as e: messagebox.showerror("错误", f"保存知识时出错: {str(e)}") save_btn = tk.Button(knowledge_window, text="保存", command=save_knowledge) save_btn.pack(pady=5) def main(): root = tk.Tk() app = MemoryChatApp(root) root.mainloop() if __name__ == "__main__": main()

llm_server\llm_server.py

import os from dotenv import load_dotenv import json import http.client from urllib.parse import urlparse import base64 import requests from io import BytesIO from PIL import Image class LLMService: def __init__(self): # 从环境变量中获取 DeepSeek 参数 # 加载 .env 文件中的环境变量 dotenv_path = r'E:\code\apikey\.env' load_dotenv(dotenv_path) self.api_url = os.getenv('LLM_OPENAI_API_URL') self.model_name = os.getenv('LLM_MODEL_NAME') self.api_key = os.getenv('LLM_OPENAI_API_KEY') # 检查必需的环境变量是否存在 if not self.api_url: raise ValueError("环境变量 'deepseek_OPENAI_API_URL' 未设置或为空") if not self.model_name: raise ValueError("环境变量 'deepseek_MODEL_NAME' 未设置或为空") if not self.api_key: raise ValueError("环境变量 'deepseek_OPENAI_API_KEY' 未设置或为空") print(f"LLM服务初始化完成,模型: {self.model_name}") def create(self, messages, tools=None): print("开始调用LLM服务") # 解析 URL(去掉协议部分) parsed = urlparse(f"{self.api_url}/chat/completions") host, path = parsed.hostname, parsed.path if not host: raise ValueError("API URL 无效,无法解析主机名") # 创建 HTTP 连接 conn = http.client.HTTPSConnection(host) # 构造请求体 request_body = { "model": self.model_name, "messages": messages, "tools": tools, "temperature": 0.9 # 添加温度参数 } # 发送 POST 请求 headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } conn.request( "POST", path, body=json.dumps(request_body), headers=headers ) # 获取响应 response = conn.getresponse() print(f"LLM服务响应状态码: {response.status}") if response.status != 200: error_msg = response.read().decode('utf-8') raise Exception(f"LLM服务器错误: {response.status} - {error_msg}") # 读取响应内容 response_data = response.read().decode('utf-8') data = json.loads(response_data) # 确保output目录存在 os.makedirs('output', exist_ok=True) # 将响应保存到文件 (修复路径分隔符问题) output_file_path = os.path.join('output', 'formatted_data.json') with open(output_file_path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=4, ensure_ascii=False) # 关闭连接 conn.close() print("LLM服务调用完成") return data class VLMService: def __init__(self): # 从环境变量中获取 VLM 参数 dotenv_path = r'E:\code\my_python_server_private\.env' load_dotenv(dotenv_path) self.api_url = os.getenv('VLM_OPENAI_API_URL') self.model_name = os.getenv('VLM_MODEL_NAME') self.api_key = os.getenv('VLM_OPENAI_API_KEY') # 检查必需的环境变量是否存在 if not self.api_url: raise ValueError("环境变量 'VLM_OPENAI_API_URL' 未设置或为空") if not self.model_name: raise ValueError("环境变量 'VLM_MODEL_NAME' 未设置或为空") if not self.api_key: raise ValueError("环境变量 'VLM_OPENAI_API_KEY' 未设置或为空") print(f"VLM服务初始化完成,模型: {self.model_name}") def encode_image(self, image_source): """ 编码图像为base64字符串 支持URL和本地文件路径 """ try: if image_source.startswith(('http://', 'https://')): # 从URL获取图像 response = requests.get(image_source) image_data = response.content else: # 从本地文件路径获取图像 with open(image_source, "rb") as image_file: image_data = image_file.read() return base64.b64encode(image_data).decode('utf-8') except Exception as e: raise Exception(f"图像编码失败: {str(e)}") def create_with_image(self, messages, image_source=None, tools=None): print("开始调用VLM服务") # 如果提供了图像源,则处理图像 if image_source: # 编码图像 base64_image = self.encode_image(image_source) # 在第一条消息中添加图像 if messages and len(messages) > 0: # 构建包含图像的消息内容 image_content = { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64,{base64_image}" } } # 如果第一条消息的内容是字符串,转换为列表格式 if isinstance(messages[0]["content"], str): messages[0]["content"] = [ { "type": "text", "text": messages[0]["content"] }, image_content ] # 如果已经是列表格式,追加图像内容 elif isinstance(messages[0]["content"], list): messages[0]["content"].append(image_content) # 解析 URL(去掉协议部分) parsed = urlparse(f"{self.api_url}/chat/completions") host, path = parsed.hostname, parsed.path if not host: raise ValueError("API URL 无效,无法解析主机名") # 创建 HTTP 连接 conn = http.client.HTTPSConnection(host) # 构造请求体 request_body = { "model": self.model_name, "messages": messages, "tools": tools, "temperature": 0.7 } # 发送 POST 请求 headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.api_key}" } conn.request( "POST", path, body=json.dumps(request_body), headers=headers ) # 获取响应 response = conn.getresponse() print(f"VLM服务响应状态码: {response.status}") if response.status != 200: error_msg = response.read().decode('utf-8') raise Exception(f"VLM服务器错误: {response.status} - {error_msg}") # 读取响应内容 response_data = response.read().decode('utf-8') data = json.loads(response_data) # 确保output目录存在 os.makedirs('output', exist_ok=True) # 将响应保存到文件 output_file_path = os.path.join('output', 'vlm_formatted_data.json') with open(output_file_path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=4, ensure_ascii=False) # 关闭连接 conn.close() print("VLM服务调用完成") return data

llm_server\tools_config.json

{ "tools": [ { "name": "open_browser", "path": "net_server/open_browser.py", "description": "打开指定的网页浏览器", "parameters": [ { "name": "url", "type": "string", "description": "要打开的网页URL" } ] } ] }

llm_server\executor.py

#!/usr/bin/env python3 """ 通用Python脚本执行器 参数: 脚本名称 [脚本参数...] """ import sys import os import subprocess import json from pathlib import Path def execute_python_script(script_path, *args): """ 执行指定路径的Python脚本 """ # 获取项目根目录(从当前脚本位置向上一级) current_dir = Path(__file__).parent.parent # 回到项目根目录 script_full_path = current_dir / script_path if not script_full_path.exists(): return f"错误: 脚本 '{script_path}' 不存在" if script_full_path.suffix != '.py': return f"错误: 文件必须是Python脚本 (.py文件)" try: # 构建命令 cmd = [sys.executable, str(script_full_path)] + list(args) # 执行Python脚本,指定编码为UTF-8 result = subprocess.run( cmd, capture_output=True, text=True, timeout=30, cwd=str(current_dir), encoding='utf-8', # 明确指定UTF-8编码 errors='replace' # 遇到编码错误时替换字符 ) if result.returncode == 0: return result.stdout.strip() else: return f"脚本执行失败: {result.stderr.strip()}" except subprocess.TimeoutExpired: return f"脚本执行超时: {script_path}" except Exception as e: return f"执行脚本时出错: {str(e)}" def list_available_tools(): """ 从配置文件中列出所有可用工具 """ config_path = Path(__file__).parent / "tools_config.json" if not config_path.exists(): return [] try: with open(config_path, 'r', encoding='utf-8') as f: config = json.load(f) return config.get('tools', []) except Exception as e: return [] def get_tool_by_name(tool_name): """ 根据工具名称获取工具信息 """ tools = list_available_tools() for tool in tools: if tool['name'] == tool_name: return tool return None def main(): if len(sys.argv) < 2: print("用法: python executor.py <工具名称> [参数...]") print("\n可用工具:") tools = list_available_tools() for tool in tools: params = ", ".join([p['name'] for p in tool['parameters']]) if tool['parameters'] else "无参数" print(f" - {tool['name']}: {tool['description']} (参数: {params})") sys.exit(1) tool_name = sys.argv[1] tool_args = sys.argv[2:] # 根据工具名称查找对应的脚本路径 tool_info = get_tool_by_name(tool_name) if not tool_info: print(f"错误: 未找到工具 '{tool_name}'") sys.exit(1) script_path = tool_info['path'] result = execute_python_script(script_path, *tool_args) print(result) if __name__ == "__main__": main()

net_server\open_browser.py

#!/usr/bin/env python3 """ 打开浏览器脚本 参数: URL """ import sys import webbrowser def normalize_url(url): """ 标准化URL格式,处理可能的错误格式 """ # 如果URL包含"=",可能是在"="后面才是真正的URL if '=' in url: parts = url.split('=', 1) if len(parts) > 1: url = parts[1] # 取"="后面的部分作为URL # 移除可能的协议前缀重复 url = url.replace('http://http://', 'http://').replace('https://http://', 'https://') url = url.replace('http://https://', 'https://').replace('https://https://', 'https://') # 如果URL没有协议,添加https:// if not url.startswith(('http://', 'https://')): url = 'https://' + url return url def main(): if len(sys.argv) < 2: print("用法: python net_server/open_browser.py <URL>") sys.exit(1) raw_url = sys.argv[1] url = normalize_url(raw_url) try: webbrowser.open(url) print(f"成功打开网页: {url}") except Exception as e: print(f"打开网页失败: {str(e)}") sys.exit(1) if __name__ == "__main__": main()

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询