本溪市网站建设_网站建设公司_博客网站_seo优化
2025/12/29 19:50:59 网站建设 项目流程

通过API接口,调用防火墙策略,并最终转化为Excel输出

 

import requests
import json
import warnings
import pandas as pd
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from datetime import datetime# 禁用 SSL 警告
warnings.simplefilter('ignore', InsecureRequestWarning)# 统一时间戳生成
timestamp = datetime.now().strftime('%m%d_%H%M')# 获取token后,调用appcontrols/policys接口
def get_policys(host, token):url = f"https://{host}/api/v1/namespaces/@namespace/appcontrols/policys"# 设置请求头,携带token作为Cookieheaders = {"Accept": "*/*",  # 接受任何类型的响应"Accept-Encoding": "gzip, deflate, br",  # 支持的压缩格式"Connection": "keep-alive",  # 保持连接"Content-Type": "application/json",  # 请求数据格式为JSON"User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0",  # 模拟 Postman 请求"Cookie": f"token={token}"  # 把token放入Cookie中}all_items = []  # 用来存储所有页的数据page = 1while True:params = {"_start": (page - 1) * 100,  # 起始位置"_length": 100,  # 每页条数}try:# 发送GET请求,获取策略列表response = requests.get(url, headers=headers, params=params, verify=False)if response.status_code == 200:resp_data = response.json()if resp_data["code"] == 0:# 获取当前页的策略数据items = resp_data.get("data", {}).get("items", [])all_items.extend(items)  # 将当前页的数据添加到all_items中# 保存当前页的数据到文件filename = f"{timestamp}_data_page_{page}.json"with open(filename, 'w', encoding='utf-8') as f:json.dump(resp_data, f, ensure_ascii=False, indent=4)print(f"第 {page} 页数据已保存到 '{filename}'")# 检查是否还有下一页if len(items) < 100:  # 如果当前页的数据量少于100,说明没有更多数据了breakpage += 1  # 继续请求下一页else:print(f"获取策略失败,错误信息: {resp_data['message']}")  # 打印错误信息breakelse:print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")  # 打印HTTP错误breakexcept requests.exceptions.RequestException as e:print(f"请求异常: {e}")  # 捕获请求异常并打印break# 将所有数据合并并保存到一个文件if all_items:all_data = {"data": {"items": all_items}}all_filename = f"{timestamp}_all_data.json"with open(all_filename, 'w', encoding='utf-8') as f:json.dump(all_data, f, ensure_ascii=False, indent=4)return all_items, all_filename  # 返回合并数据和文件名# 登录函数,获取token
def login(host, username, password):url = f"https://{host}/api/v1/namespaces/@namespace/login"  # 使用@namespace固定命名空间# 登录所需的用户名和密码payload = {"name": username,"password": password}# 构建请求头headers = {"Accept": "*/*",  # 接受任何类型的响应"Accept-Encoding": "gzip, deflate, br",  # 支持的压缩格式"Connection": "keep-alive",  # 保持连接"Content-Type": "application/json",  # 请求数据格式为JSON}try:# 发送POST请求以获取tokenresponse = requests.post(url, json=payload, headers=headers, verify=False)if response.status_code == 200:resp_data = response.json()if resp_data["code"] == 0:token = resp_data["data"]["loginResult"]["token"]print("获取到的token:", token)  # 打印获取到的tokenreturn tokenelse:print("登录失败,错误信息:", resp_data["message"])  # 打印错误信息else:print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")  # 打印HTTP错误except requests.exceptions.RequestException as e:print(f"请求异常: {e}")  # 捕获请求异常并打印# 获取网络对象列表,支持分页
def get_ipgroups(host, token):url = f"https://{host}/api/v1/namespaces/@namespace/ipgroups"  # 修改为正确的访问路径# 设置请求头,携带token作为Cookieheaders = {"Accept": "*/*",  # 接受任何类型的响应"Accept-Encoding": "gzip, deflate, br",  # 支持的压缩格式"Connection": "keep-alive",  # 保持连接"Content-Type": "application/json",  # 请求数据格式为JSON"User-Agent": "PostmanRuntime-ApipostRuntime/1.1.0",  # 模拟 Postman 请求"Cookie": f"token={token}"  # 把token放入Cookie中}all_ipgroups = []  # 用来存储所有页的数据page = 1while True:params = {"_start": (page - 1) * 100,  # 起始位置"_length": 100,  # 每页条数}try:# 发送GET请求,获取网络对象列表response = requests.get(url, headers=headers, params=params, verify=False)if response.status_code == 200:resp_data = response.json()if resp_data["code"] == 0:ipgroups = resp_data.get("data", {}).get("items", [])if ipgroups:all_ipgroups.extend(ipgroups)  # 将当前页的数据添加到 all_ipgroups 中# 保存当前页的数据到文件ipgroups_filename = f"{timestamp}_ipgroups_page_{page}.json"with open(ipgroups_filename, 'w', encoding='utf-8') as f:json.dump(resp_data, f, ensure_ascii=False, indent=4)print(f"第 {page} 页网络对象数据已保存到 '{ipgroups_filename}'")else:print(f"第 {page} 页没有获取到网络对象数据。")break# 判断是否有更多页total_pages = resp_data["data"].get("totalPages", 1)if page < total_pages:page += 1  # 继续请求下一页else:print("所有网络对象数据已获取完毕。")breakelse:print(f"获取网络对象失败,错误信息: {resp_data['message']}")breakelse:print(f"请求失败,状态码: {response.status_code}, 错误信息: {response.text}")breakexcept requests.exceptions.RequestException as e:print(f"请求异常: {e}")break# 如果有数据,将所有数据合并并保存到一个文件if all_ipgroups:all_data = {"data": {"items": all_ipgroups}}all_filename = f"{timestamp}_all_ipgroups_data.json"with open(all_filename, 'w', encoding='utf-8') as f:json.dump(all_data, f, ensure_ascii=False, indent=4)print(f"所有网络对象数据已保存到 '{all_filename}'")return all_ipgroups, all_filenamedef update_dst_ipgroups_with_domains(all_data, ipgroup_to_info):"""根据 IP 组的域名或 IP 范围更新 all_data 中的 dstIpGroups 字段。"""try:# 遍历 all_data 中的每个策略数据,更新 dstIpGroupsfor item in all_data.get("data", {}).get("items", []):if 'dst' in item and 'dstAddrs' in item['dst']:dst_ipgroups = item['dst']['dstAddrs'].get('dstIpGroups', [])# 更新 dstIpGroups,拼接域名或 IP 范围updated_dst_ipgroups = []for ipgroup in dst_ipgroups:if ipgroup in ipgroup_to_info:# 构建没有转义字符的格式,手动拼接info_str = ",".join([f"\"{domain}\"" for domain in ipgroup_to_info[ipgroup]])updated_dst_ipgroups.append(f"{ipgroup}[{info_str}]")else:updated_dst_ipgroups.append(ipgroup)# 更新 dstIpGroups 字段item['dst']['dstAddrs']['dstIpGroups'] = updated_dst_ipgroupsreturn all_dataexcept Exception as e:print(f"处理过程中出错: {e}")return all_datadef generate_ipgroup_to_info(ipgroups_filename):"""读取 IP 组数据文件,并构建 IP 组到域名的映射。"""ipgroup_to_info = {}try:# 读取 IP 组文件with open(ipgroups_filename, 'r', encoding='utf-8') as f:ipgroups_data = json.load(f)# 构建 IP 组到域名或 IP 范围的映射for ipgroup in ipgroups_data.get('data', {}).get('items', []):if ipgroup['businessType'] == "DOMAINS":ipgroup_to_info[ipgroup['name']] = ipgroup.get('domains', [])elif ipgroup['businessType'] == "IP":# 你可以扩展 IP 组到 IP 地址的处理ipgroup_to_info[ipgroup['name']] = [f"{ip_range['start']}{' - ' + ip_range['end'] if 'end' in ip_range else ''}"for ip_range in ipgroup.get('ipRanges', [])]except Exception as e:print(f"读取 IP 组数据时出错: {e}")return ipgroup_to_info# 新增导出 Excel 文件功能
def export_to_excel(json_file):# 读取 JSON 文件with open(json_file, 'r', encoding='utf-8') as f:data = json.load(f)# 提取需要的字段 (你可以根据实际需要调整字段)items = data['data']['items']rows = []for item in items:row = {'UUID': item.get('uuid', ''),'Name': item.get('name', ''),'Description': item.get('description', ''),'Group': item.get('group', ''),'Source Zones': ', '.join(item.get('src', {}).get('srcZones', [])),'Source IP Groups': ', '.join(item.get('src', {}).get('srcAddrs', {}).get('srcIpGroups', [])),'Destination Zones': ', '.join(item.get('dst', {}).get('dstZones', [])),'Destination IP Groups': ', '.join(item.get('dst', {}).get('dstAddrs', {}).get('dstIpGroups', [])),'Services': ', '.join(item.get('services', [])),  # 使用 .get() 防止 KeyError'Action': item.get('action', ''),'Schedule': item.get('schedule', ''),'Log Enable': item.get('advanceOption', {}).get('logEnable', ''),'Last Hit Time': item.get('lastHitTime', ''),'Hits': item.get('hits', ''),}rows.append(row)# 转换为 DataFramedf = pd.DataFrame(rows)excel_filename = f"AF_Final_{timestamp}.xlsx"# 导出为 Exceldf.to_excel(excel_filename, index=False, engine='openpyxl')print(f"数据已导出为 Excel 文件: {excel_filename}")# 主程序中的菜单更新
def main():token = Noneall_filename = None  # 初始化保存文件名变量all_data = None  # 新增变量保存所有数据ipgroup_to_info = {}  # 动态生成的字典while True:print("\n请选择操作:")print("1. 登录并获取 token")print("2. 拉取策略列表")print("3. 批量获取网络对象")  # 新增功能print("4. 更新 dstIpGroups 字段")  # 新增功能print("5. 导出为 Excel")  # 新增导出 Excel 功能print("0. 退出")choice = input("请输入数字:")if choice == "1":host = input("请输入主机地址 (默认 10.0.0.1): ") or "10.0.0.1"username = input("请输入用户名 (默认 admin): ") or "admin"password = input("请输入密码 (默认 ******): ") or "admin"# 登录并获取tokentoken = login(host, username, password)elif choice == "2":if token:# 调用接口获取策略列表all_items, all_filename = get_policys(host, token)print(f"所有策略数据已保存到 '{all_filename}'")# 载入all_datawith open(all_filename, 'r', encoding='utf-8') as f:all_data = json.load(f)else:print("请先获取token。")elif choice == "3":if token:ipgroups, ipgroups_filename = get_ipgroups(host, token)if ipgroups:print(f"所有网络对象数据已保存到 '{ipgroups_filename}'")# 动态生成ipgroup_to_info字典ipgroup_to_info = generate_ipgroup_to_info(ipgroups_filename)print("ipgroup_to_info字典已生成。")else:print("请先获取token。")elif choice == "4":if all_data and ipgroup_to_info:# 更新dstIpGroups字段all_data = update_dst_ipgroups_with_domains(all_data, ipgroup_to_info)updated_filename = f"{timestamp}_AF-Final.json"with open(updated_filename, 'w', encoding='utf-8') as f:json.dump(all_data, f, ensure_ascii=False, indent=4)print(f"dstIpGroups字段已更新,结果已保存到 '{updated_filename}'")else:print("请先拉取策略数据和批量获取网络对象数据。")elif choice == "5":if all_filename:# 将 JSON 文件导出为 Excelexport_to_excel(updated_filename)else:print("请先拉取策略数据。")elif choice == "0":print("退出程序")breakelse:print("无效输入,请重新选择。")if __name__ == "__main__":main()

根据你的设备信息,修改主机地址、用户名、密码,即可执行效果:

请选择操作:
1. 登录并获取 token
2. 拉取策略列表
3. 批量获取网络对象
4. 更新 dstIpGroups 字段
5. 导出为 Excel
0. 退出
请输入数字:1
请输入主机地址 (默认 10.0.0.1): 
请输入用户名 (默认 admin): 
请输入密码 (默认 ******): 
获取到的token: 6KILKDU5GLS1***************JP8V70GPSRNEZK***R2V请选择操作:
1. 登录并获取 token
2. 拉取策略列表
3. 批量获取网络对象
4. 更新 dstIpGroups 字段
5. 导出为 Excel
0. 退出
请输入数字:2
第 1 页数据已保存到 '1229_1939_data_page_1.json'
第 2 页数据已保存到 '1229_1939_data_page_2.json'
所有策略数据已保存到 '1229_1939_all_data.json'请选择操作:
1. 登录并获取 token
2. 拉取策略列表
3. 批量获取网络对象
4. 更新 dstIpGroups 字段
5. 导出为 Excel
0. 退出
请输入数字:3
第 1 页网络对象数据已保存到 '1229_1939_ipgroups_page_1.json'
第 2 页网络对象数据已保存到 '1229_1939_ipgroups_page_2.json'
第 3 页网络对象数据已保存到 '1229_1939_ipgroups_page_3.json'
第 4 页网络对象数据已保存到 '1229_1939_ipgroups_page_4.json'
所有网络对象数据已获取完毕。
所有网络对象数据已保存到 '1229_1939_all_ipgroups_data.json'
所有网络对象数据已保存到 '1229_1939_all_ipgroups_data.json'
ipgroup_to_info字典已生成。请选择操作:
1. 登录并获取 token
2. 拉取策略列表
3. 批量获取网络对象
4. 更新 dstIpGroups 字段
5. 导出为 Excel
0. 退出
请输入数字:4
dstIpGroups字段已更新,结果已保存到 '1229_1939_AF-Final.json'请选择操作:
1. 登录并获取 token
2. 拉取策略列表
3. 批量获取网络对象
4. 更新 dstIpGroups 字段
5. 导出为 Excel
0. 退出
请输入数字:5
数据已导出为 Excel 文件: AF_Final_1229_1939.xlsx请选择操作:
1. 登录并获取 token
2. 拉取策略列表
3. 批量获取网络对象
4. 更新 dstIpGroups 字段
5. 导出为 Excel
0. 退出
请输入数字:0
退出程序进程已结束,退出代码为 0

 image

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询