锦州市网站建设_网站建设公司_Banner设计_seo优化
2025/12/26 7:02:55 网站建设 项目流程

一、Listener:把 Network 事件拼成一条“请求时间线”

源码位置:_units/listener.py,核心类定义:

classListener(object):"""监听器基类"""

1. CDP 连接与基础状态

关键构造:

  • owner:通常是 Page 或 Frame,内部包含_target_idbrowser._ws_address
  • _driver:通过Driver(self._target_id, self._address)建立到当前目标的 CDP 通道。
  • _running_requests/_running_targets
    • 全部 Network 请求计数
    • 命中监听条件的“目标请求”计数
  • _targets/_is_regex/_method/_res_type:监听过滤条件,支持:
    • URL 白名单/关键字匹配/正则
    • 请求方法过滤(GET/POST/…)
    • 响应资源类型过滤(DocumentXHRFetch等)

2. 配置监听目标:set_targets()

defset_targets(self,targets=True,is_regex=False,method=('GET','POST'),res_type=True):传入list或者true 监听所有

3. 启动监听:start()

defstart(self,targets=None,is_regex=None,method=None,res_type=None):...self.clear()ifself.listening:returnself._driver=Driver(self._target_id,self._address)self._driver.session_id=self._driver.run('Target.attachToTarget',targetId=self._target_id,flatten=True)['sessionId']self._driver.run('Network.enable')self._set_callback()self.listening=True

主要动作:

  1. 根据传参更新目标过滤条件(必要时调用set_targets())。
  2. clear()清空状态。
  3. 若尚未监听,则:
    • 创建新的Driver(CDP 会话)
    • Target.attachToTarget得到sessionId
    • Network.enable打开 Network 事件流
    • _set_callback()注册各类回调
    • 标记listening=True

4. 注册 CDP 回调:_set_callback()

def_set_callback(self):self._driver.set_callback('Network.requestWillBeSent',self._requestWillBeSent)self._driver.set_callback('Network.requestWillBeSentExtraInfo',self._requestWillBeSentExtraInfo)self._driver.set_callback('Network.responseReceived',self._response_received)self._driver.set_callback('Network.responseReceivedExtraInfo',self._responseReceivedExtraInfo)self._driver.set_callback('Network.loadingFinished',self._loading_finished)self._driver.set_callback('Network.loadingFailed',self._loading_failed)

可以看到,Listener 关注的是一整条请求生命周期:

  • requestWillBeSent/requestWillBeSentExtraInfo
  • responseReceived/responseReceivedExtraInfo
  • loadingFinished/loadingFailed

这些原始事件会被整理为一个完整的DataPacket对象,最后推入_caught队列中。

5. 事件拼装:如何变成一个 DataPacket

5.1 请求发出:_requestWillBeSent()

self._request_ids 是一个{}

def_requestWillBeSent(self,**kwargs):self._running_requests+=1p=Noneifself._targetsisTrue://全都要if((self._methodisTrueorkwargs['request']['method']inself._method)and(self._res_typeisTrueorkwargs.get('type','').upper()inself._res_type)):self._running_targets+=1rid=kwargs['requestId']p=self._request_ids.setdefault(rid,DataPacket(self._owner.tab_id,True))p._raw_request=kwargs...else:rid=kwargs['requestId']fortargetinself._targets://正则或者判断if(((self._is_regexandsearch(target,kwargs['request']['url']))or(notself._is_regexandtargetinkwargs['request']['url']))and(self._methodisTrueorkwargs['request']['method']inself._method)and(self._res_typeisTrueorkwargs.get('type','').upper()inself._res_type)):self._running_targets+=1p=self._request_ids.setdefault(rid,DataPacket(self._owner.tab_id,target))p._raw_request=kwargsbreakself._extra_info_ids.setdefault(kwargs['requestId'],{})['obj']=pifpelseFalse

关键逻辑:

  • 每次收到请求事件,_running_requests+1。
  • 检查是否命中过滤条件(目标 URL / 方法 / 资源类型)。
    • 如果命中:
      • _running_targets+1
      • 为该requestId创建一个DataPacket(挂在_request_ids上)
      • 把原始请求数据记到DataPacket._raw_request
  • 同时在_extra_info_ids中占位,后面额外信息(ExtraInfo)会根据相同requestId合并进来。
5.2 响应头:_response_received()
def_response_received(self,**kwargs):request=self._request_ids.get(kwargs['requestId'],None)//全局获取到这个对象 设置返回值ifrequest:request._raw_response=kwargs['response']request._resource_type=kwargs['type']

把响应头和资源类型挂到对应DataPacket上。

5.3 额外信息:_responseReceivedExtraInfo()
def_responseReceivedExtraInfo(self,**kwargs):self._running_requests-=1r=self._extra_info_ids.get(kwargs['requestId'],None)ifr:obj=r.get('obj',None)ifobjisFalse:self._extra_info_ids.pop(kwargs['requestId'],None)elifisinstance(obj,DataPacket):obj._requestExtraInfo=r.get('request',None)obj._responseExtraInfo=kwargs self._extra_info_ids.pop(kwargs['requestId'],None)else:r['response']=kwargs

这里把request/response对应的 extraInfo(CDP 提供的更底层网络信息,比如原始头、cookie、认证等)注入到DataPacket中。

5.4 请求成功结束:_loading_finished()
def_loading_finished(self,**kwargs):self._running_requests-=1rid=kwargs['requestId']packet=self._request_ids.get(rid)ifpacket:r=self._driver.run('Network.getResponseBody',requestId=rid)if'body'inr:packet._raw_body=r['body']packet._base64_body=r['base64Encoded']else:packet._raw_body=''packet._base64_body=False......self._request_ids.pop(rid,None)ifpacket:self._caught.put(packet)//是一个queue 。 self._running_targets-=1
  • 通过Network.getResponseBody(requestId)获取响应 body:
    • 字段body:真实内容(字符串)
    • 字段base64Encoded:是否需要 base64 解码
  • 最后将完整的DataPacket投入_caught,供外部wait()/steps()消费。
5.5 请求失败:_loading_failed()

类似逻辑,只是填的是FailInfo

data_packet._raw_fail_info=kwargs data_packet._resource_type=kwargs['type']data_packet.is_failed=True...self._caught.put(data_packet)

二、DataPacket / Request / Response:把 CDP 原始事件“整合成人话”

Listener本身只负责“抓包 + 拼装”,真正暴露给用户的,是三个核心数据对象:

  • DataPacket:一条请求的完整信息
  • Request:请求部分
  • Response:响应部分

1. DataPacket:一条请求的总包装

classDataPacket(object):def__init__(self,tab_id,target):self.tab_id=tab_id self.target=target self.is_failed=False...

常用属性:

  • url/method
@propertydefurl(self):returnself.request.url@propertydefmethod(self):returnself.request.method
  • resourceType:来自Network.responseReceivedtype
  • request/response/fail_info
    • 延迟创建对应对象:
@propertydefrequest(self):ifself._requestisNone:self._request=Request(self,self._raw_request['request'],self._raw_post_data)returnself._request@propertydefresponse(self):ifself._responseisNone:self._response=Response(self,self._raw_response,self._raw_body,self._base64_body)returnself._response
  • wait_extra_info(timeout=None)
    • 等待 extraInfo 注入完成(部分请求 ExtraInfo 会稍后才到)。

2. Request:请求信息解包

classRequest(object):def__init__(self,data_packet,raw_request,post_data):self._data_packet=data_packet self._request=raw_request self._raw_post_data=post_data self._postData=Noneself._headers=None

主要能力:

  • __getattr__直接透传原始字段,例如request.url/request.method等。
  • headers:合并 ExtraInfo 头(有些头只在 extraInfo 提供):
@propertydefheaders(self):ifself._headersisNone:self._headers=CaseInsensitiveDict(self._request['headers'])ifself.extra_info.headers:h=CaseInsensitiveDict(self.extra_info.headers)fork,vinh.items():ifknotinself._headers:self._headers[k]=vreturnself._headers
  • params:URL 查询参数解析为 dict。
  • postData
    • 优先使用_raw_post_dataNetwork.getRequestPostData补回来的)。
    • 其次使用原始postData字段。
    • 尝试json.loads(),失败则原样返回。
  • cookies:从 ExtraInfo 的associatedCookies里筛选未被阻止的 cookie。

3. Response:响应信息解包

classResponse(object):def__init__(self,data_packet,raw_response,raw_body,base64_body):self._data_packet=data_packet self._response=raw_response self._raw_body=raw_body self._is_base64_body=base64_body self._body=Noneself._headers=None

关键属性:

  • headers:同样合并 ExtraInfo 的 header。
  • raw_body:原始字符串/body。
  • body
    • 如果是 base64,则解码为 bytes。
    • 否则尝试当 JSON 解析,不行就原样返回字符串。

这一层把底层 CDP 返回的结构改成了“用起来顺手”的 Python 对象。


三、Listener 的等待与遍历:同步消费异步事件

有了_caught这个队列,Listener提供了两种消费方式:一次性等待和逐步遍历。

1.wait(count=1, timeout=None, fit_count=True, raise_err=None)

defwait(self,count=1,timeout=None,fit_count=True,raise_err=None):ifnotself.listening:raiseRuntimeError(_S._lang.join(_S._lang.NOT_LISTENING))...

行为说明:

  • count=1
    • 默认等待至少 1 条数据包,返回DataPacket对象。
  • count>1
    • 在超时时间内等到至少 count 条,返回列表。
  • fit_count=False
    • 超时但队列里已经有部分数据时,直接返回已有的全部数据。
  • timeout=None
    • 一直等到条件满足或 driver 不再运行。

当配合网络断言、接口抓包等使用时,这个接口非常直观。

2.steps(count=None, timeout=None, gap=1)

defsteps(self,count=None,timeout=None,gap=1):...whileself._driver.is_runningandself.listening:ifself._caught.qsize()>=gap:yieldself._caught.get_nowait()ifgap==1else[...]...
  • 这是一个生成器接口,可以逐步迭代抓到的请求:
    • gap=1:每次 yield 1 条
    • gap=N:每次 yield N 条
  • count控制总共 yield 的数量。
  • timeout表示“每一步之间最长等待多久”,超时返回False结束。

适合写这种模式:

forpktinlistener.steps(count=10,timeout=5):print(pkt.url,pkt.response.status)

四、FrameListener:专注某个 Frame 的网络请求

FrameListener继承自Listener,只改了两处方法:

classFrameListener(Listener):def_requestWillBeSent(self,**kwargs):ifnotself._owner._is_diff_domainandkwargs.get('frameId',None)!=self._owner._frame_id:returnsuper()._requestWillBeSent(**kwargs)def_response_received(self,**kwargs):ifnotself._owner._is_diff_domainandkwargs.get('frameId',None)!=self._owner._frame_id:returnsuper()._response_received(**kwargs)

含义很简单:

  • 如果 frame 没有跨域(_is_diff_domain=False),就只处理frameId == 当前 frame 的 _frame_id的请求。
  • 这样可以把一个页面中不同 iframe 的请求区分开,便于精准监听子页面的网络行为。

五、Console:监听浏览器控制台输出

再看Console,定位在_units/console.py第 14 行:

classConsole(object):def__init__(self,owner):self._owner=owner self._caught=Noneself._not_enabled=Trueself.listening=False

角色很清晰:监听Console.messageAdded事件,并把每一条 console 消息封装成ConsoleData

1. 启动与停止

defstart(self):self._caught=Queue(maxsize=0)self._owner._driver.set_callback("Console.messageAdded",self._console)ifself._not_enabled:self._owner._run_cdp("Console.enable")self._not_enabled=Falseself.listening=Truedef_console(self,**kwargs):self._caught.put(ConsoleData(kwargs['message']))defstop(self):ifself.listening:self._owner._driver.set_callback('Console.messageAdded',None)self.listening=False
  • start()
    • 新建队列_caught
    • 注册回调_console
    • 若首次开启,则调用Console.enable打开 CDP 的 Console 域
  • stop()
    • 取消回调,标记停止

清空消息也很简单:

defclear(self):self._caught=Queue(maxsize=0)

2. 同步等待:wait(timeout=None)

defwait(self,timeout=None):ifnotself.listening:raiseRuntimeError(_S._lang.join(_S._lang.NOT_LISTENING))iftimeoutisNone:whileself._owner._driver.is_runningandself.listeningandnotself._caught.qsize():sleep(.03)returnself._caught.get_nowait()ifself._caught.qsize()elseNoneelse:end=perf_counter()+timeoutwhileself._owner._driver.is_runningandself.listeningandperf_counter()<end:ifself._caught.qsize():returnself._caught.get_nowait()sleep(0.05)returnFalse

行为:

  • timeout=None
    • 一直等到出现一条 console 消息,然后返回一个ConsoleData
    • 若中途 driver 停止或监听被关,可能返回None
  • timeout>0
    • 在指定时间内等一条消息,超时返回False

3. 迭代消费:steps(timeout=None)

defsteps(self,timeout=None):iftimeoutisNone:whileself._owner._driver.is_runningandself.listening:ifself._caught.qsize():yieldself._caught.get_nowait()sleep(0.05)else:end=perf_counter()+timeoutwhileself._owner._driver.is_runningandself.listeningandperf_counter()<end:ifself._caught.qsize():yieldself._caught.get_nowait()end=perf_counter()+timeout sleep(0.05)returnFalse

Listener.steps()类似,只不过每次 yield 的是单条ConsoleData

其实和listener 思路差不多,简化了一点,优雅。

六、结语

这一轮把 DrissionPage 里几块重点基本看了一遍:从Driver 到 元素对象的构造(ChromiumElement、三类 ID 的互转和懒加载),到网络层监听(Listener/FrameListener/DataPacket),再到控制台输出监听(Console/ConsoleData),基本把页面结构、网络流量、前端日志这三条主线都串了起来。

如果跟着源码走了一遍,现在对 CDP 大概会有这样几层认识:

  • DOM 域(DOM.*)负责节点的查找、描述、属性与结构;
  • Runtime 域(Runtime.*)负责在具体对象上执行 JS,并把返回值再包装回 Python;
  • Network 域(Network.*)把一次 HTTP 请求拆成多个事件,DrissionPage 再拼成可用的数据对象;
  • 其它域(Console.*Page.*IO.*等)则分别承担日志、资源获取、截图等更上层的能力。

DrissionPage 做的事情,是在这些零散的 CDP 能力之上,再加了一层稳定、可组合、语义化的封装。在代码里只看到.ele().click()listener.wait()console.messages这样的调用,而不是满屏的Runtime.callFunctionOnNetwork.getResponseBody

如果还想继续往下,其实还有不少有意思的模块可以研究,比如:

  • 行为类:ClickerElementScrollerElementWaiter以及对应的cdp。
  • 状态与几何:ElementStatesElementRect以及对应的cdp。
  • 页面级工具:下载器、 screencast、cookies 管理等,如何在多 target、多 frame 场景下保持一致行为。

更多文章,敬请关注gzh:零基础爬虫第一天

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询