孝感市网站建设_网站建设公司_模板建站_seo优化
2026/1/9 1:17:52 网站建设 项目流程

1.导出数据库指定表的所有字段(含有字段注释)和数据

导出结果如下

#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ MySQL数据导出工具 - 修复元组索引问题 """ import pandas as pd import pymysql import openpyxl from openpyxl.utils import get_column_letter from openpyxl.styles import Font, PatternFill, Alignment, Border, Side import sys from datetime import datetime import os import warnings warnings.filterwarnings('ignore') def get_db_connection(): """建立数据库连接""" try: connection = pymysql.connect( host='数据库ip或链接', port=3306, user='ebs_read', password='数据库密码', database='数据库名称, charset='utf8mb4', autocommit=True ) print(f"✅ 成功连接到数据库") return connection except Exception as e: print(f"❌ 数据库连接失败: {e}") sys.exit(1) def get_table_columns_with_comments(connection, table_name): """获取表的字段名和注释 - 修复元组索引问题""" try: cursor = connection.cursor() # 方法1:使用INFORMATION_SCHEMA query = """ SELECT COLUMN_NAME, COLUMN_COMMENT, DATA_TYPE, ORDINAL_POSITION FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = DATABASE() AND TABLE_NAME = %s ORDER BY ORDINAL_POSITION """ cursor.execute(query, (table_name,)) columns = cursor.fetchall() cursor.close() # 处理返回的元组数据 formatted_columns = [] for col in columns: # col是元组,按查询顺序访问:0=COLUMN_NAME, 1=COLUMN_COMMENT, 2=DATA_TYPE, 3=ORDINAL_POSITION formatted_columns.append({ 'COLUMN_NAME': col[0], 'COLUMN_COMMENT': col[1] if col[1] else '', 'DATA_TYPE': col[2], 'ORDINAL_POSITION': col[3] }) return formatted_columns except Exception as e: print(f"❌ 获取字段信息失败,尝试备用方法: {e}") # 方法2:使用SHOW FULL COLUMNS try: cursor = connection.cursor() cursor.execute(f"SHOW FULL COLUMNS FROM `{table_name}`") columns = cursor.fetchall() cursor.close() # SHOW FULL COLUMNS返回的字段顺序需要查看 # 通常顺序是:Field, Type, Collation, Null, Key, Default, Extra, Privileges, Comment formatted_columns = [] for i, col in enumerate(columns): formatted_columns.append({ 'COLUMN_NAME': col[0], # Field 'COLUMN_COMMENT': col[8] if len(col) > 8 else '', # Comment 'DATA_TYPE': str(col[1]).split('(')[0].split()[0] if col[1] else '', # Type 'ORDINAL_POSITION': i + 1 }) return formatted_columns except Exception as e2: print(f"❌ 备用方法也失败: {e2}") return [] def get_table_data(connection, table_name, columns_info, limit=None): """获取表数据""" try: cursor = connection.cursor() column_names = [col['COLUMN_NAME'] for col in columns_info] select_columns = ', '.join([f"`{col}`" for col in column_names]) query = f"SELECT {select_columns} FROM `{table_name}`" if limit: query += f" LIMIT {limit}" cursor.execute(query) data = cursor.fetchall() cursor.close() # 创建表头:注释:字段名 column_headers = [] for col in columns_info: comment = col['COLUMN_COMMENT'] if col['COLUMN_COMMENT'] else col['COLUMN_NAME'] column_headers.append(f"{comment}:{col['COLUMN_NAME']}") # 将元组列表转换为列表列表 data_list = [list(row) for row in data] df = pd.DataFrame(data_list, columns=column_headers) print(f"✅ 成功读取 {len(data)} 行数据") return df except Exception as e: print(f"❌ 查询数据失败: {e}") import traceback traceback.print_exc() return None def export_to_excel(df, table_name, output_dir='/Users/gongbei/mydoc/ms-doc/数据迁移/data/数据导出'): """导出DataFrame到Excel""" if not os.path.exists(output_dir): os.makedirs(output_dir) print(f"📁 创建导出目录: {output_dir}") timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') filename = f"{table_name}_{timestamp}.xlsx" filepath = os.path.join(output_dir, filename) try: # 确保所有数据都是字符串格式,避免Excel格式问题 df = df.astype(str) with pd.ExcelWriter(filepath, engine='openpyxl') as writer: df.to_excel(writer, sheet_name='Data', index=False) workbook = writer.book worksheet = writer.sheets['Data'] # 调整列宽 for column in worksheet.columns: max_length = 0 column_letter = get_column_letter(column[0].column) for cell in column: try: cell_value = str(cell.value) if cell.value is not None else "" if len(cell_value) > max_length: max_length = len(cell_value) except: pass adjusted_width = min(max_length + 2, 100) worksheet.column_dimensions[column_letter].width = adjusted_width # 冻结首行 worksheet.freeze_panes = 'A2' # 设置表头样式 header_font = Font(bold=True, color='FFFFFF', size=11) header_fill = PatternFill(start_color='366092', end_color='366092', fill_type='solid') header_alignment = Alignment(horizontal='center', vertical='center', wrap_text=True) border = Border( left=Side(style='thin'), right=Side(style='thin'), top=Side(style='thin'), bottom=Side(style='thin') ) # 应用表头样式 for cell in worksheet[1]: cell.font = header_font cell.fill = header_fill cell.alignment = header_alignment cell.border = border # 应用数据行边框 for row in worksheet.iter_rows(min_row=2, max_row=worksheet.max_row, min_col=1, max_col=worksheet.max_column): for cell in row: cell.border = border print(f"✅ 数据已成功导出到: {filepath}") print(f"📊 文件大小: {os.path.getsize(filepath) / 1024:.2f} KB") return filepath except Exception as e: print(f"❌ 导出Excel失败: {e}") import traceback traceback.print_exc() return None def check_mysql_version(connection): """检查MySQL版本""" try: cursor = connection.cursor() cursor.execute("SELECT VERSION() as version") result = cursor.fetchone() cursor.close() version = result[0] if isinstance(result, tuple) else result print(f"🔍 MySQL版本: {version}") if '5.' in str(version): print("📌 检测到MySQL 5.x") return 5 elif '8.' in str(version): print("📌 检测到MySQL 8.x") return 8 else: print("📌 未知MySQL版本") return 0 except: print("⚠️ 无法获取MySQL版本信息") return 0 def get_available_tables(connection): """获取数据库中的所有表""" try: cursor = connection.cursor() cursor.execute("SHOW TABLES") tables = cursor.fetchall() cursor.close() table_list = [] for table in tables: if isinstance(table, tuple): table_list.append(table[0]) else: table_list.append(table) return sorted(table_list) except: return [] def main(): """主函数""" print("=" * 60) print("MySQL数据导出工具") print("=" * 60) # 建立连接 print("\n🔗 正在连接数据库...") connection = get_db_connection() try: # 检查MySQL版本 mysql_version = check_mysql_version(connection) # 获取所有表名 all_tables = get_available_tables(connection) if all_tables: print(f"\n📋 数据库中有 {len(all_tables)} 个表") if len(all_tables) <= 20: print("可用表:") for i, table in enumerate(all_tables, 1): print(f" {i:3d}. {table}") else: print("前20个表:") for i, table in enumerate(all_tables[:20], 1): print(f" {i:3d}. {table}") print(f" ... 还有 {len(all_tables) - 20} 个表") # 获取表名 if len(sys.argv) > 1: table_name = sys.argv[1] print(f"\n📝 导出表: {table_name}") else: table_name = input("\n📝 请输入要导出的表名: ").strip() if not table_name: print("❌ 请输入有效的表名") return # 检查表是否存在 if table_name not in all_tables: print(f"❌ 表 '{table_name}' 不存在") print(f"💡 请输入正确的表名,可用表见上方列表") return print(f"\n📊 正在获取表 '{table_name}' 的字段信息...") columns_info = get_table_columns_with_comments(connection, table_name) if not columns_info: print(f"❌ 未能获取表 '{table_name}' 的字段信息") return print(f"✅ 找到 {len(columns_info)} 个字段") # 显示字段预览 print("\n📋 字段预览:") for i, col in enumerate(columns_info[:15], 1): comment = col['COLUMN_COMMENT'] display_comment = comment if comment else col['COLUMN_NAME'] print(f" {i:2d}. {col['COLUMN_NAME']:25s} - {display_comment[:40]}{'...' if len(display_comment) > 40 else ''}") if len(columns_info) > 15: print(f" ... 还有 {len(columns_info) - 15} 个字段") # 询问是否限制行数 limit_input = input("\n🔢 要限制导出的行数吗?(直接回车导出全部,或输入数字): ").strip() limit = int(limit_input) if limit_input.isdigit() else None print(f"\n📥 正在读取表 '{table_name}' 的数据...") df = get_table_data(connection, table_name, columns_info, limit) if df is not None and not df.empty: print(f"\n💾 正在导出数据到Excel...") excel_path = export_to_excel(df, table_name) if excel_path: print("\n" + "=" * 60) print("🎉 导出完成!") print(f"📂 文件位置: {excel_path}") print(f"📊 数据行数: {len(df)}") print(f"📋 字段数量: {len(df.columns)}") print(f"🔧 MySQL版本: {mysql_version}") print("=" * 60) # 询问是否打开文件 if sys.platform == 'darwin': # macOS open_file = input("\n是否打开文件所在目录?(y/n): ").strip().lower() if open_file == 'y': os.system(f'open "{os.path.dirname(excel_path)}"') elif sys.platform == 'win32': # Windows open_file = input("\n是否打开文件所在目录?(y/n): ").strip().lower() if open_file == 'y': os.startfile(os.path.dirname(excel_path)) else: print("❌ 没有数据可导出或查询失败") except KeyboardInterrupt: print("\n\n⚠️ 用户中断操作") except Exception as e: print(f"\n❌ 导出过程中出错: {e}") import traceback traceback.print_exc() finally: if connection and hasattr(connection, 'close'): connection.close() print("\n🔌 数据库连接已关闭") if __name__ == "__main__": # 检查依赖 try: import pandas as pd print("✅ pandas 已安装") except ImportError: print("❌ 缺少 pandas,请安装: pip install pandas") sys.exit(1) try: import pymysql print("✅ PyMySQL 已安装") except ImportError: print("❌ 缺少 PyMySQL,正在安装...") import subprocess subprocess.check_call([sys.executable, "-m", "pip", "install", "pymysql"]) print("✅ PyMySQL 安装完成") import pymysql try: import openpyxl print("✅ openpyxl 已安装") except ImportError: print("❌ 缺少 openpyxl,请安装: pip install openpyxl") sys.exit(1) print("\n" + "=" * 60) main()

2.根据指定的sql导出查询字段和对应的注释

#!/usr/bin/env python3 """ MySQL查询工具 - 支持字段注释显示和JSON输出 """ import pymysql import pandas as pd import argparse import sys import re import json from typing import Dict, List, Any from urllib.parse import urlparse, parse_qs class MySQLQueryWithComments: def __init__(self, host, port, user, password, database, charset='utf8mb4'): """直接使用参数连接""" self.connection = pymysql.connect( host=host, port=port, user=user, password=password, database=database, charset=charset, cursorclass=pymysql.cursors.DictCursor, connect_timeout=10 ) self.database = database def get_column_comments(self, table_name: str) -> Dict[str, str]: """获取字段注释""" try: with self.connection.cursor() as cursor: sql = """ SELECT COLUMN_NAME, COLUMN_COMMENT FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s """ cursor.execute(sql, (self.database, table_name)) results = cursor.fetchall() comments = {} for row in results: column_name = row['COLUMN_NAME'] column_comment = row['COLUMN_COMMENT'] or '' comments[column_name] = column_comment return comments except Exception as e: print(f"获取字段注释时出错: {e}") return {} def get_all_column_comments_from_sql(self, sql: str) -> Dict[str, str]: """从SQL中获取所有表的字段注释""" tables = self.extract_tables_from_sql(sql) all_comments = {} for table in tables: try: comments = self.get_column_comments(table) all_comments.update(comments) except Exception as e: print(f"获取表 {table} 的注释时出错: {e}") continue return all_comments def extract_tables_from_sql(self, sql: str) -> List[str]: """从SQL中提取表名""" # 去除注释 sql_clean = re.sub(r'--.*?$|/\*.*?\*/', '', sql, flags=re.MULTILINE|re.DOTALL) sql_clean = ' '.join(sql_clean.split()) # 查找表名 tables = set() # 查找FROM后面的表名 from_pattern = r'\bFROM\s+([\w\.`]+)' from_matches = re.findall(from_pattern, sql_clean, re.IGNORECASE) for match in from_matches: table = match.replace('`', '').split('.')[-1] tables.add(table) # 查找JOIN后面的表名 join_pattern = r'\b(?:LEFT\s+|RIGHT\s+|INNER\s+|OUTER\s+)?JOIN\s+([\w\.`]+)' join_matches = re.findall(join_pattern, sql_clean, re.IGNORECASE) for match in join_matches: table = match.replace('`', '').split('.')[-1] tables.add(table) return list(tables) def execute_query(self, sql: str) -> pd.DataFrame: """执行查询""" try: df = pd.read_sql(sql, self.connection) return df except Exception as e: print(f"执行查询时出错: {e}") import traceback traceback.print_exc() return None def dataframe_to_json_with_comments(self, df: pd.DataFrame, comments_dict: Dict[str, str], indent: int = 2) -> str: """将DataFrame转换为带注释的JSON格式""" if df is None or df.empty: return json.dumps([], indent=indent, ensure_ascii=False) # 创建带注释的数据结构 result = { "metadata": { "row_count": len(df), "column_count": len(df.columns), "columns": [] }, "data": [] } # 添加列信息(包含注释) for col in df.columns: comment = comments_dict.get(col, "") column_info = { "name": col, "comment": comment, "type": str(df[col].dtype) } result["metadata"]["columns"].append(column_info) # 添加数据行 for _, row in df.iterrows(): data_row = {} for col in df.columns: # 处理特殊类型(如datetime, decimal等) value = row[col] if pd.isna(value): data_row[col] = None elif hasattr(value, 'isoformat'): # datetime类型 data_row[col] = value.isoformat() else: data_row[col] = value result["data"].append(data_row) return json.dumps(result, indent=indent, ensure_ascii=False) def dataframe_to_simple_json(self, df: pd.DataFrame, comments_dict: Dict[str, str], indent: int = 2) -> str: """将DataFrame转换为简单JSON格式(仅数据)""" if df is None or df.empty: return json.dumps([], indent=indent, ensure_ascii=False) # 转换数据为列表 data = df.to_dict('records') # 处理特殊类型 for item in data: for key, value in item.items(): if pd.isna(value): item[key] = None elif hasattr(value, 'isoformat'): # datetime类型 item[key] = value.isoformat() return json.dumps(data, indent=indent, ensure_ascii=False) def dataframe_to_enhanced_json(self, df: pd.DataFrame, comments_dict: Dict[str, str], indent: int = 2) -> str: """将DataFrame转换为增强JSON格式(字段名包含注释)""" if df is None or df.empty: return json.dumps([], indent=indent, ensure_ascii=False) # 创建新的列名映射(字段名_注释) column_mapping = {} new_columns = [] for col in df.columns: comment = comments_dict.get(col, "") if comment: new_name = f"{col}__{comment}" # 使用双下划线分隔 else: new_name = col column_mapping[col] = new_name new_columns.append(new_name) # 创建结果 result = { "column_mapping": column_mapping, "columns": new_columns, "data": [] } # 添加数据 for _, row in df.iterrows(): data_row = {} for col in df.columns: new_name = column_mapping[col] value = row[col] if pd.isna(value): data_row[new_name] = None elif hasattr(value, 'isoformat'): data_row[new_name] = value.isoformat() else: data_row[new_name] = value result["data"].append(data_row) return json.dumps(result, indent=indent, ensure_ascii=False) def close(self): """关闭连接""" if self.connection: self.connection.close() def save_to_file(content: str, filename: str): """保存内容到文件""" try: with open(filename, 'w', encoding='utf-8') as f: f.write(content) print(f"✓ 结果已保存到文件: {filename}") except Exception as e: print(f"保存文件时出错: {e}") def main(): parser = argparse.ArgumentParser(description='MySQL查询工具 - 显示字段注释并输出JSON') # 连接参数 parser.add_argument('--host', default='127.0.0.1', help='数据库主机') parser.add_argument('--port', type=int, default=3306, help='数据库端口') parser.add_argument('--user', required=True, help='数据库用户') parser.add_argument('--password', required=True, help='数据库密码') parser.add_argument('--database', default='mydatabase', help='数据库名') # SQL参数 parser.add_argument('--sql', help='SQL查询语句') parser.add_argument('--sql-file', help='SQL文件路径') # 输出参数 parser.add_argument('--json-format', choices=['standard', 'simple', 'enhanced'], default='standard', help='JSON格式类型') parser.add_argument('--indent', type=int, default=2, help='JSON缩进空格数') parser.add_argument('--output', help='输出文件路径') parser.add_argument('--limit', type=int, default=0, help='限制返回行数(0表示无限制)') args = parser.parse_args() # 读取SQL if args.sql_file: with open(args.sql_file, 'r', encoding='utf-8') as f: sql = f.read().strip() elif args.sql: sql = args.sql else: print("错误: 请提供SQL语句或SQL文件") parser.print_help() sys.exit(1) # 添加LIMIT子句 if args.limit > 0 and 'LIMIT' not in sql.upper(): sql = f"{sql.rstrip(';')} LIMIT {args.limit}" print(f"执行的SQL: {sql[:100]}..." if len(sql) > 100 else f"执行的SQL: {sql}") print("-" * 80) try: # 创建连接 query_tool = MySQLQueryWithComments( host=args.host, port=args.port, user=args.user, password=args.password, database=args.database ) print(f"✓ 成功连接到数据库: {args.database}") # 获取字段注释 print("正在获取字段注释...") comments = query_tool.get_all_column_comments_from_sql(sql) print(f"获取到 {len(comments)} 个字段的注释") # 执行查询 print("正在执行查询...") df = query_tool.execute_query(sql) if df is None: print("❌ 查询执行失败") query_tool.close() sys.exit(1) if df.empty: print("⚠️ 查询结果为空") json_output = json.dumps([], indent=args.indent, ensure_ascii=False) else: print(f"✓ 查询成功,返回 {len(df)} 行,{len(df.columns)} 列") # 根据选择的格式生成JSON if args.json_format == 'simple': json_output = query_tool.dataframe_to_simple_json(df, comments, args.indent) print("📋 输出格式: 简单JSON (仅数据)") elif args.json_format == 'enhanced': json_output = query_tool.dataframe_to_enhanced_json(df, comments, args.indent) print("📋 输出格式: 增强JSON (字段名包含注释)") else: # standard json_output = query_tool.dataframe_to_json_with_comments(df, comments, args.indent) print("📋 输出格式: 标准JSON (包含元数据)") # 输出或保存结果 if args.output: save_to_file(json_output, args.output) else: print("\n" + "=" * 80) print("JSON 输出:") print("=" * 80) print(json_output) query_tool.close() except pymysql.Error as e: print(f"❌ 数据库连接错误: {e}") sys.exit(1) except Exception as e: print(f"❌ 程序执行错误: {e}") import traceback traceback.print_exc() sys.exit(1) if __name__ == "__main__": # 交互式输入版本 if len(sys.argv) == 1: print("=== MySQL查询工具(交互式)===") host = input("数据库主机 (127.0.0.1): ") or "127.0.0.1" port = input("数据库端口 (3306): ") or "3306" user = input("数据库用户: ") password = input("数据库密码: ") database = input("数据库名 (mydatabase): ") or "mydatabase" print("\n请选择JSON格式:") print("1. 标准格式 (包含元数据和注释)") print("2. 简单格式 (仅数据)") print("3. 增强格式 (字段名包含注释)") format_choice = input("选择 (1-3, 默认为1): ") or "1" json_formats = {'1': 'standard', '2': 'simple', '3': 'enhanced'} json_format = json_formats.get(format_choice, 'standard') limit = input("限制返回行数 (0表示无限制): ") or "0" print("\n请输入SQL查询语句 (输入完成后按Ctrl+D结束):") sql_lines = [] while True: try: line = input() sql_lines.append(line) except EOFError: break sql = ' '.join(sql_lines) if not sql.strip(): print("错误: SQL不能为空") sys.exit(1) output_file = input("\n输出文件路径 (直接回车输出到屏幕): ") # 设置命令行参数 args_list = [ sys.argv[0], '--host', host, '--port', port, '--user', user, '--password', password, '--database', database, '--json-format', json_format, '--limit', limit, '--sql', sql ] if output_file: args_list.extend(['--output', output_file]) sys.argv = args_list main()

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询