
The key function is _tir_generate.
Below is the main rollout class.

# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
The vllm_rollout that can be applied with different backends
When working with FSDP:
- Use DTensor weight loader (recommended) or HF weight loader
- Utilize state_dict from the FSDP to synchronize the weights among tp ranks in vLLM
When working with Megatron:
- Use Megatron weight loader
- During training, only the current pp stage holds the parameters
- Before inference, broadcast the parameters of the current pp rank to all other pp ranks (all pp ranks hold all the parameters)
- Bind the parameters to the inference engine
- Do inference in tp. pp is treated as additional dp
- After inference, all the parameters that don't belong to this pp rank are freed.
"""
import numpy as np
from typing import List
from contextlib import contextmanager
from omegaconf import DictConfig
import os
import torch
import torch.distributed
from tensordict import TensorDict
import requests
from multiprocessing import Pool
from functools import partial
from torch import nn
from typing import Any, Union
from verl import DataProto
from verl.utils.torch_functional import get_eos_mask, pad_2d_list_to_length
from verl.workers.rollout.base import BaseRollout
from vllm.distributed import parallel_state as vllm_ps
from vllm import LLM, SamplingParams
from verl.third_party.vllm import vllm_version
from qwen_agent.tools.python_executor import PythonExecutor
from qwen_agent.tools.code_interpreter import CodeInterpreter
from qwen_agent.utils.utils import print_traceback
from typing import Tuple
import json5
import pdb
import json
import copy
from concurrent.futures import ThreadPoolExecutor, as_completed, TimeoutError
# TODO
# 1. support pp in vllm
# 2. passing tokenizer is not necessary? no encoding/decoding is happening here
# 3. simplify init logics


# NOTE(sgm): add for verl. We can optimize it by making the dataloader yield List[int] without padding.
def _pre_process_inputs(pad_token_id, prompt_token_ids: torch.Tensor) -> List[int]:
    # remove the left padding in the prompt token_ids
    # pad_token_id = self.llm_engine.tokenizer.pad_token_id if self.llm_engine.tokenizer.pad_token_id is not None else self.llm_engine.tokenizer.eos_token_id
    non_pad_index = torch.nonzero(prompt_token_ids != pad_token_id, as_tuple=False)[0][0]
    token_ids = prompt_token_ids[non_pad_index:].tolist()
    return token_ids


def _repeat_interleave(value: Union[torch.Tensor, np.ndarray], repeats: int) -> Union[torch.Tensor, List[Any]]:
    if isinstance(value, torch.Tensor):
        return value.repeat_interleave(repeats, dim=0)
    else:
        return np.repeat(value, repeats, axis=0)


OBS_START = '```output'
OBS_END = '\n```\n'


def extract_program(result: str, last_only=True):
    """extract the program after "```python", and before "```" """
    program = ''
    start = False
    for line in result.split('\n'):
        if line.startswith('```python') or line.endswith('```python'):
            if last_only:
                program = ''  # only extract the last program
            else:
                program += '\n# ========\n'
            start = True
        elif line.startswith('```'):
            start = False
        elif start:
            program += line + '\n'
    if start:
        # the code is incomplete
        program = ''
    return program


def _detect_tool(text: str) -> Tuple[bool, str, str, str]:
    program = extract_program(text)
    if program:
        program = json.dumps({'code': program}, ensure_ascii=False)
    return (program != ''), PythonExecutor.name, program, text


def send_request(json_data):
    try:
        url = 'sandbox_url'
        response = requests.post(url, json=json_data, timeout=10)
        return response.json()  # return the JSON body of the response
    except:
        print("sandbox timeout")
        return {"error": "unknown"}


class vLLMRollout(BaseRollout):

    def __init__(self, model_path: str, config: DictConfig, tokenizer, model_hf_config, **kwargs):
        """A vLLM rollout. It requires the module is supported by the vllm.

        Args:
            module: module here follows huggingface APIs
            config: DictConfig
            tokenizer: the task/model tokenizer
            model_hf_config: the huggingface config to initialize the generating model in vllm
            **kwargs: train_tp, for Megatron Backend to initialize hybrid engine (zero redundancy) process group
        """
        super().__init__()
        self.config = config
        # CUDA graph requires a fixed memory layout, so the cache engine cannot be freed when it is enabled
        assert not (not config.enforce_eager and config.free_cache_engine), \
            "disable CUDA graph (enforce_eager = False) if free cache engine"

        tensor_parallel_size = self.config.get('tensor_model_parallel_size', 1)
        assert tensor_parallel_size <= torch.distributed.get_world_size(), \
            "tensor parallel size should be less than or equal to the world size"
        max_num_batched_tokens = self.config.get('max_num_batched_tokens', 8192)

        if kwargs.get('train_tp', None) is not None:
            # deployed with megatron
            import os
            os.environ['CUDA_TIMER_STREAM_KAFKA_ENABLE'] = '0'
            os.environ['MEGATRON_IMPORT_TIMERS'] = '0'
            train_tp = kwargs.get('train_tp', None)
            num_tp_per_train_tp = train_tp // tensor_parallel_size
            # initialize vLLM parallel state for the hybrid-engine (zero redundancy) process group
            vllm_ps.initialize_parallel_state(tensor_model_parallel_size=tensor_parallel_size,
                                              num_tp_per_train_tp=num_tp_per_train_tp)

        assert model_hf_config.max_position_embeddings >= config.prompt_length + config.response_length, \
            "model context length should be greater than total sequence length"

        self.inference_engine = LLM(
            model=model_path,
            enable_sleep_mode=True,  # allows releasing GPU memory while the engine is idle
            tensor_parallel_size=tensor_parallel_size,
            distributed_executor_backend="external_launcher",
            dtype=config.dtype,
            enforce_eager=config.enforce_eager,
            gpu_memory_utilization=config.gpu_memory_utilization,
            disable_custom_all_reduce=True,
            skip_tokenizer_init=False,
            max_model_len=config.prompt_length + config.response_length,
            disable_log_stats=config.disable_log_stats,
            max_num_batched_tokens=max_num_batched_tokens,
            enable_chunked_prefill=config.enable_chunked_prefill,
            enable_prefix_caching=True,
        )

        # Offload vllm model to reduce peak memory usage
        self.inference_engine.sleep(level=1)

        kwargs = dict(
            n=1,
            logprobs=0,  # can be set to 0 and let the actor recompute log probs to save memory
            max_tokens=config.response_length,
        )

        # we may detokenize the result all together later
        if vllm_version != '0.3.1':
            kwargs['detokenize'] = False

        # supporting adding any sampling params from the config file (e.g. temperature, top_p, top_k)
        for k in config.keys():
            if hasattr(SamplingParams(), str(k)):
                kwargs[k] = config.get(k)

        print(f"kwargs: {kwargs}")
        self.sampling_params = SamplingParams(**kwargs)

        self.pad_token_id = tokenizer.pad_token_id
        self.tokenizer = tokenizer
        # Python executor used to run generated code (the CodeInterpreter variant is kept commented out)
        self.executor = PythonExecutor()
        # self.code_interpreter = CodeInterpreter()

    def _get_prompts_and_indices(self, samples_info):
        """Collect the sequences of all samples that have not stopped yet, together with their original indices.

        Example: if samples 0 and 2 are still running, this returns their current `sequence`
        strings and the indices [0, 2].
        """
        prompts, indices = [], []
        for index, info in enumerate(samples_info):
            if not info['stop']:
                prompts.append(info['sequence'])
                indices.append(info['index'])
        return prompts, indices

    # def code_interpreter_batch_call(self, tool_inputs):
    #     with Pool(processes=min(len(tool_inputs), os.cpu_count(), 32)) as pool:
    #         results = pool.map(self.code_interpreter.call, tool_inputs)
    #     def postproc(result):
    #         report = result.split("```")[0].strip()
    #         output = result.split("```")[-1].split("```")[-1].strip()
    #         if report == "stdout:": report = "Done"
    #         return (output, report)
    #     results = [postproc(result) for result in results]
    #     return results

    def code_interpreter_batch_call(self, tool_inputs, timeout=20):
        """Execute a batch of Python snippets in parallel via the sandbox service.

        Each input is wrapped as {'code': ..., 'language': 'python'}, sent through a thread pool,
        and post-processed into an (output, status) tuple such as ('2', 'Done') or
        ('Error', 'TimeoutError').
        """
        tool_inputs = [{'code': tool_input, 'language': 'python'} for tool_input in tool_inputs]
        results = [None] * len(tool_inputs)
        with ThreadPoolExecutor(max_workers=max(min(len(tool_inputs), os.cpu_count(), 64), 1)) as executor:
            # map each future back to the position of its input so results keep the original order
            future_to_index = {executor.submit(send_request, input): i for i, input in enumerate(tool_inputs)}
            for future in as_completed(future_to_index):
                index = future_to_index[future]
                try:
                    result = future.result(timeout=timeout)
                    results[index] = result
                except:
                    results[index] = {"run_result": {"stdout": "Error", "stderr": "TimeoutError"}}

        def postproc(output):
            try:
                # return code 0 or non-empty stdout counts as success
                if str(output['run_result']['return_code']) == '0' or len(str(output['run_result']['stdout'])) != 0:
                    return output['run_result']['stdout'], "Done"
                else:
                    return output['run_result']['stdout'], output['run_result']['stderr'].strip()
            except Exception:
                return "Error", "UnknownError"

        results = [postproc(result) for result in results]
        return results

    def _tokenize_and_find_mask_token_indices(self, sample_info):
        """Tokenize the response and build a mask that zeroes out the tool-output spans.

        `mask_info` holds character ranges (e.g. [[15, 30]]) marking the ```output ... ``` blocks;
        using the tokenizer's offset mapping, every token overlapping such a range gets mask 0,
        and every other token keeps mask 1.
        """
        response = sample_info['response']
        mask_str_ranges = sample_info['mask_info']
        encoding = self.tokenizer(response, add_special_tokens=False, return_offsets_mapping=True)
        response_token_ids = encoding['input_ids']
        offset_mapping_tensor = torch.tensor(encoding['offset_mapping'], dtype=torch.long)
        token_starts = offset_mapping_tensor[:, 0]
        token_ends = offset_mapping_tensor[:, 1]
        mask_tensor = torch.ones(len(response_token_ids))
        for mask_str_range in mask_str_ranges:
            start_index, end_index = mask_str_range[0], mask_str_range[1]
            # tokens whose character span overlaps the masked range
            mask = (token_starts < end_index) & (token_ends > start_index) & (token_starts >= start_index)
            mask_tensor[mask] = 0
        return response_token_ids, mask_tensor

    def _tir_generate(self, prompts=None, sampling_params=None, prompt_token_ids=None, use_tqdm=False):
        """TIR (Tool-Integrated Reasoning) generation: interleave LLM calls with tool execution.

        Example flow for the prompt "compute 1+1":
          round 1: the LLM emits "...```python\n1+1\n```" and stops at the ```output stop word;
                   the code is executed and "```output\n2\n```\n" is appended to the sequence;
          round 2: the LLM continues from the updated sequence, answers normally, and the sample stops.

        Returns (responses_ids, tool_output_masks, execution_passes).
        """
        sampling_params = copy.deepcopy(sampling_params)
        prompts = [self.tokenizer.decode(prompt['prompt_token_ids'], skip_special_tokens=False) for prompt in prompts]
        # expand each prompt n times here and set n=1, so multi-sample handling is done once at this level
        prompts = [prompt for prompt in prompts for _ in range(sampling_params.n)]
        sampling_params.n = 1
        sampling_params.detokenize = True
        # stop at ```output so the generated code can be executed before continuing
        sampling_params.stop = ["```output"]

        # per-sample bookkeeping: current sequence, accumulated response, stop flag,
        # character ranges to mask (tool outputs), and whether code execution passed
        samples_info = [{"prompt": prompt, "sequence": prompt, "response": "", "stop": False,
                         "finish_reason": None, "index": index, "mask_info": [], "execution_pass": 0}
                        for index, prompt in enumerate(prompts)]
        program2output = []
        num_llm_calls_available = copy.deepcopy(self.config.num_llm_calls_available)

        while num_llm_calls_available >= 0:
            # on the last allowed call, drop the stop word so the model can finish its answer
            if num_llm_calls_available == 0:
                sampling_params.stop = None
            num_llm_calls_available -= 1

            # llm generates responses, stopping at the eos token or ```output
            input_prompts, indices = self._get_prompts_and_indices(samples_info)
            input_prompts = [{'prompt_token_ids': self.tokenizer.encode(x, add_special_tokens=False)[:self.config.prompt_length + self.config.response_length]}
                             for x in input_prompts]
            outputs = self.inference_engine.generate(prompts=input_prompts, sampling_params=sampling_params, use_tqdm=use_tqdm)
            sorted_outputs = sorted(outputs, key=lambda output: int(output.request_id))
            responses = [x.outputs[0].text for x in sorted_outputs]
            finish_reason = [x.outputs[0].finish_reason for x in sorted_outputs]
            stop_reason = [x.outputs[0].stop_reason for x in sorted_outputs]

            # after the final call, just append the responses and stop
            if num_llm_calls_available == -1:
                for i, index in enumerate(indices):
                    samples_info[index]['response'] += responses[i]
                    samples_info[index]['sequence'] += responses[i]
                    samples_info[index]['stop'] = True
                    samples_info[index]['finish_reason'] = finish_reason[i]
                break

            def _python_execution(finish_reason, stop_reason):
                # only responses that stopped at ```output need tool execution
                if finish_reason == 'stop' and stop_reason == None:
                    return False
                if finish_reason == 'stop' and stop_reason == '```output':
                    return True
                if finish_reason == 'length':
                    return False
                return False

            is_execution = [_python_execution(finish_reason[i], stop_reason[i]) for i in range(len(finish_reason))]
            # check if all samples are finished
            if all([not x for x in is_execution]):
                break

            # prepare for python execution: detect code blocks in the responses
            tool_infos = [_detect_tool(response) for response in responses]
            tool_indices = []  # indices of responses that need execution
            tool_inputs = []   # JSON strings like '{"code": "..."}'
            for i, tool_info in enumerate(tool_infos):
                if tool_info[0] and is_execution[i]:
                    tool_indices.append(i)
                    tool_inputs.append(tool_info[2])

            def postproc_observation(observation):
                # observation is an (output, status) tuple returned by code_interpreter_batch_call
                execution_pass = 0
                try:
                    observation_list = observation
                    if observation_list[-1] == 'Done':
                        observation = observation_list[0]
                        execution_pass = 1
                    else:
                        observation = observation_list[-1]
                except Exception:
                    observation = "Error"
                # keep only the last line of an error traceback
                if "Error" in observation:
                    observation = observation.strip().split("\n")[-1]
                if len(observation.strip()) == 0:
                    observation = "timeout_decorator.timeout_decorator.TimeoutError: 'Timed Out'"
                observation = observation.strip()
                # truncate very long outputs
                if len(observation) >= 256:
                    observation = observation[:128] + "..." + observation[-128:]
                # wrap as ```output ... ```
                observation = f'{OBS_START}\n{observation}{OBS_END}'
                return observation, execution_pass

            # execute python code in the sandbox
            observations = self.code_interpreter_batch_call([json5.loads(x)['code'] for x in tool_inputs])

            # construct responses from observations
            responses = [response + "\n" if not response.endswith('\n') else response for response in responses]
            responses_w_res = copy.deepcopy(responses)
            execution_passes = [0 for _ in range(len(responses))]
            for i, index in enumerate(tool_indices):
                processed_observation = postproc_observation(observations[i])
                responses_w_res[index] += processed_observation[0]
                execution_passes[index] = processed_observation[1]

            # update samples_info
            for i, index in enumerate(indices):
                # character range of the tool output (everything after the ```output marker), recorded for masking
                mask = [len(responses[i]) + len('```output'), len(responses_w_res[i])]
                samples_info[index]['mask_info'].append(mask)
                samples_info[index]['response'] += responses_w_res[i]
                samples_info[index]['sequence'] += responses_w_res[i]
                samples_info[index]['stop'] = not is_execution[i]
                samples_info[index]['finish_reason'] = finish_reason[i]
                samples_info[index]['execution_pass'] = execution_passes[i]

        # append the EOS token to every response that did not end because of the length limit
        for i, line in enumerate(samples_info):
            if samples_info[i]['finish_reason'] != 'length':
                samples_info[i]['response'] += self.tokenizer.eos_token

        responses_ids = []
        tool_output_masks = []
        execution_passes = []
        for idx, sample_info in enumerate(samples_info):
            response_id, tool_output_mask = self._tokenize_and_find_mask_token_indices(sample_info)
            # truncate both the tokens and the mask to the configured response length
            responses_ids.append(response_id[:self.config.response_length])
            tool_output_masks.append(tool_output_mask[:self.config.response_length])
            execution_passes.append(sample_info['execution_pass'])
        return responses_ids, tool_output_masks, torch.tensor(execution_passes, dtype=torch.long)

    @contextmanager
    def update_sampling_params(self, **kwargs):
        """Temporarily override sampling params inside a `with` block and restore them on exit."""
        # update sampling params
        old_sampling_params_args = {}
        if kwargs:
            for key, value in kwargs.items():
                if hasattr(self.sampling_params, key):
                    old_value = getattr(self.sampling_params, key)
                    old_sampling_params_args[key] = old_value
                    setattr(self.sampling_params, key, value)
        yield
        # roll back to previous sampling params
        for key, value in old_sampling_params_args.items():
            setattr(self.sampling_params, key, value)

    @torch.no_grad()
    def generate_sequences(self, prompts: DataProto, **kwargs) -> DataProto:
        """Main entry point: take left-padded prompts, run TIR generation, and return the full sequences.

        Input batch: 'input_ids', 'attention_mask', 'position_ids' of shape (bs, prompt_length).
        Output batch: 'prompts', 'responses', 'input_ids' (prompt + response), 'attention_mask',
        'position_ids', 'tool_output_masks', and 'execution_passes'.
        """
        # rebuild vllm cache engine if it was freed after the previous generation
        if vllm_version in ('0.3.1', '0.4.2', '0.5.4', '0.6.3') and self.config.free_cache_engine:
            self.inference_engine.init_cache_engine()

        idx = prompts.batch['input_ids']  # (bs, prompt_length), left-padded
        attention_mask = prompts.batch['attention_mask']
        position_ids = prompts.batch['position_ids']
        # used to construct attention_mask
        eos_token_id = prompts.meta_info['eos_token_id']
        batch_size = idx.size(0)

        non_tensor_batch = prompts.non_tensor_batch
        if 'raw_prompt_ids' not in non_tensor_batch:
            # strip the left padding from every prompt
            non_tensor_batch['raw_prompt_ids'] = np.array(
                [_pre_process_inputs(self.pad_token_id, idx[i]) for i in range(batch_size)], dtype=object)

        if batch_size != len(non_tensor_batch['raw_prompt_ids']):
            raise RuntimeError('vllm sharding manager is not work properly.')

        if 'multi_modal_data' in non_tensor_batch:
            vllm_inputs = []
            for raw_prompt_ids, multi_modal_data in zip(non_tensor_batch.pop('raw_prompt_ids'),
                                                        non_tensor_batch.pop('multi_modal_data')):
                vllm_inputs.append({'prompt_token_ids': raw_prompt_ids, 'multi_modal_data': multi_modal_data})
        else:
            vllm_inputs = [{'prompt_token_ids': raw_prompt_ids}
                           for raw_prompt_ids in non_tensor_batch.pop('raw_prompt_ids')]

        do_sample = prompts.meta_info.get('do_sample', True)
        if not do_sample:
            kwargs = {
                'best_of': 1,
                'top_p': 1.0,
                'top_k': -1,
                'min_p': 0.0,
                'temperature': 0,
                'n': 1  # if greedy, only 1 response
            }

        # users can customize different sampling_params at different run
        with self.update_sampling_params(**kwargs):
            response, tool_output_masks, execution_passes = self._tir_generate(
                prompts=vllm_inputs,  # because we have already converted it to prompt token ids
                sampling_params=self.sampling_params,
                use_tqdm=False)

        # TODO(sgm): disable logprob when recompute_log_prob is enable
        # if n = 1: (bs, response_length) ; if n > 1: (bs * n, response_length)
        response = pad_2d_list_to_length(response, self.pad_token_id,
                                         max_length=self.config.response_length).to(idx.device)
        tool_output_masks = pad_2d_list_to_length(tool_output_masks, 1,
                                                  max_length=self.config.response_length).to(idx.device).int()
        execution_passes = execution_passes.to(idx.device).int()

        if self.config.n > 1 and do_sample:
            # each prompt is generated n times, so repeat the prompt-side tensors accordingly
            idx = _repeat_interleave(idx, self.config.n)
            attention_mask = _repeat_interleave(attention_mask, self.config.n)
            position_ids = _repeat_interleave(position_ids, self.config.n)
            batch_size = batch_size * self.config.n
            if 'multi_modal_inputs' in non_tensor_batch.keys():
                non_tensor_batch['multi_modal_inputs'] = _repeat_interleave(non_tensor_batch['multi_modal_inputs'],
                                                                            self.config.n)

        # concatenate prompt and response into the full sequence
        seq = torch.cat([idx, response], dim=-1)

        response_length = response.size(1)
        delta_position_id = torch.arange(1, response_length + 1, device=position_ids.device)
        delta_position_id = delta_position_id.unsqueeze(0).expand(batch_size, -1)
        if position_ids.dim() == 3:  # qwen2vl mrope
            delta_position_id = delta_position_id.view(batch_size, 1, -1).expand(batch_size, 3, -1)

        # TODO(sgm): fix position_ids on right_pad
        # prompt: left pad + response: right pad
        # attention_mask: [0,0,0,0,1,1,1,1, | 1,1,1,0,0,0,0,0]
        # position_ids:   [0,0,0,0,0,1,2,3, | 4,5,6,7,8,9,10,11]
        response_position_ids = position_ids[:, -1:] + delta_position_id
        position_ids = torch.cat([position_ids, response_position_ids], dim=-1)
        # attention mask for the response: 1 up to and including the EOS token, 0 afterwards
        response_attention_mask = get_eos_mask(response_id=response, eos_token=eos_token_id, dtype=attention_mask.dtype)
        # response_attention_mask = response_attention_mask & tool_output_masks
        attention_mask = torch.cat((attention_mask, response_attention_mask), dim=-1)

        # all the tp ranks should contain the same data here. data in all ranks are valid
        batch = TensorDict(
            {
                'prompts': idx,
                'responses': response,
                'input_ids': seq,  # here input_ids become the whole sentences
                # 'old_log_probs': log_probs, # we will recompute old log prob with actor
                'attention_mask': attention_mask,
                'tool_output_masks': tool_output_masks,
                'position_ids': position_ids,
                'execution_passes': execution_passes,
            },
            batch_size=batch_size)

        # free vllm cache engine to save memory during training
        if vllm_version in ('0.3.1', '0.4.2', '0.5.4', '0.6.3') and self.config.free_cache_engine:
            self.inference_engine.free_cache_engine()

        return DataProto(batch=batch, non_tensor_batch=non_tensor_batch)
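To make the code-extraction step concrete, here is a small usage check of the helpers defined above (it assumes extract_program and _detect_tool from this module are in scope; the completion text is made up for illustration):

# Small check of the extraction helpers above; the sample completion is illustrative only.
completion = (
    "To compute 1+1 I will run some code.\n"
    "```python\n"
    "print(1 + 1)\n"
    "```\n"
)

program = extract_program(completion)
# program == "print(1 + 1)\n"

has_tool, tool_name, tool_args, raw_text = _detect_tool(completion)
print(has_tool, tool_args)
# has_tool is True and tool_args == '{"code": "print(1 + 1)\\n"}'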

Below is the overall verl flow.

vLLM Rollout SPMD Call Chain Explained

This document explains in detail how verl/workers/rollout/vllm_rollout/vllm_rollout_spmd.py is invoked during training, and the responsibilities of the classes involved.

Contents

  1. Call chain overview
  2. Core classes
  3. Detailed call flow
  4. Key code locations
  5. Data flow

Call Chain Overview

Training entry point (main_ppo.py)
  ↓
RayTrainer (ray_trainer.py)
  ↓
ActorRolloutRefWorker (fsdp_workers.py / megatron_workers.py)
  ↓
_build_rollout() method
  ↓
vLLMRollout (vllm_rollout_spmd.py) is instantiated
  ↓
generate_sequences() is called
  ↓
_tir_generate() runs
  ↓
The generated sequences are returned

Core Classes

1. BaseRollout (base class)

Location: verl/workers/rollout/base.py

Role:

  • Defines the abstract interface shared by all rollout classes
  • Provides the abstract generate_sequences() method that every rollout implementation must provide

Key code:

class BaseRollout(ABC):

    @abstractmethod
    def generate_sequences(self, prompts: DataProto) -> DataProto:
        """Generate sequences"""
        pass

2. vLLMRollout (vllm_rollout_spmd.py)

Location: verl/workers/rollout/vllm_rollout/vllm_rollout_spmd.py

Role:

  • Inherits from BaseRollout
  • Implements vLLM-based sequence generation with tool calling (TIR, Tool-Integrated Reasoning)
  • Used in SPMD mode when the vLLM version is > 0.6.3
  • Supports multi-round LLM calls interleaved with Python code execution

Key methods:

  • __init__(): initializes the vLLM inference engine and sampling parameters
  • generate_sequences(): main entry point for sequence generation
  • _tir_generate(): core TIR generation logic, including tool calls
  • code_interpreter_batch_call(): executes Python code in batch
  • _tokenize_and_find_mask_token_indices(): tokenizes the response and computes the tool-output mask

Constructor parameters:

def __init__(self, model_path: str, config: DictConfig, tokenizer, model_hf_config, **kwargs):
    # model_path: path to the model
    # config: config object with the rollout settings
    # tokenizer: tokenizer object
    # model_hf_config: HuggingFace model config
    # device_mesh: (optional) device mesh, used in SPMD mode

3. ActorRolloutRefWorker (FSDP version)

Location: verl/workers/fsdp_workers.py

Role:

  • Implements a unified worker for the Actor, Rollout, and Reference Policy
  • Handles model initialization, parameter synchronization, sequence generation, and related tasks
  • Uses FSDP (Fully Sharded Data Parallel) for distributed training

Key methods:

  • init_model(): initializes the model, optimizer, rollout, and other components
  • _build_rollout(): builds the rollout instance (this is where vLLMRollout is created)
  • generate_sequences(): entry point that delegates sequence generation to the rollout

Key code snippet (_build_rollout()):

def _build_rollout(self):
    # ...
    elif self.config.rollout.name == 'vllm':
        from verl.workers.rollout.vllm_rollout import vLLMRollout, vllm_mode
        from verl.workers.sharding_manager import FSDPVLLMShardingManager
        if vllm_mode == 'spmd':
            rollout = vLLMRollout(model_path=local_path,
                                  config=self.config.rollout,
                                  tokenizer=self.tokenizer,
                                  model_hf_config=self.actor_model_config,
                                  device_mesh=rollout_device_mesh)
    # ...

Key code snippet (generate_sequences()):

@register(dispatch_mode=Dispatch.DP_COMPUTE_PROTO)
def generate_sequences(self, prompts: DataProto):
    # 1. Move the prompts to the GPU
    prompts = prompts.to('cuda')
    # 2. If parameter offloading is enabled, load the model back onto the GPU first
    if self._is_offload_param:
        load_fsdp_model_to_gpu(self.actor_module_fsdp)
    # 3. Update meta_info (EOS token ID, etc.)
    prompts.meta_info.update(meta_info)
    # 4. Use the sharding manager to synchronize parameters
    with self.rollout_sharding_manager:
        # 5. Once parameters are synced, offload the actor model to CPU (saves GPU memory)
        if self._is_offload_param:
            offload_fsdp_model_to_cpu(self.actor_module_fsdp)
        # 6. Preprocess the data (parameter sync, data sharding, etc.)
        prompts = self.rollout_sharding_manager.preprocess_data(prompts)
        # 7. Call the rollout to generate sequences  <- core call site
        output = self.rollout.generate_sequences(prompts=prompts)
        # 8. Postprocess the data (aggregation, etc.)
        output = self.rollout_sharding_manager.postprocess_data(output)
    # 9. Move the result to the CPU
    output = output.to('cpu')
    return output

4. ActorRolloutRefWorker (Megatron version)

Location: verl/workers/megatron_workers.py

Role:

  • Similar to the FSDP version, but uses Megatron-LM for distributed training
  • Supports Pipeline Parallelism and Tensor Parallelism

Key differences:

  • Uses MegatronVLLMShardingManager instead of FSDPVLLMShardingManager
  • Has to handle parameter synchronization across Pipeline Parallel stages
  • Currently only supports vLLM <= 0.6.3 (customized mode)

Key code snippet (_build_rollout()):

def _build_rollout(self):
    if self.config.rollout.name == 'vllm':
        from verl.workers.rollout.vllm_rollout import vLLMRollout, vllm_mode
        # fetch the parameters from the actor and sync them to all PP ranks
        self.hybrid_engine.load_params_to_cuda()
        self.hybrid_engine.allgather_params()
        params = self.hybrid_engine.get_all_params()
        assert vllm_mode == 'customized', "Megatron only supports customized mode"
        rollout = vLLMRollout(actor_module=params,
                              config=self.config.rollout,
                              tokenizer=self.tokenizer,
                              model_hf_config=self.actor_model_config,
                              train_tp=mpu.get_tensor_model_parallel_world_size())

5. FSDPVLLMShardingManager

Location: verl/workers/sharding_manager/fsdp_vllm.py

Role:

  • Manages parameter synchronization between the FSDP model and the vLLM inference engine
  • Before generation, syncs the FSDP-sharded parameters into vLLM
  • After generation, cleans up the temporary parameters

Key methods:

  • preprocess_data(): syncs parameters into vLLM and prepares for generation
  • postprocess_data(): cleanup after generation finishes
  • __enter__() / __exit__(): context manager that guarantees parameter sync and cleanup

How it works (see the sketch below):

  1. On entering the context (__enter__): gather the full parameters from the FSDP model and sync them into vLLM
  2. During generation: vLLM runs inference with the synced parameters
  3. On exiting the context (__exit__): clean up the temporary parameters in vLLM and free GPU memory
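The actual implementation lives in fsdp_vllm.py; as a rough mental model only (not verl's real code, with hypothetical helper names), the enter/exit protocol looks like this:

# Illustrative sketch only -- not the actual FSDPVLLMShardingManager implementation.
class ShardingManagerSketch:
    def __init__(self, fsdp_module, inference_engine):
        self.fsdp_module = fsdp_module            # FSDP-wrapped training model
        self.inference_engine = inference_engine  # vLLM engine used for rollout

    def __enter__(self):
        # Gather the full (unsharded) weights and hand them to the inference engine.
        full_state = self.fsdp_module.state_dict()   # hypothetical gather step
        self.load_weights_into_vllm(full_state)      # hypothetical sync step
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop the temporary copies so training can reclaim GPU memory.
        self.release_vllm_weights()                  # hypothetical cleanup step
        return False

    def load_weights_into_vllm(self, state_dict):
        ...

    def release_vllm_weights(self):
        ...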

6. RayTrainer

Location: verl/trainer/ppo/ray_trainer.py

Role:

  • Main controller of PPO training
  • Manages all worker groups (Actor, Rollout, Critic, Reward Model, etc.)
  • Coordinates the training loop, data flow, checkpointing, and so on

Key flow:

  1. Initialize all worker groups
  2. Create the dataloader
  3. Training loop:
    • Fetch prompts from the dataloader
    • Call actor_rollout_wg.generate_sequences() to generate sequences
    • Compute rewards and advantages
    • Update the Actor and the Critic

Key code snippet:

# Initialize the ActorRollout worker group
self.actor_rollout_wg = all_wg['actor_rollout']
self.actor_rollout_wg.init_model()

# Called inside the training loop
output = self.actor_rollout_wg.generate_sequences(prompts=prompts)
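A heavily simplified sketch of one iteration, showing where the rollout call sits relative to reward computation. The helper names (reward_fn, train_dataloader) and the DataProto calls are assumptions that mirror how RayTrainer assembles batches; this is not verbatim trainer code:

# Hypothetical outline of one PPO iteration; not verbatim RayTrainer code.
for batch_dict in train_dataloader:
    batch = DataProto.from_single_dict(batch_dict)                    # prompts only
    gen_output = actor_rollout_wg.generate_sequences(prompts=batch)   # rollout (vLLM + TIR)
    batch = batch.union(gen_output)                                   # attach responses
    reward_tensor = reward_fn(batch)                                  # score the sequences
    batch.batch['token_level_scores'] = reward_tensor
    # ... compute advantages, then update the critic and the actor ...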

Detailed Call Flow

Stage 1: Initialization

1. main_ppo.py starts training
   ↓
2. RayTrainer.__init__()
   - Build the worker-group mapping
   - Configure the resource pools
   ↓
3. ActorRolloutRefWorker is instantiated
   - Set the role flags (_is_actor, _is_rollout, _is_ref)
   ↓
4. ActorRolloutRefWorker.init_model()
   - Load the model
   - Initialize the optimizer
   - Call _build_rollout()
   ↓
5. _build_rollout()
   - Check the vLLM version and choose customized or spmd mode
   - Create the vLLMRollout instance
   - Create the ShardingManager instance
   ↓
6. vLLMRollout.__init__()
   - Initialize the vLLM inference engine (LLM)
   - Set the sampling parameters
   - Configure the tool executor

Stage 2: Sequence Generation in the Training Loop

1. RayTrainer training loop
   - Fetch prompts (DataProto) from the dataloader
   ↓
2. actor_rollout_wg.generate_sequences(prompts)
   - Calls ActorRolloutRefWorker.generate_sequences()
   ↓
3. ActorRolloutRefWorker.generate_sequences()
   - Move the prompts to the GPU
   - Load the model onto the GPU (if offloading is enabled)
   ↓
4. Enter the ShardingManager context
   with self.rollout_sharding_manager:
   ↓
5. ShardingManager.preprocess_data()
   - Gather the full parameters from the FSDP model
   - Sync the parameters into the vLLM inference engine
   ↓
6. self.rollout.generate_sequences(prompts)  <- core call
   - Calls vLLMRollout.generate_sequences()
   ↓
7. vLLMRollout.generate_sequences()
   - Preprocess the inputs (strip padding, extract token IDs)
   - Call _tir_generate() to generate
   ↓
8. vLLMRollout._tir_generate()
   - Multi-round LLM call loop:
     a. Call vLLM to generate a response
     b. Detect tool calls (Python code)
     c. Execute the Python code
     d. Append the execution result to the response
     e. Continue generating until finished
   - Tokenize the responses
   - Compute the tool-output mask
   - Return token IDs, masks, and execution status
   ↓
9. Postprocess the responses
   - Pad to a uniform length
   - Build attention_mask and position_ids
   - Concatenate prompt and response
   ↓
10. ShardingManager.postprocess_data()
   - Clean up the temporary parameters
   - Aggregate the data (if running distributed)
   ↓
11. Return the generated sequences (DataProto)

Key Code Locations

1. vLLMRollout instantiation

FSDP version:

  • File: verl/workers/fsdp_workers.py
  • Method: ActorRolloutRefWorker._build_rollout()
  • Lines: 324-329

Megatron version:

  • File: verl/workers/megatron_workers.py
  • Method: ActorRolloutRefWorker._build_rollout()
  • Lines: 258-262

2. generate_sequences call

FSDP version:

  • File: verl/workers/fsdp_workers.py
  • Method: ActorRolloutRefWorker.generate_sequences()
  • Line: 489

Megatron version:

  • File: verl/workers/megatron_workers.py
  • Method: ActorRolloutRefWorker.generate_sequences()
  • Line: 386

3. vLLMRollout core implementation

  • File: verl/workers/rollout/vllm_rollout/vllm_rollout_spmd.py
  • Class: vLLMRollout
  • Key methods:
    • generate_sequences(): lines 389-497
    • _tir_generate(): lines 259-370

4. Version selection and imports

  • File: verl/workers/rollout/vllm_rollout/__init__.py
  • Logic: the implementation is selected automatically based on the installed vLLM version (sketched below)
    • vLLM <= 0.6.3: use vllm_rollout.py (customized mode)
    • vLLM > 0.6.3: use vllm_rollout_spmd.py (SPMD mode)
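The dispatch roughly follows this pattern; this is a sketch of the idea rather than the actual __init__.py, which may obtain the version and name the modes differently:

# Sketch of version-based dispatch; the real __init__.py may differ in details.
from importlib.metadata import version
from packaging.version import parse as parse_version

if parse_version(version("vllm")) <= parse_version("0.6.3"):
    vllm_mode = "customized"
    from .vllm_rollout import vLLMRollout          # customized build of vLLM
else:
    vllm_mode = "spmd"
    from .vllm_rollout_spmd import vLLMRollout     # stock vLLM, SPMD-style launch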

Data Flow

Input (DataProto)

prompts = DataProto(
    batch={
        'input_ids': tensor([[0,0,0,1234,5678], ...]),   # (batch_size, prompt_length)
        'attention_mask': tensor([[0,0,0,1,1], ...]),    # left-padded mask
        'position_ids': tensor([[0,0,0,0,1], ...])       # position encodings
    },
    meta_info={
        'eos_token_id': 151643,
        'do_sample': True
    },
    non_tensor_batch={}  # may contain multimodal data
)
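Given the example batch above, the left-padding removal performed by _pre_process_inputs behaves as follows (a small check that assumes pad_token_id is 0, to match the illustrative tensors):

import torch

pad_token_id = 0  # assumed to match the illustrative tensors above
prompt_token_ids = torch.tensor([0, 0, 0, 1234, 5678])

raw_prompt_ids = _pre_process_inputs(pad_token_id, prompt_token_ids)
print(raw_prompt_ids)  # [1234, 5678] -- the left padding has been stripped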

Processing Steps

  1. Preprocessing (start of generate_sequences):

    • Strip the left padding and extract raw_prompt_ids
    • Build the vLLM input format
  2. TIR generation (_tir_generate):

    • Round 1: the LLM generates a response and a tool call is detected
    • Tool execution: the Python code is run and the result is collected
    • Round 2: the tool result is appended to the prompt and generation continues
    • Repeat until finished or the maximum number of rounds is reached
  3. Postprocessing (end of generate_sequences):

    • Tokenize the responses
    • Pad to a uniform length
    • Build the full attention_mask and position_ids
    • Concatenate prompt and response

Output (DataProto)

output = DataProto(
    batch={
        'prompts': tensor([[0,0,0,1234,5678], ...]),               # original prompt
        'responses': tensor([[9012,1234,...], ...]),               # generated response
        'input_ids': tensor([[0,0,0,1234,5678,9012,...], ...]),    # prompt + response
        'attention_mask': tensor([[0,0,0,1,1,1,1,0,...], ...]),    # mask over the full sequence
        'position_ids': tensor([[0,0,0,0,1,2,3,4,...], ...]),      # position IDs over the full sequence
        'tool_output_masks': tensor([[1,1,1,0,0,0,1,...], ...]),   # tool-output mask
        'execution_passes': tensor([1, 0])                         # whether code execution passed
    },
    non_tensor_batch={}
)
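The response part of attention_mask above comes from get_eos_mask; a minimal self-contained sketch of the same idea (1 up to and including the first EOS token, 0 afterwards, as described above) is:

import torch

# Sketch of a response mask that is 1 up to and including the first EOS token and 0 afterwards.
def response_mask_sketch(response_ids: torch.Tensor, eos_token_id: int) -> torch.Tensor:
    is_eos = (response_ids == eos_token_id).long()
    # cumulative count of EOS tokens seen so far, shifted so the EOS token itself stays unmasked
    seen_eos_before = torch.cumsum(is_eos, dim=-1) - is_eos
    return (seen_eos_before == 0).long()

resp = torch.tensor([[9012, 1234, 151643, 0, 0]])
print(response_mask_sketch(resp, eos_token_id=151643))
# tensor([[1, 1, 1, 0, 0]])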

Key Configuration Parameters

Rollout config (config.rollout)

rollout:
  name: vllm                      # use the vLLM rollout
  tensor_model_parallel_size: 4   # tensor parallel size
  prompt_length: 512              # max prompt length
  response_length: 1024           # max response length
  temperature: 0.7                # sampling temperature
  top_p: 0.9                      # top-p sampling
  num_llm_calls_available: 5      # max number of LLM calls (TIR)
  gpu_memory_utilization: 0.85    # GPU memory utilization
  enforce_eager: False            # whether to disable CUDA graph
  free_cache_engine: True         # whether to free the cache engine
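Only keys matching SamplingParams attributes are forwarded by the hasattr loop in vLLMRollout.__init__; a quick way to check which of the keys above would be picked up (assuming a local vLLM install):

from vllm import SamplingParams

rollout_cfg = {
    "temperature": 0.7,
    "top_p": 0.9,
    "prompt_length": 512,          # not a SamplingParams field -> ignored
    "num_llm_calls_available": 5,  # not a SamplingParams field -> ignored
}

forwarded = {k: v for k, v in rollout_cfg.items() if hasattr(SamplingParams(), k)}
print(forwarded)  # {'temperature': 0.7, 'top_p': 0.9}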

Worker config (config.actor_rollout_ref)

actor_rollout_ref:
  actor:
    strategy: fsdp               # or megatron
    fsdp_config:
      param_offload: True        # offload parameters to CPU
      optimizer_offload: True    # offload optimizer state to CPU
  rollout:
    name: vllm
    # ... rollout config

Summary

  1. vLLMRollout is the core sequence-generation class, implementing efficient inference on top of vLLM
  2. ActorRolloutRefWorker is the worker layer, responsible for model management and parameter synchronization
  3. The ShardingManager synchronizes parameters between FSDP/Megatron and vLLM
  4. RayTrainer is the training controller that coordinates the whole training flow

The call chain follows a clear separation of responsibilities:

  • Training logic (RayTrainer) → worker management (ActorRolloutRefWorker) → parameter synchronization (ShardingManager) → sequence generation (vLLMRollout)

This design keeps the system extensible and maintainable.
