一、Listener:把 Network 事件拼成一条“请求时间线”
源码位置:_units/listener.py,核心类定义:
classListener(object):"""监听器基类"""1. CDP 连接与基础状态
关键构造:
owner:通常是 Page 或 Frame,内部包含_target_id和browser._ws_address。_driver:通过Driver(self._target_id, self._address)建立到当前目标的 CDP 通道。_running_requests/_running_targets:- 全部 Network 请求计数
- 命中监听条件的“目标请求”计数
_targets/_is_regex/_method/_res_type:监听过滤条件,支持:- URL 白名单/关键字匹配/正则
- 请求方法过滤(GET/POST/…)
- 响应资源类型过滤(
Document、XHR、Fetch等)
2. 配置监听目标:set_targets()
defset_targets(self,targets=True,is_regex=False,method=('GET','POST'),res_type=True):传入list或者true 监听所有3. 启动监听:start()
defstart(self,targets=None,is_regex=None,method=None,res_type=None):...self.clear()ifself.listening:returnself._driver=Driver(self._target_id,self._address)self._driver.session_id=self._driver.run('Target.attachToTarget',targetId=self._target_id,flatten=True)['sessionId']self._driver.run('Network.enable')self._set_callback()self.listening=True主要动作:
- 根据传参更新目标过滤条件(必要时调用
set_targets())。 clear()清空状态。- 若尚未监听,则:
- 创建新的
Driver(CDP 会话) Target.attachToTarget得到sessionIdNetwork.enable打开 Network 事件流_set_callback()注册各类回调- 标记
listening=True
- 创建新的
4. 注册 CDP 回调:_set_callback()
def_set_callback(self):self._driver.set_callback('Network.requestWillBeSent',self._requestWillBeSent)self._driver.set_callback('Network.requestWillBeSentExtraInfo',self._requestWillBeSentExtraInfo)self._driver.set_callback('Network.responseReceived',self._response_received)self._driver.set_callback('Network.responseReceivedExtraInfo',self._responseReceivedExtraInfo)self._driver.set_callback('Network.loadingFinished',self._loading_finished)self._driver.set_callback('Network.loadingFailed',self._loading_failed)可以看到,Listener 关注的是一整条请求生命周期:
requestWillBeSent/requestWillBeSentExtraInforesponseReceived/responseReceivedExtraInfoloadingFinished/loadingFailed
这些原始事件会被整理为一个完整的DataPacket对象,最后推入_caught队列中。
5. 事件拼装:如何变成一个 DataPacket
5.1 请求发出:_requestWillBeSent()
self._request_ids 是一个{}
def_requestWillBeSent(self,**kwargs):self._running_requests+=1p=Noneifself._targetsisTrue://全都要if((self._methodisTrueorkwargs['request']['method']inself._method)and(self._res_typeisTrueorkwargs.get('type','').upper()inself._res_type)):self._running_targets+=1rid=kwargs['requestId']p=self._request_ids.setdefault(rid,DataPacket(self._owner.tab_id,True))p._raw_request=kwargs...else:rid=kwargs['requestId']fortargetinself._targets://正则或者判断if(((self._is_regexandsearch(target,kwargs['request']['url']))or(notself._is_regexandtargetinkwargs['request']['url']))and(self._methodisTrueorkwargs['request']['method']inself._method)and(self._res_typeisTrueorkwargs.get('type','').upper()inself._res_type)):self._running_targets+=1p=self._request_ids.setdefault(rid,DataPacket(self._owner.tab_id,target))p._raw_request=kwargsbreakself._extra_info_ids.setdefault(kwargs['requestId'],{})['obj']=pifpelseFalse关键逻辑:
- 每次收到请求事件,
_running_requests+1。 - 检查是否命中过滤条件(目标 URL / 方法 / 资源类型)。
- 如果命中:
_running_targets+1- 为该
requestId创建一个DataPacket(挂在_request_ids上) - 把原始请求数据记到
DataPacket._raw_request
- 如果命中:
- 同时在
_extra_info_ids中占位,后面额外信息(ExtraInfo)会根据相同requestId合并进来。
5.2 响应头:_response_received()
def_response_received(self,**kwargs):request=self._request_ids.get(kwargs['requestId'],None)//全局获取到这个对象 设置返回值ifrequest:request._raw_response=kwargs['response']request._resource_type=kwargs['type']把响应头和资源类型挂到对应DataPacket上。
5.3 额外信息:_responseReceivedExtraInfo()
def_responseReceivedExtraInfo(self,**kwargs):self._running_requests-=1r=self._extra_info_ids.get(kwargs['requestId'],None)ifr:obj=r.get('obj',None)ifobjisFalse:self._extra_info_ids.pop(kwargs['requestId'],None)elifisinstance(obj,DataPacket):obj._requestExtraInfo=r.get('request',None)obj._responseExtraInfo=kwargs self._extra_info_ids.pop(kwargs['requestId'],None)else:r['response']=kwargs这里把request/response对应的 extraInfo(CDP 提供的更底层网络信息,比如原始头、cookie、认证等)注入到DataPacket中。
5.4 请求成功结束:_loading_finished()
def_loading_finished(self,**kwargs):self._running_requests-=1rid=kwargs['requestId']packet=self._request_ids.get(rid)ifpacket:r=self._driver.run('Network.getResponseBody',requestId=rid)if'body'inr:packet._raw_body=r['body']packet._base64_body=r['base64Encoded']else:packet._raw_body=''packet._base64_body=False......self._request_ids.pop(rid,None)ifpacket:self._caught.put(packet)//是一个queue 。 self._running_targets-=1- 通过
Network.getResponseBody(requestId)获取响应 body:- 字段
body:真实内容(字符串) - 字段
base64Encoded:是否需要 base64 解码
- 字段
- 最后将完整的
DataPacket投入_caught,供外部wait()/steps()消费。
5.5 请求失败:_loading_failed()
类似逻辑,只是填的是FailInfo:
data_packet._raw_fail_info=kwargs data_packet._resource_type=kwargs['type']data_packet.is_failed=True...self._caught.put(data_packet)二、DataPacket / Request / Response:把 CDP 原始事件“整合成人话”
Listener本身只负责“抓包 + 拼装”,真正暴露给用户的,是三个核心数据对象:
DataPacket:一条请求的完整信息Request:请求部分Response:响应部分
1. DataPacket:一条请求的总包装
classDataPacket(object):def__init__(self,tab_id,target):self.tab_id=tab_id self.target=target self.is_failed=False...常用属性:
url/method:
@propertydefurl(self):returnself.request.url@propertydefmethod(self):returnself.request.methodresourceType:来自Network.responseReceived的typerequest/response/fail_info:- 延迟创建对应对象:
@propertydefrequest(self):ifself._requestisNone:self._request=Request(self,self._raw_request['request'],self._raw_post_data)returnself._request@propertydefresponse(self):ifself._responseisNone:self._response=Response(self,self._raw_response,self._raw_body,self._base64_body)returnself._responsewait_extra_info(timeout=None):- 等待 extraInfo 注入完成(部分请求 ExtraInfo 会稍后才到)。
2. Request:请求信息解包
classRequest(object):def__init__(self,data_packet,raw_request,post_data):self._data_packet=data_packet self._request=raw_request self._raw_post_data=post_data self._postData=Noneself._headers=None主要能力:
__getattr__直接透传原始字段,例如request.url/request.method等。headers:合并 ExtraInfo 头(有些头只在 extraInfo 提供):
@propertydefheaders(self):ifself._headersisNone:self._headers=CaseInsensitiveDict(self._request['headers'])ifself.extra_info.headers:h=CaseInsensitiveDict(self.extra_info.headers)fork,vinh.items():ifknotinself._headers:self._headers[k]=vreturnself._headersparams:URL 查询参数解析为 dict。postData:- 优先使用
_raw_post_data(Network.getRequestPostData补回来的)。 - 其次使用原始
postData字段。 - 尝试
json.loads(),失败则原样返回。
- 优先使用
cookies:从 ExtraInfo 的associatedCookies里筛选未被阻止的 cookie。
3. Response:响应信息解包
classResponse(object):def__init__(self,data_packet,raw_response,raw_body,base64_body):self._data_packet=data_packet self._response=raw_response self._raw_body=raw_body self._is_base64_body=base64_body self._body=Noneself._headers=None关键属性:
headers:同样合并 ExtraInfo 的 header。raw_body:原始字符串/body。body:- 如果是 base64,则解码为 bytes。
- 否则尝试当 JSON 解析,不行就原样返回字符串。
这一层把底层 CDP 返回的结构改成了“用起来顺手”的 Python 对象。
三、Listener 的等待与遍历:同步消费异步事件
有了_caught这个队列,Listener提供了两种消费方式:一次性等待和逐步遍历。
1.wait(count=1, timeout=None, fit_count=True, raise_err=None)
defwait(self,count=1,timeout=None,fit_count=True,raise_err=None):ifnotself.listening:raiseRuntimeError(_S._lang.join(_S._lang.NOT_LISTENING))...行为说明:
count=1:- 默认等待至少 1 条数据包,返回
DataPacket对象。
- 默认等待至少 1 条数据包,返回
count>1:- 在超时时间内等到至少 count 条,返回列表。
fit_count=False:- 超时但队列里已经有部分数据时,直接返回已有的全部数据。
timeout=None:- 一直等到条件满足或 driver 不再运行。
当配合网络断言、接口抓包等使用时,这个接口非常直观。
2.steps(count=None, timeout=None, gap=1)
defsteps(self,count=None,timeout=None,gap=1):...whileself._driver.is_runningandself.listening:ifself._caught.qsize()>=gap:yieldself._caught.get_nowait()ifgap==1else[...]...- 这是一个生成器接口,可以逐步迭代抓到的请求:
gap=1:每次 yield 1 条gap=N:每次 yield N 条
count控制总共 yield 的数量。timeout表示“每一步之间最长等待多久”,超时返回False结束。
适合写这种模式:
forpktinlistener.steps(count=10,timeout=5):print(pkt.url,pkt.response.status)四、FrameListener:专注某个 Frame 的网络请求
FrameListener继承自Listener,只改了两处方法:
classFrameListener(Listener):def_requestWillBeSent(self,**kwargs):ifnotself._owner._is_diff_domainandkwargs.get('frameId',None)!=self._owner._frame_id:returnsuper()._requestWillBeSent(**kwargs)def_response_received(self,**kwargs):ifnotself._owner._is_diff_domainandkwargs.get('frameId',None)!=self._owner._frame_id:returnsuper()._response_received(**kwargs)含义很简单:
- 如果 frame 没有跨域(
_is_diff_domain=False),就只处理frameId == 当前 frame 的 _frame_id的请求。 - 这样可以把一个页面中不同 iframe 的请求区分开,便于精准监听子页面的网络行为。
五、Console:监听浏览器控制台输出
再看Console,定位在_units/console.py第 14 行:
classConsole(object):def__init__(self,owner):self._owner=owner self._caught=Noneself._not_enabled=Trueself.listening=False角色很清晰:监听Console.messageAdded事件,并把每一条 console 消息封装成ConsoleData。
1. 启动与停止
defstart(self):self._caught=Queue(maxsize=0)self._owner._driver.set_callback("Console.messageAdded",self._console)ifself._not_enabled:self._owner._run_cdp("Console.enable")self._not_enabled=Falseself.listening=Truedef_console(self,**kwargs):self._caught.put(ConsoleData(kwargs['message']))defstop(self):ifself.listening:self._owner._driver.set_callback('Console.messageAdded',None)self.listening=Falsestart():- 新建队列
_caught - 注册回调
_console - 若首次开启,则调用
Console.enable打开 CDP 的 Console 域
- 新建队列
stop():- 取消回调,标记停止
清空消息也很简单:
defclear(self):self._caught=Queue(maxsize=0)2. 同步等待:wait(timeout=None)
defwait(self,timeout=None):ifnotself.listening:raiseRuntimeError(_S._lang.join(_S._lang.NOT_LISTENING))iftimeoutisNone:whileself._owner._driver.is_runningandself.listeningandnotself._caught.qsize():sleep(.03)returnself._caught.get_nowait()ifself._caught.qsize()elseNoneelse:end=perf_counter()+timeoutwhileself._owner._driver.is_runningandself.listeningandperf_counter()<end:ifself._caught.qsize():returnself._caught.get_nowait()sleep(0.05)returnFalse行为:
timeout=None:- 一直等到出现一条 console 消息,然后返回一个
ConsoleData - 若中途 driver 停止或监听被关,可能返回
None
- 一直等到出现一条 console 消息,然后返回一个
timeout>0:- 在指定时间内等一条消息,超时返回
False
- 在指定时间内等一条消息,超时返回
3. 迭代消费:steps(timeout=None)
defsteps(self,timeout=None):iftimeoutisNone:whileself._owner._driver.is_runningandself.listening:ifself._caught.qsize():yieldself._caught.get_nowait()sleep(0.05)else:end=perf_counter()+timeoutwhileself._owner._driver.is_runningandself.listeningandperf_counter()<end:ifself._caught.qsize():yieldself._caught.get_nowait()end=perf_counter()+timeout sleep(0.05)returnFalse和Listener.steps()类似,只不过每次 yield 的是单条ConsoleData。
其实和listener 思路差不多,简化了一点,优雅。
六、结语
这一轮把 DrissionPage 里几块重点基本看了一遍:从Driver 到 元素对象的构造(ChromiumElement、三类 ID 的互转和懒加载),到网络层监听(Listener/FrameListener/DataPacket),再到控制台输出监听(Console/ConsoleData),基本把页面结构、网络流量、前端日志这三条主线都串了起来。
如果跟着源码走了一遍,现在对 CDP 大概会有这样几层认识:
- DOM 域(
DOM.*)负责节点的查找、描述、属性与结构; - Runtime 域(
Runtime.*)负责在具体对象上执行 JS,并把返回值再包装回 Python; - Network 域(
Network.*)把一次 HTTP 请求拆成多个事件,DrissionPage 再拼成可用的数据对象; - 其它域(
Console.*、Page.*、IO.*等)则分别承担日志、资源获取、截图等更上层的能力。
DrissionPage 做的事情,是在这些零散的 CDP 能力之上,再加了一层稳定、可组合、语义化的封装。在代码里只看到.ele().click()、listener.wait()、console.messages这样的调用,而不是满屏的Runtime.callFunctionOn、Network.getResponseBody。
如果还想继续往下,其实还有不少有意思的模块可以研究,比如:
- 行为类:
Clicker、ElementScroller、ElementWaiter以及对应的cdp。 - 状态与几何:
ElementStates、ElementRect以及对应的cdp。 - 页面级工具:下载器、 screencast、cookies 管理等,如何在多 target、多 frame 场景下保持一致行为。
更多文章,敬请关注gzh:零基础爬虫第一天