Navicat密码恢复终极指南:5步快速找回丢失的数据库连接
2026/1/13 14:04:29
编写了一个完整的Python脚本,功能呢主要用于监控Windows远程登录失败事件,并在1小时内同一IP密码错误5次时,通过Windows防火墙封禁该IP24小时:
""" Windows远程登录失败监控与自动封禁脚本 功能:监控RDP登录失败,1小时内同一IP失败5次则自动封禁24小时 """importosimportsysimporttimeimportsqlite3importdatetimeimportsubprocessimportwin32evtlogimportwin32evtlogutilfromcollectionsimportdefaultdictfromthreadingimportThread,Lockimportloggingimportschedule# 配置日志logging.basicConfig(level=logging.INFO,format='%(asctime)s - %(levelname)s - %(message)s',handlers=[logging.FileHandler('rdp_monitor.log'),logging.StreamHandler()])logger=logging.getLogger(__name__)classRDPFailureMonitor:def__init__(self,db_path='rdp_monitor.db'):""" 初始化监控器 Args: db_path: SQLite数据库路径 """self.db_path=db_path self.lock=Lock()self.setup_database()self.setup_firewall_rules()# 配置参数self.max_attempts=5# 最大尝试次数self.time_window=3600# 时间窗口(秒)- 1小时self.ban_duration=86400# 封禁持续时间(秒)- 24小时defsetup_database(self):"""初始化数据库"""withsqlite3.connect(self.db_path)asconn:cursor=conn.cursor()# 创建失败尝试记录表cursor.execute(''' CREATE TABLE IF NOT EXISTS failed_attempts ( id INTEGER PRIMARY KEY AUTOINCREMENT, ip_address TEXT NOT NULL, username TEXT, attempt_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, event_id INTEGER ) ''')# 创建封禁记录表cursor.execute(''' CREATE TABLE IF NOT EXISTS banned_ips ( id INTEGER PRIMARY KEY AUTOINCREMENT, ip_address TEXT NOT NULL UNIQUE, ban_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, ban_duration INTEGER, unban_time TIMESTAMP, reason TEXT ) ''')# 创建索引以提高查询性能cursor.execute('CREATE INDEX IF NOT EXISTS idx_ip_time ON failed_attempts(ip_address, attempt_time)')cursor.execute('CREATE INDEX IF NOT EXISTS idx_unban_time ON banned_ips(unban_time)')conn.commit()defsetup_firewall_rules(self):"""设置防火墙规则组"""try:# 创建自定义防火墙规则组group_name="RDP_BLOCKED_IPS"group_desc="Automatically blocked IPs due to RDP brute force attacks"# 检查规则组是否存在check_cmd=f'netsh advfirewall firewall show rule name="RDP Monitor Group"'result=subprocess.run(check_cmd,shell=True,capture_output=True,text=True)if"确定。"notinresult.stdout:# 创建规则组create_group=f''' netsh advfirewall firewall add rule name="RDP Monitor Group" dir=in action=block program=any description="{group_desc}" group="{group_name}" '''subprocess.run(create_group,shell=True,capture_output=True)logger.info("防火墙规则组创建成功")exceptExceptionase:logger.error(f"设置防火墙规则组失败:{e}")defget_event_logs(self,log_type='Security',event_id=4625):""" 获取Windows事件日志 Args: log_type: 日志类型,默认为Security event_id: 事件ID,4625表示登录失败 Returns: 事件记录列表 """events=[]try:hand=win32evtlog.OpenEventLog(None,log_type)flags=win32evtlog.EVENTLOG_BACKWARDS_READ|win32evtlog.EVENTLOG_SEQUENTIAL_READ total=win32evtlog.GetNumberOfEventLogRecords(hand)logger.info(f"正在读取{total}条安全事件日志...")events_list=win32evtlog.ReadEventLog(hand,flags,0)foreventinevents_list:ifevent.EventID==event_id:event_data={}# 解析事件数据foriteminevent.StringInserts:ifitemand':'initem:key,value=item.split(':',1)event_data[key.strip()]=value.strip()# 提取IP地址和用户名ip_address=event_data.get('Source Network Address','')username=event_data.get('Target User Name','')ifip_addressandip_address!='-':# 过滤掉本地登录events.append({'time':event.TimeGenerated.Format(),'ip':ip_address,'username':username,'event_id':event.EventID,'raw_data':str(event.StringInserts)})win32evtlog.CloseEventLog(hand)exceptExceptionase:logger.error(f"读取事件日志失败:{e}")returneventsdefsave_failed_attempt(self,ip_address,username,event_id):"""保存失败尝试记录到数据库"""try:withsqlite3.connect(self.db_path)asconn:cursor=conn.cursor()cursor.execute(''' INSERT INTO failed_attempts (ip_address, username, attempt_time, event_id) VALUES (?, ?, datetime('now'), ?) ''',(ip_address,username,event_id))conn.commit()logger.info(f"记录失败尝试: IP={ip_address}, 用户={username}")# 检查是否需要封禁self.check_and_block_ip(ip_address)exceptExceptionase:logger.error(f"保存失败尝试记录失败:{e}")defcheck_and_block_ip(self,ip_address):""" 检查IP是否需要封禁 Args: ip_address: 要检查的IP地址 """try:withsqlite3.connect(self.db_path)asconn:cursor=conn.cursor()# 查询1小时内该IP的失败次数query=''' SELECT COUNT(*) as attempts FROM failed_attempts WHERE ip_address = ? AND datetime(attempt_time) > datetime('now', '-1 hour') '''cursor.execute(query,(ip_address,))result=cursor.fetchone()attempts=result[0]ifresultelse0logger.info(f"IP{ip_address}在1小时内失败次数:{attempts}")# 检查是否已经封禁cursor.execute(''' SELECT COUNT(*) FROM banned_ips WHERE ip_address = ? AND unban_time > datetime('now') ''',(ip_address,))already_banned=cursor.fetchone()[0]>0ifattempts>=self.max_attemptsandnotalready_banned:# 需要封禁self.block_ip_with_firewall(ip_address)exceptExceptionase:logger.error(f"检查IP封禁状态失败:{e}")defblock_ip_with_firewall(self,ip_address):""" 使用Windows防火墙封禁IP Args: ip_address: 要封禁的IP地址 """try:rule_name=f"Block_RDP_Attacker_{ip_address.replace('.','_')}"# 创建防火墙规则封禁该IPblock_cmd=f''' netsh advfirewall firewall add rule name="{rule_name}" dir=in action=block protocol=any remoteip={ip_address}description="Blocked due to RDP brute force attack" enable=yes profile=any '''result=subprocess.run(block_cmd,shell=True,capture_output=True,text=True)ifresult.returncode==0:# 保存封禁记录到数据库self.save_ban_record(ip_address)logger.warning(f"成功封禁IP:{ip_address}")# 发送通知(可选)self.send_notification(ip_address)else:logger.error(f"封禁IP失败:{result.stderr}")exceptExceptionase:logger.error(f"执行防火墙命令失败:{e}")defsave_ban_record(self,ip_address):"""保存封禁记录到数据库"""try:withsqlite3.connect(self.db_path)asconn:cursor=conn.cursor()# 计算解封时间ban_time=datetime.datetime.now()unban_time=ban_time+datetime.timedelta(seconds=self.ban_duration)cursor.execute(''' INSERT OR REPLACE INTO banned_ips (ip_address, ban_time, ban_duration, unban_time, reason) VALUES (?, ?, ?, ?, ?) ''',(ip_address,ban_time,self.ban_duration,unban_time,"RDP brute force attack detected"))conn.commit()logger.info(f"保存封禁记录: IP={ip_address}")exceptExceptionase:logger.error(f"保存封禁记录失败:{e}")defunban_expired_ips(self):"""解除过期的封禁"""try:withsqlite3.connect(self.db_path)asconn:cursor=conn.cursor()# 查询已过期的封禁cursor.execute(''' SELECT ip_address FROM banned_ips WHERE unban_time <= datetime('now') ''')expired_ips=cursor.fetchall()for(ip_address,)inexpired_ips:# 删除防火墙规则rule_name=f"Block_RDP_Attacker_{ip_address.replace('.','_')}"unban_cmd=f'netsh advfirewall firewall delete rule name="{rule_name}"'result=subprocess.run(unban_cmd,shell=True,capture_output=True,text=True)ifresult.returncode==0:# 从数据库中删除记录cursor.execute('DELETE FROM banned_ips WHERE ip_address = ?',(ip_address,))logger.info(f"已解封IP:{ip_address}")else:logger.error(f"解封IP失败:{result.stderr}")conn.commit()exceptExceptionase:logger.error(f"解封过期IP失败:{e}")defsend_notification(self,ip_address):"""发送通知(可选功能)"""try:# 可以发送邮件、写入系统日志、调用Webhook等message=f"检测到RDP暴力破解攻击,已封禁IP:{ip_address}"# 写入Windows应用程序事件日志win32evtlogutil.ReportEvent(appName="RDP Monitor",eventID=1001,eventType=win32evtlog.EVENTLOG_WARNING_TYPE,strings=[message],data=None)logger.info(f"已发送通知:{message}")exceptExceptionase:logger.error(f"发送通知失败:{e}")defcleanup_old_records(self):"""清理旧的数据库记录"""try:withsqlite3.connect(self.db_path)asconn:cursor=conn.cursor()# 删除48小时前的失败记录cursor.execute(''' DELETE FROM failed_attempts WHERE datetime(attempt_time) < datetime('now', '-48 hours') ''')deleted=cursor.rowcount conn.commit()ifdeleted>0:logger.info(f"清理了{deleted}条旧的失败记录")exceptExceptionase:logger.error(f"清理旧记录失败:{e}")defmonitor_loop(self):"""监控主循环"""logger.info("开始监控RDP登录失败事件...")whileTrue:try:# 获取新的失败登录事件events=self.get_event_logs()foreventinevents:self.save_failed_attempt(event['ip'],event['username'],event['event_id'])# 每小时清理一次过期封禁self.unban_expired_ips()# 每6小时清理一次旧记录self.cleanup_old_records()# 休眠10秒后继续检查time.sleep(10)exceptKeyboardInterrupt:logger.info("监控已停止")breakexceptExceptionase:logger.error(f"监控循环出错:{e}")time.sleep(30)definstall_as_service():"""安装为Windows服务"""importwin32serviceutil script_path=os.path.abspath(__file__)python_path=sys.executable# 创建服务安装脚本service_script=f''' import sys import os sys.path.insert(0, os.path.dirname(__file__)) from rdp_monitor import RDPFailureMonitor if __name__ == '__main__': monitor = RDPFailureMonitor() monitor.monitor_loop() '''# 保存服务脚本service_file=os.path.join(os.path.dirname(__file__),'rdp_service.py')withopen(service_file,'w',encoding='utf-8')asf:f.write(service_script)# 安装服务service_name="RDPFailureMonitor"service_display_name="RDP Login Failure Monitor"install_cmd=f'"{python_path}" "{service_file}" install --startup auto'subprocess.run(install_cmd,shell=True)print(f"服务 '{service_display_name}' 已安装")defmain():"""主函数"""importargparse parser=argparse.ArgumentParser(description='Windows RDP登录失败监控器')parser.add_argument('--install',action='store_true',help='安装为Windows服务')parser.add_argument('--start',action='store_true',help='启动监控')parser.add_argument('--config',help='配置文件路径')args=parser.parse_args()ifargs.install:install_as_service()else:# 直接运行监控monitor=RDPFailureMonitor()# 立即执行一次检查和清理monitor.unban_expired_ips()monitor.cleanup_old_records()# 启动监控循环monitor.monitor_loop()if__name__=='__main__':# 检查管理员权限try:importctypes is_admin=ctypes.windll.shell32.IsUserAnAdmin()except:is_admin=Falseifnotis_admin:print("请以管理员身份运行此脚本!")# 请求管理员权限ctypes.windll.shell32.ShellExecuteW(None,"runas",sys.executable," ".join(sys.argv),None,1)sys.exit()main()pipinstallpywin32 pipinstallschedule# 以管理员身份运行PowerShellpython rdp_monitor.py--start# 安装服务python rdp_monitor.py--install# 启动服务scstartRDPFailureMonitor# 停止服务scstop RDPFailureMonitor# 查看服务状态scquery RDPFailureMonitor创建config.json:
{"max_attempts":5,"time_window":3600,"ban_duration":86400,"db_path":"C:\\ProgramData\\RDPMonitor\\rdp_monitor.db","log_level":"INFO","notification_email":"admin@example.com","cleanup_interval":21600}这个脚本是一个完整的解决方案,可以有效地保护Windows服务器免受RDP暴力破解攻击。