知识库-向量化功能-流式分片
一、设计背景
针对超大文本(如100MB以上纯文本文件),传统“一次性加载全部文本到内存再分片”的方式易导致内存溢出、方法卡死等问题。因此采用流式分片策略:逐批次读取文本到缓冲区,按需生成分片,全程不加载完整文本到内存,大幅降低内存占用。
二、核心逻辑
- 流式读取:通过
Reader逐缓冲区读取文本,避免一次性加载全量内容; - 批次分片:对每批次读取的文本按句子结束符分割,保证分片语义完整性;
- 兜底机制:设置最小递增步长、最大循环次数,避免死循环;
- 重叠处理:分片间保留重叠字符,防止跨批次句子语义断裂;
- 剩余文本处理:合并未处理完的文本到下一批次,保证分片完整性。
三、核心实现代码
import java.io.IOException; import java.io.Reader; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; /** * 超大文本流式分片工具类 * 核心:按需读取、按需分片,避免一次性加载全量文本到内存 */ public class StreamTextSplitter { // 句子结束符正则(覆盖中英文结束符) private static final Pattern SENTENCE_END_PATTERN = Pattern.compile("[。!?;\\n\\.!?;]"); // 单分片最大字符数(可根据模型上下文调整) private static final int MAX_CHUNK_LENGTH = 1000; // 分片重叠字符数(避免跨分片语义断裂) public static final int CHUNK_OVERLAP = 100; // 最小递增步长(防止死循环) private static final int MIN_STEP = MAX_CHUNK_LENGTH / 2; // 最大循环次数(兜底,避免无限循环) private static final int MAX_LOOP_COUNT = 100; /** * 流式分片:核心方法,按需生成文本分片 * @param textReader 文本流式读取器(支持FileReader/InputStreamReader等) * @param bufferSize 缓冲区大小(建议8192/16384字符,平衡IO次数与内存占用) * @return 分片迭代器(按需遍历,不一次性加载所有分片到内存) * @throws IOException 文本读取异常 */ public Iterator<String> streamSplitText(Reader textReader, int bufferSize) throws IOException { List<String> results = new ArrayList<>(); List<String> currentBatchChunks = new ArrayList<>(); char[] buffer = new char[bufferSize]; int readLen; // 上一批次未处理完的文本(合并到下一批) String remainingText = ""; // 逐缓冲区读取文本 while ((readLen = textReader.read(buffer)) != -1) { // 拼接:剩余文本 + 新读取的文本 String batchText = remainingText + new String(buffer, 0, readLen); // 对当前批次文本进行分片 List<String> batchChunks = splitBatchText(batchText); if (!batchChunks.isEmpty()) { String lastChunk = batchChunks.get(batchChunks.size() - 1); // 判断最后一个分片是否为“不完整分片”(长度<最大长度的1/2) if (lastChunk.length() < MAX_CHUNK_LENGTH / 2) { // 不完整分片:保留到下一批次处理 remainingText = lastChunk; // 完整分片:加入结果集 currentBatchChunks.addAll(batchChunks.subList(0, batchChunks.size() - 1)); } else { // 最后一个分片完整:清空剩余文本,全部加入 remainingText = ""; currentBatchChunks.addAll(batchChunks); } } // 批量添加到结果集,减少内存波动 results.addAll(currentBatchChunks); currentBatchChunks.clear(); } // 处理最后剩余的文本(非空则作为最后一个分片) if (!remainingText.isEmpty()) { String lastChunk = remainingText.trim(); if (!lastChunk.isEmpty()) { results.add(lastChunk); } } // 返回迭代器,支持按需遍历 return results.iterator(); } /** * 批次文本分片:对单批次文本按句子结束符分割,保证语义完整 * @param text 单批次读取的文本 * @return 该批次的文本分片列表 */ private List<String> splitBatchText(String text) { List<String> chunks = new ArrayList<>(); int textLength = text.length(); if (textLength == 0) { return chunks; } int start = 0; int loopCount = 0; // 循环分片,直到处理完当前批次文本或达到最大循环次数 while (start < textLength && loopCount < MAX_LOOP_COUNT) { loopCount++; // 1. 基础结束位置(不超过最大分片长度) int end = Math.min(start + MAX_CHUNK_LENGTH, textLength); // 2. 调整结束位置到最近的句子结束符(保证语义完整) end = adjustToSentenceEnd(text, start, end); // 3. 截取分片并去空 String chunk = text.substring(start, end).trim(); if (!chunk.isEmpty()) { chunks.add(chunk); } // 4. 计算下一个分片的起始位置(核心:避免死循环) int nextStart = end - CHUNK_OVERLAP; nextStart = Math.max(nextStart, start + MIN_STEP); // 至少递增最小步长 nextStart = Math.min(nextStart, textLength); // 不超过文本长度 // 5. 终止条件:起始位置不再递增(防止死循环) if (nextStart <= start) { break; } start = nextStart; } // 兜底:处理最后剩余的文本(循环终止后仍有未处理内容) if (start < textLength) { String lastChunk = text.substring(start).trim(); if (!lastChunk.isEmpty()) { chunks.add(lastChunk); } } // 日志提示:循环次数达上限(需排查文本格式问题) if (loopCount >= MAX_LOOP_COUNT) { System.err.printf("批次文本分片循环次数达上限(%d次),可能存在异常文本格式!文本长度:%d%n", MAX_LOOP_COUNT, textLength); } return chunks; } /** * 调整分片结束位置到句子末尾(优化:避免过度回溯) * @param text 待分片文本 * @param start 分片起始位置 * @param end 基础结束位置 * @return 调整后的结束位置(保证语义完整) */ private int adjustToSentenceEnd(String text, int start, int end) { // 已到文本末尾,直接返回 if (end >= text.length()) { return end; } // 截取当前分片范围的文本 String subText = text.substring(start, end); // 查找最后一个句子结束符的位置 int lastEndPos = findLastSentenceEnd(subText); // 仅当回溯后分片长度≥最小有效长度时,才调整(避免过短分片) int minValidLength = 50; if (lastEndPos != -1) { int adjustedEnd = start + lastEndPos + 1; if (adjustedEnd - start >= minValidLength) { return adjustedEnd; } } // 无有效句子结束符,返回基础结束位置 return end; } /** * 查找文本中最后一个句子结束符的位置 * @param text 待查找文本 * @return 最后一个结束符的索引(无则返回-1) */ private int findLastSentenceEnd(String text) { for (int i = text.length() - 1; i >= 0; i--) { if (SENTENCE_END_PATTERN.matcher(String.valueOf(text.charAt(i))).matches()) { return i; } } return -1; } }
四、使用方法
4.1 基础使用(读取本地超大文本文件)
import java.io.FileReader; import java.io.Reader; import java.util.Iterator; public class StreamSplitterDemo { public static void main(String[] args) { // 1. 初始化流式分片工具 StreamTextSplitter splitter = new StreamTextSplitter(); // 2. 定义文件路径和缓冲区大小(建议8192字符) String filePath = "D:/large_text.txt"; int bufferSize = 8192; try (Reader textReader = new FileReader(filePath)) { // 3. 执行流式分片,获取迭代器 Iterator<String> chunkIterator = splitter.streamSplitText(textReader, bufferSize); // 4. 按需遍历分片(逐一分片处理,不加载全量到内存) int chunkIndex = 0; while (chunkIterator.hasNext()) { String chunk = chunkIterator.next(); chunkIndex++; // 业务处理:如向量化、存储到ES等 System.out.printf("分片%d:长度=%d,内容预览:%s%n", chunkIndex, chunk.length(), chunk.substring(0, Math.min(chunk.length(), 50))); // 预览前50字符 } System.out.printf("分片完成,共生成%d个分片%n", chunkIndex); } catch (Exception e) { System.err.println("流式分片失败:" + e.getMessage()); e.printStackTrace(); } } }
4.2 结合文件上传场景(InputStreamReader)
import org.springframework.web.multipart.MultipartFile; import java.io.InputStreamReader; import java.io.Reader; import java.nio.charset.StandardCharsets; /** * 处理上传的超大文本文件(如TXT/CSV) * @param file 上传的文件 * @throws Exception 处理异常 */ public void processUploadedLargeFile(MultipartFile file) throws Exception { StreamTextSplitter splitter = new StreamTextSplitter(); int bufferSize = 16384; // 大文件建议增大缓冲区 // 基于文件输入流创建Reader(指定UTF-8编码避免乱码) try (Reader reader = new InputStreamReader(file.getInputStream(), StandardCharsets.UTF_8)) { Iterator<String> chunkIterator = splitter.streamSplitText(reader, bufferSize); // 遍历分片,逐个向量化并存储 while (chunkIterator.hasNext()) { String chunk = chunkIterator.next(); // 调用向量化方法 // float[] vector = embeddingModel.embed(chunk); // 存储到ES // esService.saveChunk(chunk, vector); } } }
4.3 结合PDF/Word解析(流式处理解析结果)
/** * 流式处理PDF解析后的超大文本 * @param pdfFilePath PDF文件路径 * @throws Exception 处理异常 */ public void streamProcessPdf(String pdfFilePath) throws Exception { // 1. 解析PDF获取文本(参考PDF解析文档) String pdfText = PdfTextExtractor.getContent(pdfFilePath); // 2. 将字符串转为Reader(模拟流式读取) try (Reader reader = new java.io.StringReader(pdfText)) { StreamTextSplitter splitter = new StreamTextSplitter(); Iterator<String> chunkIterator = splitter.streamSplitText(reader, 8192); // 3. 逐分片处理 while (chunkIterator.hasNext()) { String chunk = chunkIterator.next(); // 向量化+存储逻辑 } } }
五、核心设计说明
5.1 关键参数说明
| 参数名 | 取值 | 作用 |
|---|---|---|
| MAX_CHUNK_LENGTH | 1000 | 单分片最大字符数,需匹配嵌入模型上下文窗口(如num_ctx=1024) |
| CHUNK_OVERLAP | 100 | 分片间重叠字符数,避免跨分片句子语义断裂(如“我是一个测试文本”被截断为“我是一个”和“测试文本”) |
| MIN_STEP | 500 | 最小递增步长,防止因重叠导致start位置不递增,触发死循环 |
| MAX_LOOP_COUNT | 100 | 最大循环次数,兜底处理异常文本(如无句子结束符的超长文本) |
| bufferSize | 8192/16384 | 缓冲区大小,建议为2的幂次,平衡IO次数与内存占用 |
5.2 核心优势
| 优势 | 解决的问题 |
|---|---|
| 流式读取 | 避免一次性加载超大文本到内存,防止OOM/方法卡死 |
| 句子结束符分割 | 保证分片语义完整性,避免生硬截断导致的向量化误差 |
| 剩余文本合并 | 防止跨批次文本丢失,保证分片完整性 |
| 多重兜底机制 | 最小步长+最大循环次数,彻底避免死循环 |
| 迭代器返回 | 支持按需遍历,分片处理完成后立即释放内存 |
六、注意事项
- 编码一致性:创建
Reader时需指定编码(如UTF-8),避免不同编码导致的字符解析错误; - 缓冲区大小调整:小文件(<1MB)建议用8192,超大文件(>100MB)建议用16384/32768,减少IO次数;
- 异常文本处理:若文本无任何句子结束符(如纯数字/乱码),会触发兜底按最大长度分割,需在日志中监控此类情况;
- 资源释放:必须通过
try-with-resources关闭Reader,避免文件句柄泄漏; - 性能优化:分片处理(向量化/存储)建议结合线程池异步执行,提升整体效率;
- 内存监控:处理GB级文本时,建议监控JVM堆内存,避免缓冲区+分片结果集占用过多内存;
- 特殊字符处理:若文本包含大量特殊符号(如\t/\r),需在分片前先格式化清理。
七、扩展建议
- 自定义结束符:开放
SENTENCE_END_PATTERN配置,支持业务自定义句子结束符; - 分片过滤:增加空分片/短分片过滤逻辑(如过滤长度<50的分片);
- 进度回调:增加分片进度回调函数,便于前端展示处理进度;
- 批量写入:结合ES的bulk写入,积累一定数量分片后批量存储,减少网络请求;
- 超时控制:为分片处理增加超时机制,避免单个分片处理耗时过长;
- 监控指标:统计分片总数、平均分片长度、处理耗时,便于性能调优;
- 自适应缓冲区:根据文本读取速度动态调整缓冲区大小,平衡性能与内存。