盘锦市网站建设_网站建设公司_博客网站_seo优化
2025/12/26 7:02:54 网站建设 项目流程

接上篇,来实验一下只用python+cdp 启动操作浏览器

一、python 实现cdp控制浏览器

--remote-debugging-port=9222 --remote-allow-origins=* // 必须添加 --remote-allow-origins=* 参数(或者指定具体来源),否则 Python 脚本通过 WebSocket 连接会被直接拒绝(返回 403)

见证奇迹的时刻

import json import time import requests from websocket import create_connection # 获取所有标签页列表,并取第一个标签页的 WebSocket URL response = requests.get("http://localhost:9222/json") tabs = response.json() if not tabs: print("没有找到打开的标签页,请确保 Chrome 已启动并有标签页。") exit(1) ws_url = tabs[0]["webSocketDebuggerUrl"] print(f"连接到标签页: {tabs[0]['title']} ({tabs[0]['url']})") print(f"WebSocket URL: {ws_url}") # 连接 WebSocket ws = create_connection(ws_url) # 启用 Page 域(必须先启用才能接收事件和使用 navigate) enable_page = {"id": 1, "method": "Page.enable"} ws.send(json.dumps(enable_page)) print("启用 Page 域:", json.loads(ws.recv())) # 导航到百度 navigate_cmd = { "id": 2, "method": "Page.navigate", "params": {"url": "https://www.baidu.com"} } ws.send(json.dumps(navigate_cmd)) response = json.loads(ws.recv()) print("导航响应:", response) # 等待页面加载完成(监听 loadEventFired 事件) print("等待页面加载完成...") while True: msg = json.loads(ws.recv()) print('msg=======',msg) if "method" in msg and msg["method"] == "Page.loadEventFired": print("页面加载完成!") break # 可以打印其他事件(可选) # print("收到事件:", msg) # 等待几秒,让你看到百度页面 time.sleep(5) # 关闭当前标签页 close_cmd = {"id": 3, "method": "Page.close"} ws.send(json.dumps(close_cmd)) print("关闭标签页响应:", json.loads(ws.recv())) # 关闭 WebSocket 连接 ws.close() print("完成!标签页已关闭。")

当然,也有一个三方库:

import PyChromeDevTools # 连接到 Chrome (需提前启动 Chrome 调试端口) chrome = PyChromeDevTools.ChromeInterface() # 启用网络和页面功能 chrome.Network.enable() chrome.Page.enable() # 导航到网页 chrome.Page.navigate(url="https://www.baidu.com/") # 等待页面加载完成 chrome.wait_event("Page.loadEventFired", timeout=60)

我们对这个逻辑,按dp 的逻辑封装一下:

先来分析一下dp 的deriver 相比上面两种,

二、driver 类详细讲解

1.三线程异步架构

  • 接收线程:专门负责从 WebSocket 接收消息,不被任何业务逻辑阻塞
def _recv_loop(self): while self.is_running: try: # self._ws.settimeout(1) msg_json = self._ws.recv() msg = loads(msg_json) except WebSocketTimeoutException: continue except (WebSocketException, OSError, WebSocketConnectionClosedException, JSONDecodeError): self._stop() return if 'method' in msg: if msg['method'].startswith('Page.javascriptDialog'): self.alert_flag = msg['method'].endswith('Opening') function = self.immediate_event_handlers.get(msg['method']) if function: self._handle_immediate_event(function, msg['params']) else: self.event_queue.put(msg) elif msg.get('id') in self.method_results: self.method_results[msg['id']].put(msg)
  • 普通事件处理线程:后台持续处理网络请求、页面事件等
  • 即时事件处理线程:按需启动,处理需要立即响应的事件(如 dialog)
def _handle_event_loop(self): while self.is_running: try: event = self.event_queue.get(timeout=1) except Empty: continue function = self.event_handlers.get(event['method']) if function: function(**event['params']) self.event_queue.task_done() def _handle_immediate_event_loop(self): while not self.immediate_event_queue.empty(): function, kwargs = self.immediate_event_queue.get(timeout=1) try: function(**kwargs) except PageDisconnectedError: pass //两个队列分别处理不同的事务

2.消息分类路由

def_recv_loop(self):# ... 接收消息 ...if'method'inmsg:# 判断:这是事件ifmsg['method'].startswith('Page.javascriptDialog'):self.alert_flag=msg['method'].endswith('Opening')# 特殊处理 Alertfunction=self.immediate_event_handlers.get(msg['method'])iffunction:self._handle_immediate_event(function,msg['params'])# 即时事件else:self.event_queue.put(msg)# 普通事件elifmsg.get('id')inself.method_results:# 判断:这是命令响应self.method_results[msg['id']].put(msg)# 精确投递到等待的命令

先看一下上面执行的日志

连接到标签页: WeTab 新标签页 (chrome-extension://aikflfpejipbpjdlfabpgclhblkpaafo/index.html) WebSocket URL: ws://localhost:9222/devtools/page/AB460ED6AE423BE07C5E5429395C95E8 启用 Page 域: {'id': 1, 'result': {}} 导航响应: {'method': 'Page.frameStartedLoading', 'params': {'frameId': 'AB460ED6AE423BE07C5E5429395C95E8'}} 等待页面加载完成... msg======= {'id': 2, 'result': {'frameId': 'AB460ED6AE423BE07C5E5429395C95E8', 'loaderId': '52220A990F1BFF283061E5299D598198'}} msg======= {'method': 'Page.frameNavigated', 'params': {'frame': {'id': 'AB460ED6AE423BE07C5E5429395C95E8', 'loaderId': '52220A990F1BFF283061E5299D598198', 'url': 'https://www.baidu.com/', 'domainAndRegistry': 'baidu.com', 'securityOrigin': 'https://www.baidu.com', 'mimeType': 'text/html', 'adFrameStatus': {'adFrameType': 'none'}, 'secureContextType': 'Secure', 'crossOriginIsolatedContextType': 'NotIsolated', 'gatedAPIFeatures': []}, 'type': 'Navigation'}} msg======= {'method': 'Page.domContentEventFired', 'params': {'timestamp': 751485.511995}} msg======= {'method': 'Page.loadEventFired', 'params': {'timestamp': 751485.835044}} 页面加载完成! 关闭标签页响应: {'method': 'Page.frameStoppedLoading', 'params': {'frameId': 'AB460ED6AE423BE07C5E5429395C95E8'}} 完成!标签页已关闭。

路由逻辑:

WebSocket 消息到达 | v _recv_loop (接收线程) 立即处理 | +---> 检查是否有紧急处理器? | +---YES---> _handle_immediate_event() | | | v | 立即创建/启动 immediate 线程处理 | | | v | 直接执行 function(**kwargs) | (毫秒级响应) | +---NO----> event_queue.put(msg) | v _handle_event_loop (事件处理线程) | v 轮询取事件,处理 (可能被阻塞)

** 1. 有**method**字段** → 事件 (这是浏览器主动推送的事件通知。表示页面中发生了某件事,不需要发送命令,它会自动返回(前提是已启用对应域)。)

- **是 **`**javascriptDialog**`** → 设置 **`**alert_flag**` - **有即时处理器 → 即时队列** - **否则 → 普通队列**

** 2. 有**id**字段且在等待列表** → 命令响应 (这是主动发送的命令的回复。发送了一个带 id 的命令,浏览器处理完后用相同的 id 回复,表示“这个命令的结果出来了”。)

- 投递到 `method_results[id]` 队列

3.还有一种alert ,会阻塞页面:点击、输入等操作无法执行 ,后面是进行处理:

//1. 监听 Page.javascriptDialogOpening 和 Page.javascriptDialogClosed 事件 //2. 发送 Page.handleJavaScriptDialog CDP 命令去除 alert if msg['method'].startswith('Page.javascriptDialog'): self.alert_flag = msg['method'].endswith('Opening')

处理事件循环的伪代码:

def _immediate_loop(self): """紧急事件处理循环""" while not self.immediate_event_queue.empty(): msg = self.immediate_event_queue.get() handler = self.immediate_event_handlers.get(msg['method']) # {'method': 'Page.frameStartedLoading', 'params': {'frameId': 'AB460ED6AE423BE07C5E5429395C95E8'}} if handler: handler(msg.get('params', {})) def _event_loop(self): """普通事件处理循环""" while True: try: event = self.event_queue.get(timeout=1) except self.event_queue.empty(): time.sleep(0.1) continue handler = self.event_handlers.get(event['method']) if handler: handler(event.get('params', {}))

这两个都会调用回调函数,回调函数会在初始化被设置

def set_callback(self, event, callback, immediate=False): handler = self.immediate_event_handlers if immediate else self.event_handlers if callback: handler[event] = callback else: handler.pop(event, None)

还有发送cdp 命令的接口

def run(self, _method, **kwargs): if not self.is_running: return {'error': 'connection disconnected', 'type': 'connection_error'} timeout = kwargs.pop('_timeout', _S.cdp_timeout) if self.session_id: result = self._send({'method': _method, 'params': kwargs, 'sessionId': self.session_id}, timeout=timeout) else: result = self._send({'method': _method, 'params': kwargs}, timeout=timeout) if 'result' not in result and 'error' in result: kwargs['_timeout'] = timeout return {'error': result['error']['message'], 'type': result.get('type', 'call_method_error'), 'method': _method, 'args': kwargs, 'data': result['error'].get('data')} else: return result['result'] def _send(self, message, timeout=None): self._cur_id += 1 ws_id = self._cur_id message['id'] = ws_id message_json = dumps(message) end_time = perf_counter() + timeout if timeout is not None else None self.method_results[ws_id] = Queue() try: self._ws.send(message_json) if timeout == 0: self.method_results.pop(ws_id, None) return {'id': ws_id, 'result': {}} except (OSError, WebSocketConnectionClosedException): self.method_results.pop(ws_id, None) return {'error': {'message': 'connection disconnected'}, 'type': 'connection_error'} while self.is_running: try: result = self.method_results[ws_id].get(timeout=.2) self.method_results.pop(ws_id, None) return result except Empty: if self.alert_flag and message['method'].startswith(('Input.', 'Runtime.')): return {'error': {'message': 'alert exists.'}, 'type': 'alert_exists'} if timeout is not None and perf_counter() > end_time: self.method_results.pop(ws_id, None) return {'error': {'message': 'alert exists.'}, 'type': 'alert_exists'} \ if self.alert_flag else {'error': {'message': 'timeout'}, 'type': 'timeout'} continue return {'error': {'message': 'connection disconnected'}, 'type': 'connection_error'}

然后开始 chromium 对driver的封装,包括设置ua 等,

我来给你详细讲解 Chromium 类的作用和核心功能。
好的,现在我来给你详细讲解 Chromium 类的整体设计和核心功能。

三、Chromium 类详细讲解

Chromium 类是 DrissionPage 框架中最核心的类之一,它代表整个浏览器进程的管理器。

核心职责

1. 浏览器进程的生命周期管理

启动阶段(line 39-54):

def__new__(cls,addr_or_opts=None,session_options=None):# 第 1 步:处理参数opt=handle_options(addr_or_opts)# 第 2 步:启动浏览器进程或连接到已有浏览器is_headless,browser_id,is_exists,ws_only=run_browser(opt)# 这个就是 ws = s.get(f'http://{chromium_options.address}/json/version', headers={'Connection': 'close'}, timeout=2)# 获取browser_id# 第 3 步:单例模式,避免重复创建同一浏览器withcls._lock:ifbrowser_idincls._BROWSERS:returncls._BROWSERS[browser_id]# 浏览器已存在,直接返回# 第 4 步:新建浏览器对象r=object.__new__(cls)# 省略其他配置cls._BROWSERS[browser_id]=r# 注册到全局浏览器字典returnr

初始化阶段(line 57-121):

def__init__(self,addr_or_opts=None,session_options=None):# 防止重复初始化ifhasattr(self,'_created'):returnself._created=True# 初始化浏览器驱动self._driver=BrowserDriver(self.id,self._ws_address,self)# 监听标签页创建和销毁事件self._run_cdp('Target.setDiscoverTargets',discover=True)## discover=True 表示启用目标发现 向浏览器发送 CDP 命令 Target.setDiscoverTargets ,浏览器会开始主动推送标签页相关的事件消息## 新开的标签页都会给回复self._driver.set_callback('Target.targetDestroyed',self._onTargetDestroyed)# 清理驱动引用 等缓存self._driver.set_callback('Target.targetCreated',self._onTargetCreated)# 初始化下载管理器self._dl_mgr=DownloadManager(self)""" 一个简单的伪代码 class MyBrowser: def __init__(self): self._drivers = {} self._driver = WebSocketDriver() def _init_target_monitor(self): # 第 1 步:启用目标发现 self._driver.run('Target.setDiscoverTargets', discover=True) # 如果不注册这个,即使有下面两个也不会 触发回调。 # 第 2 步:注册销毁回调 self._driver.set_callback('Target.targetDestroyed', self._on_tab_destroyed) # 第 3 步:注册创建回调 self._driver.set_callback('Target.targetCreated', self._on_tab_created) def _on_tab_created(self, targetInfo, **kwargs): """新标签页创建时的处理""" target_id = targetInfo['targetId'] # 为新标签页创建驱动 driver = TabDriver(target_id) self._drivers[target_id] = driver print(f"新标签页创建: {target_id}") def _on_tab_destroyed(self, targetId, **kwargs): """标签页销毁时的处理""" # 清理驱动 driver = self._drivers.pop(targetId, None) if driver: driver.stop() print(f"标签页销毁: {targetId}") """

关闭阶段(line 253-303):

defquit(self,timeout=5,force=False,del_data=False):# 发送 CDP 命令关闭浏览器self._run_cdp('Browser.close')# 停止所有驱动self._driver.stop()# 强制杀死进程ifforce:Process(pid).kill()# 删除用户数据ifdel_data:rmtree(path,True)
2. 标签页(Tab)的管理

获取标签页

@propertydeftab_ids(self):"""获取所有标签页 ID"""return[i['id']foriinself._driver.get(f'http://{self.address}/json').json()]@propertydeflatest_tab(self):"""获取最新创建的标签页"""returnself._get_tab(id_or_num=self.tab_ids[0])

创建标签页(line 188-189, 305-336):

defnew_tab(self,url=None,new_window=False,background=False):returnself._new_tab(True,url=url,new_window=new_window)def_new_tab(self,mix=True,url=None,new_window=False):# 通过 CDP 命令创建新标签页tab=self._run_cdp('Target.createTarget',newWindow=new_window)['targetId']# 创建标签页对象tab=MixTab(self,tab)ifmixelseChromiumTab(self,tab)ifurl:tab.get(url)returntab

查找标签页(line 338-391):

def_get_tab(self,id_or_num=None,title=None,url=None,**kwargs):"""支持多种查找方式:ID、序号、标题、URL"""ifid_or_numisnotNone:ifisinstance(id_or_num,int):returnself.tab_ids[id_or_num]# 按序号else:returnid_or_num# 按 IDeliftitle:return[tfortintabsiftitleint['title']]# 按标题elifurl:return[tfortintabsifurlint['url']]# 按 URL

关闭标签页(line 200-226):

defclose_tabs(self,tabs_or_ids,others=False):# 支持关闭指定标签页或其他标签页fortabintabs:self._run_cdp('Target.closeTarget',targetId=tab.tab_id)
3. Frame(iframe)的管理
self._frames={}# 存储所有 frame 的映射关系

当新标签页创建时,自动跟踪其包含的所有 frame:

def_onTargetCreated(self,**kwargs):tab_id=kwargs['targetInfo']['targetId']self._frames[tab_id]=tab_id# 记录 frame 所属的标签页
4. 驱动管理
self._drivers={}# 待分配的驱动self._all_drivers={}# 所有活跃的驱动

为每个标签页创建独立的驱动:

def_get_driver(self,tab_id,owner=None):"""为标签页获取或创建驱动"""d=self._drivers.pop(tab_id,None)ifnotd:d=Driver(tab_id,self._ws_address)# 创建新驱动d.owner=owner self._all_drivers.setdefault(tab_id,set()).add(d)returnd

四、使用流程日志

# 1. 创建或连接浏览器browser=Chromium()# 自动启动浏览器或自动分配端口# 2. 创建新标签页tab=browser.new_tab('https://example.com')""" def new_tab(xxx): ## ...省略很多代码: tab = self._run_cdp('Target.createTarget', **kwargs)['targetId'] tab = tab_type(self, tab) # 输出 如下,即先返回 id ,再包装对象 ,优雅 0ED87E5977F65C3A170E9CC794439AE0 <MixTab browser_id=9027e9ce-c259-4e38-8e6a-604cbe5d92c3 tab_id=0ED87E5977F65C3A170E9CC794439AE0> """# 3. 获取现有标签页tab=browser.get_tab(url='example')# 按 URL 查找tab=browser.latest_tab# 最新标签页# 4. 关闭标签页browser.close_tabs(tab)# 5. 关闭浏览器browser.quit()

更多文章,敬请关注gzh:零基础爬虫第一天

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询