Mechanism Analysis
Key Files and Classes
File path: langchain_text_splitters/character.py
Class name: RecursiveCharacterTextSplitter
Core entry function: _split_text
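For orientation, here is a minimal usage sketch (the parameter values are illustrative; the separators shown are the default hierarchy discussed below):

```python
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Illustrative values; chunk lengths are measured by the splitter's length
# function, which is plain len() unless configured otherwise.
splitter = RecursiveCharacterTextSplitter(
    chunk_size=100,
    chunk_overlap=20,
    separators=["\n\n", "\n", " ", ""],
)

# split_text() is the public wrapper; internally it calls
# _split_text(text, self._separators), the method analysed below.
chunks = splitter.split_text("some long document text ...")
```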
Splitting Steps and Source Code Analysis
| Step | Description | Example / Details |
|---|---|---|
| 1. Separator fallback | Try the separators in the order separators=["\n\n", "\n", " ", ""]; the list can be customized | First split into paragraphs on \n\n; if any paragraph is longer than chunk_size characters, fall back to \n for that paragraph, and so on (see the worked example after this table) |
| 2. Recursive splitting | Repeat step 1 on every over-long segment until all pieces are ≤ chunk_size or the separators are exhausted | If even sentence-level pieces are still too long, the final empty-string separator "" splits character by character |
| 3. Merging | Concatenate the "good" segments in order into chunks that are as long as possible while staying ≤ chunk_size | If adding the next segment would push past chunk_size, seal the current chunk and start a new one |
| 4. Overlap | After a chunk A1 is sealed, the algorithm backs up by at most chunk_overlap characters so that the next chunk A2 starts with the tail of A1. In particular, if the last segment of A1 is longer than chunk_overlap, no overlap is forced, so that the segment stays semantically intact | For adjacent chunks A1 and A2, the head of A2 = the overlap content from the tail of A1 |
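To see the four steps together, the sketch below splits a small text whose middle paragraph deliberately exceeds chunk_size (the sample text and sizes are made up; the exact chunk boundaries depend on them):

```python
from langchain_text_splitters import RecursiveCharacterTextSplitter

text = (
    "Short paragraph that fits in one chunk.\n\n"
    "A second, deliberately long paragraph: it exceeds chunk_size, so the "
    "splitter falls back from '\\n\\n' to '\\n' and then to ' ' until every "
    "piece fits, after which the pieces are merged back into chunks.\n\n"
    "Final short paragraph."
)

splitter = RecursiveCharacterTextSplitter(chunk_size=80, chunk_overlap=20)
for i, chunk in enumerate(splitter.split_text(text)):
    # Expected: every chunk stays within chunk_size, and adjacent chunks cut
    # from the long paragraph share up to chunk_overlap characters.
    print(i, len(chunk), repr(chunk))
```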
The overall logic of the steps above lives in the _split_text function; the key points are explained with inline comments:
```python
def _split_text(self, text: str, separators: list[str]) -> list[str]:
    """Split incoming text and return chunks."""
    final_chunks = []
    # Get appropriate separator to use
    # i.e. pick the separator for the current level
    separator = separators[-1]
    new_separators = []
    for i, _s in enumerate(separators):
        _separator = _s if self._is_separator_regex else re.escape(_s)
        if _s == "":
            separator = _s
            break
        # quick check: does this separator occur in the text at all?
        # if not, fall back to the next (lower-level) separator
        if re.search(_separator, text):
            separator = _s
            new_separators = separators[i + 1 :]
            break

    _separator = separator if self._is_separator_regex else re.escape(separator)
    splits = _split_text_with_regex(text, _separator, keep_separator=self._keep_separator)

    # Now go merging things, recursively splitting longer texts.
    # a "good" split is a piece shorter than chunk_size
    _good_splits = []
    _separator = "" if self._keep_separator else separator
    for s in splits:
        if self._length_function(s) < self._chunk_size:
            _good_splits.append(s)
        else:
            # once we hit a piece that is too long, first merge the good
            # splits accumulated so far
            if _good_splits:
                merged_text = self._merge_splits(_good_splits, _separator)
                final_chunks.extend(merged_text)
                _good_splits = []
            if not new_separators:
                # this looks like it could emit an over-long chunk, but since the
                # last default separator is the empty string (character-level
                # splitting), this line is effectively never reached
                final_chunks.append(s)
            else:
                # recursively split the over-long piece with the lower-level separators
                other_info = self._split_text(s, new_separators)
                final_chunks.extend(other_info)
    if _good_splits:
        merged_text = self._merge_splits(_good_splits, _separator)
        final_chunks.extend(merged_text)
    return final_chunks
```
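To make the fallback loop at the top of _split_text concrete, here is a standalone re-creation of just the separator-selection step (pick_separator is a name invented for this illustration, not part of the library, and regex separators are ignored for brevity):

```python
import re

def pick_separator(text: str, separators: list[str]) -> tuple[str, list[str]]:
    """Return the first separator found in text, plus the lower-level
    separators that would be kept for recursive calls."""
    separator = separators[-1]
    new_separators: list[str] = []
    for i, s in enumerate(separators):
        if s == "":
            separator = s
            break
        if re.search(re.escape(s), text):
            separator = s
            new_separators = separators[i + 1:]
            break
    return separator, new_separators

# "\n\n" is absent from this text, so the splitter falls back to "\n".
print(pick_separator("line one\nline two", ["\n\n", "\n", " ", ""]))
# -> ('\n', [' ', ''])
```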
The merging and overlap logic lives in the _merge_splits function, again with comments at the key points:

```python
def _merge_splits(self, splits: Iterable[str], separator: str) -> list[str]:
    # We now want to combine these smaller pieces into medium size
    # chunks to send to the LLM.
    separator_len = self._length_function(separator)

    docs = []
    current_doc: list[str] = []
    total = 0
    # splits are the "good" pieces; concatenate them into chunks that are
    # as large as possible
    for d in splits:
        _len = self._length_function(d)
        # if adding the next piece would exceed chunk_size, seal the current
        # chunk and start a new one
        if (
            total + _len + (separator_len if len(current_doc) > 0 else 0)
            > self._chunk_size
        ):
            if total > self._chunk_size:
                logger.warning(
                    f"Created a chunk of size {total}, "
                    f"which is longer than the specified {self._chunk_size}"
                )
            if len(current_doc) > 0:
                doc = self._join_docs(current_doc, separator)
                if doc is not None:
                    docs.append(doc)
                # Keep on popping if:
                # - we have a larger chunk than in the chunk overlap
                # - or if we still have any chunks and the length is long
                # Core overlap logic: back up by at most chunk_overlap so that the
                # next chunk A2 starts with the tail of the chunk A1 just sealed.
                # If the last piece of A1 is longer than chunk_overlap, no overlap
                # is generated, so the piece stays semantically intact.
                # current_doc holds all the pieces of the chunk being built.
                while total > self._chunk_overlap or (
                    total + _len + (separator_len if len(current_doc) > 0 else 0)
                    > self._chunk_size
                    and total > 0
                ):
                    total -= self._length_function(current_doc[0]) + (
                        separator_len if len(current_doc) > 1 else 0
                    )
                    current_doc = current_doc[1:]
        current_doc.append(d)
        total += _len + (separator_len if len(current_doc) > 1 else 0)
    doc = self._join_docs(current_doc, separator)
    if doc is not None:
        docs.append(doc)
    return docs
```
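The sliding-window overlap is easier to follow in isolation. The following stripped-down merge keeps only the size bookkeeping (separator lengths and the over-size warning are omitted); merge_with_overlap is a name made up for this sketch, not the library function:

```python
def merge_with_overlap(splits: list[str], chunk_size: int, chunk_overlap: int) -> list[str]:
    docs: list[str] = []
    current: list[str] = []   # pieces of the chunk being built
    total = 0                 # combined length of the pieces in `current`
    for piece in splits:
        if total + len(piece) > chunk_size and current:
            docs.append("".join(current))  # seal the current chunk
            # Pop pieces from the front until at most chunk_overlap characters
            # remain; whatever is left becomes the head of the next chunk.
            while total > chunk_overlap or (total + len(piece) > chunk_size and total > 0):
                total -= len(current[0])
                current = current[1:]
        current.append(piece)
        total += len(piece)
    if current:
        docs.append("".join(current))
    return docs

# Overlap of 4 characters: each chunk starts with the tail of the previous one.
print(merge_with_overlap(["aaaa", "bbbb", "cccc", "dddd"], chunk_size=10, chunk_overlap=4))
# -> ['aaaabbbb', 'bbbbcccc', 'ccccdddd']

# Every piece is longer than chunk_overlap=3, so no overlap is forced.
print(merge_with_overlap(["aaaa", "bbbb", "cccc"], chunk_size=8, chunk_overlap=3))
# -> ['aaaabbbb', 'cccc']
```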