大文件上传:分片、断点续传和秒传的实现方案
转载自:https://mp.weixin.qq.com/s/lXStj_oS4dXGl8UWsYx8LQ
处理大文件上传是个常见需求。直接上传大文件会遇到很多问题:网络不稳定、内存占用高、上传失败后又要从头开始。我们需要一套完整的解决方案。
核心思路
我们的做法是把大文件切成很多小片,分别上传这些小片,最后在服务器上把它们拼起来。这样做有几个好处:
-
分片上传:大文件变多个小文件,上传更稳定
-
断点续传:上传中断后,可以从断点继续
-
秒传:如果服务器已经有相同文件,可以瞬间完成
文件选择和队列管理
当用户选择一批文件时,不能同时上传所有文件。那样会拖慢浏览器,导致页面卡顿。
我们需要一个上传队列来控制同时上传的文件数量。
状态初始化和加入队列
// hooks/useFileUpload.tsconst addFiles = useCallback(async (fileList: FileList) => {// 把每个文件包装成对象const newFiles: FileItem[] = Array.from(fileList).map((file) => ({id: crypto.randomUUID(), // 生成唯一IDfile, // 原始文件对象status: "pending", // 初始状态:等待中progress: 0, // 进度从0开始name: file.name,size: file.size,// 其他属性...}));// 更新界面显示的文件列表setFiles((prev) => [...prev, ...newFiles]);// 加入上传队列queueRef.current.push(...newFiles);// 开始处理队列processQueue();}, []);
文件级并发控制
我们用一个调度器来控制同时上传的文件数量:
// hooks/useFileUpload.ts// 设置最大并发数const MAX_CONCURRENT_FILES = 3; // 同时最多上传3个文件const processQueue = useCallback(async () => {// 循环条件:队列有文件 且 当前上传数没到上限while (queueRef.current.length > 0 &&uploadingCountRef.current < MAX_CONCURRENT_FILES) {const fileItem = queueRef.current.shift(); // 从队列取一个文件if (fileItem) {uploadingCountRef.current++; // 增加正在上传的计数// 开始上传这个文件uploadFile(fileItem).finally(() => {// 上传完成(无论成功失败)uploadingCountRef.current--; // 减少计数processQueue(); // 继续处理队列中的下一个});}}}, [uploadFile]);
这样无论用户选择多少文件,最多同时只上传3个,避免卡顿。
文件预处理:计算MD5
为了实现秒传和断点续传,我们需要给文件一个唯一标识。用文件名不行,因为用户可以改名字。我们采用MD5作为文件的“指纹”。
为什么不用整个文件计算MD5?
对于大文件(比如几个GB),一次性读入内存计算MD5会:
-
占用大量内存
-
卡住浏览器界面
-
可能导致浏览器崩溃
增量计算MD5
更好的做法是分块读取计算:
// libs/md5.tsimport SparkMD5 from 'spark-md5';async function calculateMD5(file: File): Promise<string> {return new Promise((resolve, reject) => {const spark = new SparkMD5.ArrayBuffer();const fileReader = new FileReader();// 每次读取2MBconst chunkSize = 2 * 1024 * 1024;let currentChunk = 0;const totalChunks = Math.ceil(file.size / chunkSize);fileReader.onload = (event) => {if (event.target?.result) {// 把这一块数据加入MD5计算spark.append(event.target.result as ArrayBuffer);currentChunk++;// 如果还有下一块,继续读取if (currentChunk < totalChunks) {loadNextChunk();} else {// 所有块都读完,得到最终MD5resolve(spark.end());}}};fileReader.onerror = () => {reject(new Error('文件读取失败'));};function loadNextChunk() {const start = currentChunk * chunkSize;const end = Math.min(start + chunkSize, file.size);const chunk = file.slice(start, end);fileReader.readAsArrayBuffer(chunk);}// 开始读取第一块loadNextChunk();});}
使用Web Worker优化
MD5计算很耗时,如果在主线程计算,用户界面还是会卡。可以用Web Worker在后台线程计算:
// worker/md5.worker.tsself.onmessage = async (event) => {const { file } = event.data;// 在worker里计算MD5const md5 = await calculateMD5(file);// 把结果发回主线程self.postMessage({ md5 });};// 在主线程中使用const worker = new Worker('md5.worker.ts');worker.postMessage({ file });worker.onmessage = (event) => {const { md5 } = event.data;// 拿到MD5,继续下一步};
预检查:秒传和断点续传
拿到文件的MD5后,先问服务器:“这个文件你见过吗?”
发送检查请求
// hooks/useFileUpload.tsasync function checkFile(md5: string, filename: string) {const response = await fetch('/api/upload/check', {method: 'POST',headers: { 'Content-Type': 'application/json' },body: JSON.stringify({ md5, filename })});return response.json();}
服务器端检查逻辑
// app/api/upload/check/route.tsexport async function POST(request: Request) {const { md5, filename } = await request.json();// 1. 检查是否已有完整文件(秒传)const existingFile = await checkFileInIndex(md5);if (existingFile) {return Response.json({exists: true, // 文件已存在path: existingFile.path,uploadedChunks: [] // 没有需要上传的分片});}// 2. 检查是否有部分上传的分片(断点续传)const tempDir = `./temp/${md5}`;let uploadedChunks: number[] = [];if (await fs.exists(tempDir)) {// 读取临时目录,找出已上传的分片const files = await fs.readdir(tempDir);uploadedChunks = files.filter(name => name.endsWith('.chunk')).map(name => parseInt(name.split('.')[0])).sort((a, b) => a - b);}return Response.json({exists: false,uploadedChunks // 返回已上传的分片编号});}
分片上传
如果文件不能秒传,就开始分片上传。
文件分片
// hooks/useFileUpload.tsfunction createChunks(file: File, chunkSize: number = 5 * 1024 * 1024) {const chunks: Blob[] = [];let start = 0;while (start < file.size) {const end = Math.min(start + chunkSize, file.size);chunks.push(file.slice(start, end));start = end;}return chunks;}
分片级并发控制
对于单个大文件,它的分片也不能同时上传,需要控制并发:
// hooks/useFileUpload.tsasync function uploadChunksWithConcurrency(file: File,md5: string,chunksToUpload: number[],uploadedChunks: number[] = []) {const concurrency = 3; // 同时上传3个分片let index = 0; // 当前处理的索引// 上传单个分片const uploadChunk = async (chunkIndex: number) => {const chunkSize = 5 * 1024 * 1024; // 5MBconst start = chunkIndex * chunkSize;const end = Math.min(start + chunkSize, file.size);const chunk = file.slice(start, end);const formData = new FormData();formData.append('file', chunk);formData.append('md5', md5);formData.append('chunkIndex', chunkIndex.toString());formData.append('totalChunks', Math.ceil(file.size / chunkSize).toString());await fetch('/api/upload/chunk', {method: 'POST',body: formData});// 更新进度updateProgress(md5, chunkIndex, uploadedChunks.length);};// 并发上传const workers = Array.from({ length: concurrency }, async () => {while (index < chunksToUpload.length) {const chunkIndex = chunksToUpload[index];index++;await uploadChunk(chunkIndex);}});await Promise.all(workers);}
分片合并
所有分片上传完成后,通知服务器合并文件。
客户端发送合并请求
// hooks/useFileUpload.tsasync function mergeFile(md5: string, filename: string, totalChunks: number) {const response = await fetch('/api/upload/merge', {method: 'POST',headers: { 'Content-Type': 'application/json' },body: JSON.stringify({md5,filename,totalChunks})});return response.json();}
服务器端流式合并
用Node.js的流(Stream)来合并,避免内存溢出:
// app/api/upload/merge/route.tsimport fs from 'fs/promises';import { createReadStream, createWriteStream } from 'fs';import path from 'path';export async function POST(request: Request) {const { md5, filename, totalChunks } = await request.json();const tempDir = `./temp/${md5}`;const finalPath = `./uploads/${md5}_${filename}`;// 创建写入流const writeStream = createWriteStream(finalPath);try {// 按顺序合并所有分片for (let i = 0; i < totalChunks; i++) {const chunkPath = path.join(tempDir, `${i}.chunk`);await new Promise((resolve, reject) => {const readStream = createReadStream(chunkPath);// 把读流连接到写流,{ end: false } 让写流保持打开readStream.pipe(writeStream, { end: false });readStream.on('end', () => {// 这个分片读完,删除临时文件fs.unlink(chunkPath).then(resolve).catch(reject);});readStream.on('error', reject);});}// 所有分片都写完了,关闭写流writeStream.end();// 等待写流完全关闭await new Promise((resolve, reject) => {writeStream.on('finish', resolve);writeStream.on('error', reject);});// 删除临时目录await fs.rmdir(tempDir);// 更新文件索引await addToIndex(md5, finalPath);return Response.json({ success: true, path: finalPath });} catch (error) {writeStream.destroy();return Response.json({ success: false, error: String(error) }, { status: 500 });}}
安全的文件索引更新
多个文件同时完成时,更新索引需要防止冲突:
// libs/file-index.tslet indexWritePromise: Promise<void> = Promise.resolve();export async function addToIndex(md5: string, filePath: string): Promise<void> {// 把新操作链接到Promise链末尾,确保顺序执行const newOperation = indexWritePromise.then(async () => {// 1. 读取现有索引const indexData = await fs.readFile('./index.json', 'utf-8');const index = JSON.parse(indexData || '{}');// 2. 添加新记录index[md5] = filePath;// 3. 写回文件await fs.writeFile('./index.json', JSON.stringify(index, null, 2));}).catch((error) => {console.error('更新索引失败:', error);throw error;});// 更新全局Promise链indexWritePromise = newOperation;// 返回新操作的Promisereturn newOperation;}
完整的上传流程
1. 用户选择文件
// 在组件中const handleFileSelect = (event: react.ChangeEvent<htmlInputElement>) => {if (event.target.files) {addFiles(event.target.files);}};
2. 计算MD5和预检查
async function processFile(fileItem: FileItem) {try {// 1. 计算MD5const md5 = await calculateMD5(fileItem.file);// 2. 检查服务器状态const checkResult = await checkFile(md5, fileItem.file.name);if (checkResult.exists) {// 秒传成功fileItem.status = 'success';fileItem.progress = 100;return;}// 3. 准备上传分片const totalChunks = Math.ceil(fileItem.file.size / (5 * 1024 * 1024));const chunksToUpload = Array.from({ length: totalChunks }, (_, i) => i).filter(i => !checkResult.uploadedChunks.includes(i));if (chunksToUpload.length === 0) {// 所有分片都已上传,直接合并await mergeFile(md5, fileItem.file.name, totalChunks);fileItem.status = 'success';fileItem.progress = 100;return;}// 4. 上传剩余分片fileItem.status = 'uploading';await uploadChunksWithConcurrency(fileItem.file,md5,chunksToUpload,checkResult.uploadedChunks);// 5. 合并分片await mergeFile(md5, fileItem.file.name, totalChunks);fileItem.status = 'success';fileItem.progress = 100;} catch (error) {fileItem.status = 'error';fileItem.error = String(error);}}
界面显示和进度更新
用户需要看到上传进度:
// hooks/useFileUpload.tsconst updateProgress = useCallback((fileId: string, uploadedChunks: number, totalChunks: number) => {setFiles(prev => prev.map(file => {if (file.id === fileId) {const progress = Math.round((uploadedChunks / totalChunks) * 100);return { ...file, progress };}return file;}));}, []);
错误处理和重试
网络可能不稳定,需要重试机制:
// hooks/useFileUpload.tsasync function uploadWithRetry(fn: () => Promise<any>,maxRetries: number = 3): Promise<any> {let lastError: Error;for (let i = 0; i < maxRetries; i++) {try {return await fn();} catch (error) {lastError = error as Error;if (i < maxRetries - 1) {// 等待一段时间再重试(指数退避)const delay = Math.min(1000 * Math.pow(2, i), 10000);await new Promise(resolve => setTimeout(resolve, delay));}}}throw lastError;}
总结
实现大文件上传的关键点:
-
队列管理:控制同时上传的文件数,避免卡顿
-
分片处理:大文件切成小片,上传更稳定
-
MD5计算:增量计算,避免内存问题
-
预检查:实现秒传和断点续传
-
并发控制:文件级和分片级都要控制
-
流式合并:服务器端用流避免内存溢出
-
错误处理:重试机制保证可靠性
这套方案能很好地处理各种大小的文件上传,用户体验好,服务器压力也小。实际项目中可以根据具体需求调整参数,比如分片大小、并发数等。