嘉兴市网站建设_网站建设公司_VS Code_seo优化
2025/12/25 15:47:09 网站建设 项目流程

知识库-向量化功能-流式分片

一、设计背景

针对超大文本(如100MB以上纯文本文件),传统“一次性加载全部文本到内存再分片”的方式易导致内存溢出、方法卡死等问题。因此采用流式分片策略:逐批次读取文本到缓冲区,按需生成分片,全程不加载完整文本到内存,大幅降低内存占用。

二、核心逻辑

  1. 流式读取:通过Reader逐缓冲区读取文本,避免一次性加载全量内容;
  2. 批次分片:对每批次读取的文本按句子结束符分割,保证分片语义完整性;
  3. 兜底机制:设置最小递增步长、最大循环次数,避免死循环;
  4. 重叠处理:分片间保留重叠字符,防止跨批次句子语义断裂;
  5. 剩余文本处理:合并未处理完的文本到下一批次,保证分片完整性。

三、核心实现代码

import java.io.IOException; import java.io.Reader; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.regex.Pattern; /** * 超大文本流式分片工具类 * 核心:按需读取、按需分片,避免一次性加载全量文本到内存 */ public class StreamTextSplitter { // 句子结束符正则(覆盖中英文结束符) private static final Pattern SENTENCE_END_PATTERN = Pattern.compile("[。!?;\\n\\.!?;]"); // 单分片最大字符数(可根据模型上下文调整) private static final int MAX_CHUNK_LENGTH = 1000; // 分片重叠字符数(避免跨分片语义断裂) public static final int CHUNK_OVERLAP = 100; // 最小递增步长(防止死循环) private static final int MIN_STEP = MAX_CHUNK_LENGTH / 2; // 最大循环次数(兜底,避免无限循环) private static final int MAX_LOOP_COUNT = 100; /** * 流式分片:核心方法,按需生成文本分片 * @param textReader 文本流式读取器(支持FileReader/InputStreamReader等) * @param bufferSize 缓冲区大小(建议8192/16384字符,平衡IO次数与内存占用) * @return 分片迭代器(按需遍历,不一次性加载所有分片到内存) * @throws IOException 文本读取异常 */ public Iterator<String> streamSplitText(Reader textReader, int bufferSize) throws IOException { List<String> results = new ArrayList<>(); List<String> currentBatchChunks = new ArrayList<>(); char[] buffer = new char[bufferSize]; int readLen; // 上一批次未处理完的文本(合并到下一批) String remainingText = ""; // 逐缓冲区读取文本 while ((readLen = textReader.read(buffer)) != -1) { // 拼接:剩余文本 + 新读取的文本 String batchText = remainingText + new String(buffer, 0, readLen); // 对当前批次文本进行分片 List<String> batchChunks = splitBatchText(batchText); if (!batchChunks.isEmpty()) { String lastChunk = batchChunks.get(batchChunks.size() - 1); // 判断最后一个分片是否为“不完整分片”(长度<最大长度的1/2) if (lastChunk.length() < MAX_CHUNK_LENGTH / 2) { // 不完整分片:保留到下一批次处理 remainingText = lastChunk; // 完整分片:加入结果集 currentBatchChunks.addAll(batchChunks.subList(0, batchChunks.size() - 1)); } else { // 最后一个分片完整:清空剩余文本,全部加入 remainingText = ""; currentBatchChunks.addAll(batchChunks); } } // 批量添加到结果集,减少内存波动 results.addAll(currentBatchChunks); currentBatchChunks.clear(); } // 处理最后剩余的文本(非空则作为最后一个分片) if (!remainingText.isEmpty()) { String lastChunk = remainingText.trim(); if (!lastChunk.isEmpty()) { results.add(lastChunk); } } // 返回迭代器,支持按需遍历 return results.iterator(); } /** * 批次文本分片:对单批次文本按句子结束符分割,保证语义完整 * @param text 单批次读取的文本 * @return 该批次的文本分片列表 */ private List<String> splitBatchText(String text) { List<String> chunks = new ArrayList<>(); int textLength = text.length(); if (textLength == 0) { return chunks; } int start = 0; int loopCount = 0; // 循环分片,直到处理完当前批次文本或达到最大循环次数 while (start < textLength && loopCount < MAX_LOOP_COUNT) { loopCount++; // 1. 基础结束位置(不超过最大分片长度) int end = Math.min(start + MAX_CHUNK_LENGTH, textLength); // 2. 调整结束位置到最近的句子结束符(保证语义完整) end = adjustToSentenceEnd(text, start, end); // 3. 截取分片并去空 String chunk = text.substring(start, end).trim(); if (!chunk.isEmpty()) { chunks.add(chunk); } // 4. 计算下一个分片的起始位置(核心:避免死循环) int nextStart = end - CHUNK_OVERLAP; nextStart = Math.max(nextStart, start + MIN_STEP); // 至少递增最小步长 nextStart = Math.min(nextStart, textLength); // 不超过文本长度 // 5. 终止条件:起始位置不再递增(防止死循环) if (nextStart <= start) { break; } start = nextStart; } // 兜底:处理最后剩余的文本(循环终止后仍有未处理内容) if (start < textLength) { String lastChunk = text.substring(start).trim(); if (!lastChunk.isEmpty()) { chunks.add(lastChunk); } } // 日志提示:循环次数达上限(需排查文本格式问题) if (loopCount >= MAX_LOOP_COUNT) { System.err.printf("批次文本分片循环次数达上限(%d次),可能存在异常文本格式!文本长度:%d%n", MAX_LOOP_COUNT, textLength); } return chunks; } /** * 调整分片结束位置到句子末尾(优化:避免过度回溯) * @param text 待分片文本 * @param start 分片起始位置 * @param end 基础结束位置 * @return 调整后的结束位置(保证语义完整) */ private int adjustToSentenceEnd(String text, int start, int end) { // 已到文本末尾,直接返回 if (end >= text.length()) { return end; } // 截取当前分片范围的文本 String subText = text.substring(start, end); // 查找最后一个句子结束符的位置 int lastEndPos = findLastSentenceEnd(subText); // 仅当回溯后分片长度≥最小有效长度时,才调整(避免过短分片) int minValidLength = 50; if (lastEndPos != -1) { int adjustedEnd = start + lastEndPos + 1; if (adjustedEnd - start >= minValidLength) { return adjustedEnd; } } // 无有效句子结束符,返回基础结束位置 return end; } /** * 查找文本中最后一个句子结束符的位置 * @param text 待查找文本 * @return 最后一个结束符的索引(无则返回-1) */ private int findLastSentenceEnd(String text) { for (int i = text.length() - 1; i >= 0; i--) { if (SENTENCE_END_PATTERN.matcher(String.valueOf(text.charAt(i))).matches()) { return i; } } return -1; } }

四、使用方法

4.1 基础使用(读取本地超大文本文件)

import java.io.FileReader; import java.io.Reader; import java.util.Iterator; public class StreamSplitterDemo { public static void main(String[] args) { // 1. 初始化流式分片工具 StreamTextSplitter splitter = new StreamTextSplitter(); // 2. 定义文件路径和缓冲区大小(建议8192字符) String filePath = "D:/large_text.txt"; int bufferSize = 8192; try (Reader textReader = new FileReader(filePath)) { // 3. 执行流式分片,获取迭代器 Iterator<String> chunkIterator = splitter.streamSplitText(textReader, bufferSize); // 4. 按需遍历分片(逐一分片处理,不加载全量到内存) int chunkIndex = 0; while (chunkIterator.hasNext()) { String chunk = chunkIterator.next(); chunkIndex++; // 业务处理:如向量化、存储到ES等 System.out.printf("分片%d:长度=%d,内容预览:%s%n", chunkIndex, chunk.length(), chunk.substring(0, Math.min(chunk.length(), 50))); // 预览前50字符 } System.out.printf("分片完成,共生成%d个分片%n", chunkIndex); } catch (Exception e) { System.err.println("流式分片失败:" + e.getMessage()); e.printStackTrace(); } } }

4.2 结合文件上传场景(InputStreamReader)

import org.springframework.web.multipart.MultipartFile; import java.io.InputStreamReader; import java.io.Reader; import java.nio.charset.StandardCharsets; /** * 处理上传的超大文本文件(如TXT/CSV) * @param file 上传的文件 * @throws Exception 处理异常 */ public void processUploadedLargeFile(MultipartFile file) throws Exception { StreamTextSplitter splitter = new StreamTextSplitter(); int bufferSize = 16384; // 大文件建议增大缓冲区 // 基于文件输入流创建Reader(指定UTF-8编码避免乱码) try (Reader reader = new InputStreamReader(file.getInputStream(), StandardCharsets.UTF_8)) { Iterator<String> chunkIterator = splitter.streamSplitText(reader, bufferSize); // 遍历分片,逐个向量化并存储 while (chunkIterator.hasNext()) { String chunk = chunkIterator.next(); // 调用向量化方法 // float[] vector = embeddingModel.embed(chunk); // 存储到ES // esService.saveChunk(chunk, vector); } } }

4.3 结合PDF/Word解析(流式处理解析结果)

/** * 流式处理PDF解析后的超大文本 * @param pdfFilePath PDF文件路径 * @throws Exception 处理异常 */ public void streamProcessPdf(String pdfFilePath) throws Exception { // 1. 解析PDF获取文本(参考PDF解析文档) String pdfText = PdfTextExtractor.getContent(pdfFilePath); // 2. 将字符串转为Reader(模拟流式读取) try (Reader reader = new java.io.StringReader(pdfText)) { StreamTextSplitter splitter = new StreamTextSplitter(); Iterator<String> chunkIterator = splitter.streamSplitText(reader, 8192); // 3. 逐分片处理 while (chunkIterator.hasNext()) { String chunk = chunkIterator.next(); // 向量化+存储逻辑 } } }

五、核心设计说明

5.1 关键参数说明
参数名取值作用
MAX_CHUNK_LENGTH1000单分片最大字符数,需匹配嵌入模型上下文窗口(如num_ctx=1024)
CHUNK_OVERLAP100分片间重叠字符数,避免跨分片句子语义断裂(如“我是一个测试文本”被截断为“我是一个”和“测试文本”)
MIN_STEP500最小递增步长,防止因重叠导致start位置不递增,触发死循环
MAX_LOOP_COUNT100最大循环次数,兜底处理异常文本(如无句子结束符的超长文本)
bufferSize8192/16384缓冲区大小,建议为2的幂次,平衡IO次数与内存占用
5.2 核心优势
优势解决的问题
流式读取避免一次性加载超大文本到内存,防止OOM/方法卡死
句子结束符分割保证分片语义完整性,避免生硬截断导致的向量化误差
剩余文本合并防止跨批次文本丢失,保证分片完整性
多重兜底机制最小步长+最大循环次数,彻底避免死循环
迭代器返回支持按需遍历,分片处理完成后立即释放内存

六、注意事项

  1. 编码一致性:创建Reader时需指定编码(如UTF-8),避免不同编码导致的字符解析错误;
  2. 缓冲区大小调整:小文件(<1MB)建议用8192,超大文件(>100MB)建议用16384/32768,减少IO次数;
  3. 异常文本处理:若文本无任何句子结束符(如纯数字/乱码),会触发兜底按最大长度分割,需在日志中监控此类情况;
  4. 资源释放:必须通过try-with-resources关闭Reader,避免文件句柄泄漏;
  5. 性能优化:分片处理(向量化/存储)建议结合线程池异步执行,提升整体效率;
  6. 内存监控:处理GB级文本时,建议监控JVM堆内存,避免缓冲区+分片结果集占用过多内存;
  7. 特殊字符处理:若文本包含大量特殊符号(如\t/\r),需在分片前先格式化清理。

七、扩展建议

  1. 自定义结束符:开放SENTENCE_END_PATTERN配置,支持业务自定义句子结束符;
  2. 分片过滤:增加空分片/短分片过滤逻辑(如过滤长度<50的分片);
  3. 进度回调:增加分片进度回调函数,便于前端展示处理进度;
  4. 批量写入:结合ES的bulk写入,积累一定数量分片后批量存储,减少网络请求;
  5. 超时控制:为分片处理增加超时机制,避免单个分片处理耗时过长;
  6. 监控指标:统计分片总数、平均分片长度、处理耗时,便于性能调优;
  7. 自适应缓冲区:根据文本读取速度动态调整缓冲区大小,平衡性能与内存。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询