前端大文件上传实现

文件上传是一个常见的功能,但是大文件上传需要考虑几个问题:

  • 单个请求上传时间及传输大小限制
  • 断点续传
  • 重复文件不上传

本节我们实现一个简单的文件上传功能,前端基于react实现,但基本不涉及到react内容。后端采用koa搭建简单后台用于调试。

前端实现

前端: App.tsx

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { ChangeEvent, useState } from "react"
import FileUpdater from "./FileUpdater"
function App() {
const [progress, setProgress] = useState(0)
const handleProgressChange = (value: number) => {
setProgress(value)
}
const handleFileChange = (e: ChangeEvent<HTMLInputElement>) => {
const file = e.target.files![0]
const updater = new FileUpdater(file, 0.125)
updater.onProgressChange(handleProgressChange)
updater.upload()
}
return (
<div>
<input id="file" type="file" onChange={handleFileChange} />
<span>进度:{progress}</span>
</div>
)
}

export default App

前端: FileUpdater.ts

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
import axios from 'axios'
import SparkMD5 from 'spark-md5'
interface ChunksItem {
chunk: Blob,
start: number
}
class FileUpdater {
// 要上传的文件
private file: File
// 文件切片数组
private chunks: ChunksItem[] = []
// 上传进度
private uploadProgress: number = 0
// 订阅上传进度变化的回调
private cbs: Function[] = []
// 上传切片的序号
private uploadNum: number = 0
// 已经上传完成的切片数
private uploadedChunkNum: number = 0
// 切片的大小(KB)
private chunkSize: number;
// 文件上传的并发数
private maxUploadNum: number;
// 文件的md5
private md5: string = ''
// 是否需要在计算完md5之后执行upload方法,如果在执行upload方法的时候md5已经计算好了,那么为false,否则为true,在计算好之后执行
private needUpload: boolean = false

/**
* @author zhaolei_hu
* @description constructor
* @param {File} file 要上传的文件
* @param {number} chunkSize 切片大小,默认1M
* @param {number} maxUploadNum 上传并发数
*/
constructor(file: File, chunkSize: number = 1024, maxUploadNum: number = 5) {
this.file = file
this.chunkSize = chunkSize * 1024
this.maxUploadNum = maxUploadNum
this.generateFileChunk()
this.generateMD5()
}
/**
* @author zhaolei_hu
* @description 切分文件
*/
generateFileChunk() {
const size = this.file.size
let start = 0
while (start < size) {
this.chunks.push({
chunk: this.file.slice(start, start + this.chunkSize),
start: start
})
start += this.chunkSize
}
}
/**
* @author zhaolei_hu
* @description 计算文件md5
*/
generateMD5() {
const spark = new SparkMD5.ArrayBuffer()
const fileReader = new FileReader()
let currentChunk = 0
fileReader.onload = (e) => {
spark.append(e.target!.result as ArrayBuffer)
currentChunk += 1
if (currentChunk < this.chunks.length) {
loadNext()
} else {
this.md5 = spark.end()
this.needUpload && this.upload()
}
}
const loadNext = () => {
fileReader.readAsArrayBuffer(this.chunks[currentChunk].chunk)
}
loadNext()
}
/**
* @author zhaolei_hu
* @description 上传入口
*/
async upload() {
if (!this.md5) {
// 如果还没有计算出md5,则先不上传,等待md5计算完成
this.needUpload = true
return;
}
// 根据md5去请求后台判断是否已经存在该文件
const exist = await this.checkIfExist()
// 如果已经存在则不需要上传
if (exist) {
this.uploadProgress = 100
this.triggerProgressChange()
// 秒传完成
return;
}
// 当前可以并发上传的数量
const lth = this.chunks.length > this.maxUploadNum ? this.maxUploadNum : this.chunks.length
for (let i = 0; i < lth; i++) {
this.uploadChunk(this.chunks[this.uploadNum])
this.uploadNum += 1
}
}
/**
* @author zhaolei_hu
* @description 检查文件是否已存在
*/
async checkIfExist() {
// 根据md5去后台判断
const res:any = await axios.get(`http://localhost:8088/checkExists/${this.md5}`)
if (res.data.success) {
// 后台约定code为1则已经上传了
return res.data.code === 1
}
return false
}
/**
* @author zhaolei_hu
* @description 订阅进度变化
*/
onProgressChange(cb: Function) {
this.cbs.push(cb)
}
/**
* @author zhaolei_hu
* @description 通知进度变化
*/
triggerProgressChange() {
this.cbs.forEach(fn => {
fn(this.uploadProgress)
})
}
/**
* @author zhaolei_hu
* @description 上传切片流程
* @param {ChunksItem} chunksItem
*/
async uploadChunk(chunksItem: ChunksItem) {
const fd = new FormData()
fd.append('file', chunksItem.chunk)
// 存储改切片的开始位置用来做服务器存储的名字,方便排序合并为一个文件
fd.append('start', chunksItem.start + '')
fd.append('hash', this.md5)
const { success } = await this.postFormData(fd)
if (success) {
// 该切片上传成功
this.uploadedChunkNum += 1
this.uploadProgress = Math.ceil(100 * this.uploadedChunkNum / this.chunks.length)
// 触发回调变化
this.triggerProgressChange()
if (this.uploadNum < this.chunks.length) {
this.uploadChunk(this.chunks[this.uploadNum])
this.uploadNum += 1
} else {
// 上传成功的数量等于切片的数量
if (this.uploadedChunkNum === this.chunks.length) {
// 上传完成了,开始合成切片
this.mergeFile()
}
}
} else {
this.uploadChunk(chunksItem)
}
}
/**
* @author zhaolei_hu
* @description 上传数据
* @param {FormData} fd
*/
async postFormData(fd: FormData) {
const res = await axios.post('http://localhost:8088/upload', fd)
return {
success: res.status >= 200 && res.status < 300 ? true : false,
data: res.data
}
}
/**
* @author zhaolei_hu
* @description 通知服务器合成切片
*/
mergeFile() {
axios.get(`http://localhost:8088/mergeFile/${this.md5}/${this.file.name}`)
}
}

export default FileUpdater

后端实现

后端:index.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import Koa from 'koa'
import { koaBody } from 'koa-body'
import cors from 'koa2-cors'
const app = new Koa()
import router from './routers/router.js'

app.use(koaBody())
app.use(cors())

app.use(router.routes()).use(router.allowedMethods())

app.listen(8088, () => {
console.log('server running: 8088')
})

后端: router.js

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import Router from 'koa-router'
import multiparty from 'multiparty'
import path from 'node:path'
import {fileURLToPath} from 'node:url';
import fse from 'fs-extra'
import fs from 'node:fs'
const router = new Router()
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

const hash2path = {}

router.get('/checkExists/:hash', async (ctx, next) => {
const { hash } = ctx.params
if (hash2path[hash]) {
console.log('已经上传过了')
ctx.body = {
success: true,
msg: '已经上传过了',
code: 1,
path: hash2path[hash]
}
} else {
ctx.body = {
success: true,
msg: '没有上传过',
code: 0,
path: null
}
}

})

router.post('/upload', async (ctx, next) => {
const form = new multiparty.Form()
const msg = await writeChunk(form, ctx.req)
ctx.body = {
success: true,
msg,
}
})

router.get('/mergeFile/:md5/:name', async (ctx, next) => {
// 合并完成后可以把md5和对应的文件名、文件地址在数据库中存储,方便后续上传新文件匹配md5是否存在
// 如果已经存在就可以把新的文件名及md5存储到数据库,也指向同一个文件地址
const {md5, name} = ctx.params
const dirPath = path.resolve(__dirname,'chunks/' + md5)
const targetDir = path.resolve(__dirname, 'files')
if(!fse.existsSync(targetDir)) {
await fse.mkdirs(targetDir)
}
const filePath = path.resolve(targetDir, name)
let dests = fse.readdirSync(dirPath)
dests = dests.sort((a, b) => {
return Number(a) - Number(b)
})
await mergeFile(dests, filePath, dirPath)
await fse.remove(dirPath)
hash2path[md5] = filePath
ctx.body = {
success: true,
msg: 'success',
file: filePath
}
})
function writeChunk(form, req) {
return new Promise((resolve, reject) => {
form.parse(req, async (err, fields, files) => {
if (err) return;
// console.log(err, fields, files)
// console.log(files.file)
const [file] = files.file
const [hash] = fields.hash
const [start] = fields.start
const chunkDir = path.resolve(__dirname, 'chunks/' + hash)
if (!fse.existsSync(chunkDir)) {
await fse.mkdirs(chunkDir)
}
if (!fse.existsSync(chunkDir + '/' + start)) {
await fse.move(file.path, `${chunkDir}/${start}`)
}
resolve('上传成功')
})
})
}

const mergeFile = async (dests, filePath, dirPath) => {
let ws = fs.createWriteStream(filePath)
for (let i = 0; i < dests.length; i ++) {
// 按顺序写入,拿到chunk的路径
const chunkPath = path.resolve(__dirname, dirPath + '/' + dests[i])
await write(ws, chunkPath)
}
}

const write = (ws, chunkPath) => {
return new Promise(resolve => {
let rs = fs.createReadStream(chunkPath)
rs.pipe(ws, { end: false})
rs.on('end', () => {
resolve()
})
})
}

export default router

后端的内容写的比较糙,主要是实现一下这个流程。只是在内存中维护了一个已经上传文件的md5,没有再引入数据库。

总结

前端实现思路:

  • 将文件根据指定大小切分成多份,然后根据切片计算md5
  • 通过计算出的md5请求后台检测是否已经有相同的文件了,有的话就不用再传一次了可以实现秒传
  • 如果没有改文件,那么需要并发的将切片传给后台。每有一个切片传递完成了,就继续传递下一个切片。
  • 在上传切片的过程中传递了start字段,这是这个切片的开始位置,为了让合并切片的时候用。
  • 上传的进度是通过上传完的切片数/总切片数来得到的,所以每上传完成一个切片,我们会更新一下进度数值。并通过发布订阅的形式告诉页面进度的变化。
  • 在所有切片都上传完成之后,我们发起一个合并文件的请求,请求后台将所有切片合并为一个文件。

后端实现思路:

  • 前端上传文件切片,我们以文件的hash值作为目录名,文件的start作为文件名,存储到磁盘上。
  • 前端所有文件切片上传完成,请求合并的时候,我们根据hash值去找对应的目录,然后拿到该目录下的所有文件的文件名,因为文件名是这个文件的start,所以可以进行排序。然后按照排好的顺序,把切片文件流依次写入到同一个文件中。写入完成后删除之前的切片。

注意!

前端:

  • 没有实现断点续传,这个需要根据自己的策略来做,可以让后端告知已经上传了哪些切片,也可以前端上传切片之后保存在storage已经上传的。
  • 暂停、取消上传的功能可以使用xhr.abort() 或者 axios.Cancel 相关功能取消正在上传的请求,然后不再继续上传了。如果需要继续上传就是用断点续传的思路来做。

后端:

  • 后端实现很糙,完全为了实现而实现,看起来比较乱,注意思路即可。
  • 为了实现的简易,没有引入数据库,实际开发中不可能在内存里存储映射的。
  • 创建写入流的时候要注意不会自动创建外层的目录,所以需要判断并创建一下。
作者

胡兆磊

发布于

2023-03-08

更新于

2023-03-08

许可协议