12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184 |
- var session = require('./session');
- var Async = require('./async');
- var EventProxy = require('./event').EventProxy;
- var util = require('./util');
- /*
-  * Multipart (sliced) upload of one file; this is the slice-upload API exposed by the module.
-  *
-  * Flow, driven by EventProxy events:
-  *   get_file_size_finish   -> reuse params.UploadData.UploadId if set, else getUploadIdAndPartList
-  *   get_upload_data_finish -> cache and mark the UploadId, then uploadSliceList
-  *   upload_slice_complete  -> uploadSliceComplete
-  *   upload_complete        -> callback(null, result merged with UploadId)
-  *   error                  -> callback(err) after setting err.UploadId
-  * A zero-byte file skips all of this and is sent with self.putObject.
-  * Async results are dropped once self._isRunningTask(TaskId) is false.
-  *
-  * params: TaskId, Bucket, Region, Key, Body, ContentLength (used as the file size),
-  *   ChunkSize / SliceSize, AsyncLimit, StorageClass, ServerSideEncryption, Headers,
-  *   onProgress, onHashProgress, UploadData.
-  * NOTE(review): params.UploadData is dereferenced without a guard - presumably the task layer always sets it; confirm.
-  * callback: function (err, data).
-  */
- function sliceUploadFile(params, callback) {
- var self = this;
- var ep = new EventProxy();
- var TaskId = params.TaskId;
- var Bucket = params.Bucket;
- var Region = params.Region;
- var Key = params.Key;
- var Body = params.Body;
- var ChunkSize = params.ChunkSize || params.SliceSize || self.options.ChunkSize;
- var AsyncLimit = params.AsyncLimit;
- var StorageClass = params.StorageClass;
- var ServerSideEncryption = params.ServerSideEncryption;
- var FileSize;
- var onProgress;
- var onHashProgress = params.onHashProgress;
- // Any failure during the upload is reported to the caller here, tagged with the UploadId.
- ep.on('error', function (err) {
- if (!self._isRunningTask(TaskId)) return;
- err.UploadId = params.UploadData.UploadId || '';
- return callback(err);
- });
- // Whole upload finished: hand the result (merged with the UploadId) to the caller.
- ep.on('upload_complete', function (UploadCompleteData) {
- var _UploadCompleteData = util.extend({
- UploadId: params.UploadData.UploadId || ''
- }, UploadCompleteData);
- callback(null, _UploadCompleteData);
- });
- // All slices uploaded: run uploadSliceComplete, forwarding only x-cos-meta-* and Pic-Operations headers.
- ep.on('upload_slice_complete', function (UploadData) {
- var metaHeaders = {};
- util.each(params.Headers, function (val, k) {
- var shortKey = k.toLowerCase();
- if (shortKey.indexOf('x-cos-meta-') === 0 || shortKey === 'pic-operations') metaHeaders[k] = val;
- });
- uploadSliceComplete.call(self, {
- Bucket: Bucket,
- Region: Region,
- Key: Key,
- UploadId: UploadData.UploadId,
- SliceList: UploadData.SliceList,
- Headers: metaHeaders,
- }, function (err, data) {
- if (!self._isRunningTask(TaskId)) return;
- session.removeUsing(UploadData.UploadId);
- if (err) {
- onProgress(null, true);
- return ep.emit('error', err);
- }
- session.removeUploadId.call(self, UploadData.UploadId);
- onProgress({loaded: FileSize, total: FileSize}, true);
- ep.emit('upload_complete', data);
- });
- });
- // UploadId resolved: start uploading each slice.
- ep.on('get_upload_data_finish', function (UploadData) {
- // Handle the UploadId cache.
- var uuid = session.getFileId(Body, params.ChunkSize, Bucket, Key);
- uuid && session.saveUploadId.call(self, uuid, UploadData.UploadId, self.options.UploadIdCacheLimit); // cache the UploadId
- session.setUsing(UploadData.UploadId); // mark the UploadId as in use
- // UploadId obtained.
- onProgress(null, true); // task state becomes uploading
- uploadSliceList.call(self, {
- TaskId: TaskId,
- Bucket: Bucket,
- Region: Region,
- Key: Key,
- Body: Body,
- FileSize: FileSize,
- SliceSize: ChunkSize,
- AsyncLimit: AsyncLimit,
- ServerSideEncryption: ServerSideEncryption,
- UploadData: UploadData,
- Headers: params.Headers,
- onProgress: onProgress
- }, function (err, data) {
- if (!self._isRunningTask(TaskId)) return;
- if (err) {
- onProgress(null, true);
- return ep.emit('error', err);
- }
- ep.emit('upload_slice_complete', data);
- });
- });
- // Resolve the UploadId; getUploadIdAndPartList may hash slices and compare ETags so that an earlier upload is only resumed for matching content.
- ep.on('get_file_size_finish', function () {
- onProgress = util.throttleOnProgress.call(self, FileSize, params.onProgress);
- if (params.UploadData.UploadId) {
- ep.emit('get_upload_data_finish', params.UploadData);
- } else {
- var _params = util.extend({
- TaskId: TaskId,
- Bucket: Bucket,
- Region: Region,
- Key: Key,
- Headers: params.Headers,
- StorageClass: StorageClass,
- Body: Body,
- FileSize: FileSize,
- SliceSize: ChunkSize,
- onHashProgress: onHashProgress,
- }, params);
- getUploadIdAndPartList.call(self, _params, function (err, UploadData) {
- if (!self._isRunningTask(TaskId)) return;
- if (err) return ep.emit('error', err);
- params.UploadData.UploadId = UploadData.UploadId;
- params.UploadData.PartList = UploadData.PartList;
- ep.emit('get_upload_data_finish', params.UploadData);
- });
- }
- });
- // Take the file size from params.ContentLength and strip any Content-Length header.
- FileSize = params.ContentLength;
- delete params.ContentLength;
- !params.Headers && (params.Headers = {});
- util.each(params.Headers, function (item, key) {
- if (key.toLowerCase() === 'content-length') {
- delete params.Headers[key];
- }
- });
- // Adjust the slice size: raise it to the first table entry (in MB) that keeps the part count within self.options.MaxPartNumber.
- (function () {
- var SIZE = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 1024 * 2, 1024 * 4, 1024 * 5];
- var AutoChunkSize = 1024 * 1024;
- for (var i = 0; i < SIZE.length; i++) {
- AutoChunkSize = SIZE[i] * 1024 * 1024;
- if (FileSize / AutoChunkSize <= self.options.MaxPartNumber) break;
- }
- params.ChunkSize = params.SliceSize = ChunkSize = Math.max(ChunkSize, AutoChunkSize);
- })();
- // Start the upload; an empty file goes through putObject instead of the multipart flow.
- if (FileSize === 0) {
- params.Body = '';
- params.ContentLength = 0;
- params.SkipTask = true;
- self.putObject(params, callback);
- } else {
- ep.emit('get_file_size_finish');
- }
- }
- /*
-  * Find an UploadId (and its part list) that this upload can resume from, or create a new one.
-  *
-  * Steps, driven by EventProxy events:
-  *   get_remote_upload_id_list  -> wholeMultipartList, filtered by Key and (if given) StorageClass
-  *   seek_local_avail_upload_id -> try UploadIds from the session cache that are also in the remote list
-  *   has_and_check_upload_id    -> check remote UploadIds one at a time against the local file
-  *   no_available_upload_id     -> self.multipartInit for a fresh UploadId
-  *   upload_id_available        -> callback(null, {UploadId, PartList}) with one PartList entry per slice
-  *
-  * params: TaskId, Bucket, Region, Key, StorageClass, Body, FileSize, SliceSize, ChunkSize,
-  *   Headers, Query, onHashProgress.
-  * callback: function (err, UploadData).
-  */
- function getUploadIdAndPartList(params, callback) {
- var TaskId = params.TaskId;
- var Bucket = params.Bucket;
- var Region = params.Region;
- var Key = params.Key;
- var StorageClass = params.StorageClass;
- var self = this;
- // ETag computation state.
- var ETagMap = {};
- var FileSize = params.FileSize;
- var SliceSize = params.SliceSize;
- var SliceCount = Math.ceil(FileSize / SliceSize);
- var FinishSliceCount = 0;
- var FinishSize = 0;
- var onHashProgress = util.throttleOnProgress.call(self, FileSize, params.onHashProgress);
- var getChunkETag = function (PartNumber, callback) { // yields the quoted md5 and size of one slice, memoised in ETagMap
- var start = SliceSize * (PartNumber - 1);
- var end = Math.min(start + SliceSize, FileSize);
- var ChunkSize = end - start;
- if (ETagMap[PartNumber]) {
- callback(null, {
- PartNumber: PartNumber,
- ETag: ETagMap[PartNumber],
- Size: ChunkSize
- });
- } else {
- util.fileSlice(params.Body, start, end, false, function (chunkItem) {
- util.getFileMd5(chunkItem, function (err, md5) {
- if (err) return callback(util.error(err));
- var ETag = '"' + md5 + '"';
- ETagMap[PartNumber] = ETag;
- FinishSliceCount += 1;
- FinishSize += ChunkSize;
- onHashProgress({loaded: FinishSize, total: FileSize});
- callback(null, {
- PartNumber: PartNumber,
- ETag: ETag,
- Size: ChunkSize
- });
- });
- });
- }
- };
- // Decide whether an UploadId is reusable by comparing its parts' md5 against the local file.
- var isAvailableUploadList = function (PartList, callback) {
- var PartCount = PartList.length;
- // No parts uploaded yet: accept.
- if (PartCount === 0) {
- return callback(null, true);
- }
- // Check the part count.
- if (PartCount > SliceCount) {
- return callback(null, false);
- }
- // Check the part size.
- if (PartCount > 1) {
- var PartSliceSize = Math.max(PartList[0].Size, PartList[1].Size);
- if (PartSliceSize !== SliceSize) {
- return callback(null, false);
- }
- }
- // Hash the parts one by one and check that each ETag matches.
- var next = function (index) {
- if (index < PartCount) {
- var Part = PartList[index];
- getChunkETag(Part.PartNumber, function (err, chunk) { // an md5 error leaves chunk undefined, so it counts as a mismatch
- if (chunk && chunk.ETag === Part.ETag && chunk.Size === Part.Size) {
- next(index + 1);
- } else {
- callback(null, false);
- }
- });
- } else {
- callback(null, true);
- }
- };
- next(0);
- };
- var ep = new EventProxy();
- ep.on('error', function (errData) {
- if (!self._isRunningTask(TaskId)) return;
- return callback(errData);
- });
- // A usable UploadId exists: build the full per-slice part list and return it.
- ep.on('upload_id_available', function (UploadData) {
- // Index the known parts by PartNumber.
- var map = {};
- var list = [];
- util.each(UploadData.PartList, function (item) {
- map[item.PartNumber] = item;
- });
- for (var PartNumber = 1; PartNumber <= SliceCount; PartNumber++) {
- var item = map[PartNumber];
- if (item) {
- item.PartNumber = PartNumber;
- item.Uploaded = true;
- } else {
- item = {
- PartNumber: PartNumber,
- ETag: null,
- Uploaded: false
- };
- }
- list.push(item);
- }
- UploadData.PartList = list;
- callback(null, UploadData);
- });
- // No usable UploadId: initialise a new multipart upload to obtain one.
- ep.on('no_available_upload_id', function () {
- if (!self._isRunningTask(TaskId)) return;
- var _params = util.extend({
- Bucket: Bucket,
- Region: Region,
- Key: Key,
- Query: util.clone(params.Query),
- StorageClass: StorageClass,
- Body: params.Body,
- }, params);
- var headers = util.clone(params.Headers)
- delete headers['x-cos-mime-limit'];
- _params.Headers = headers;
- self.multipartInit(_params, function (err, data) {
- if (!self._isRunningTask(TaskId)) return;
- if (err) return ep.emit('error', err);
- var UploadId = data.UploadId;
- if (!UploadId) {
- return callback(util.error(new Error('no such upload id')));
- }
- ep.emit('upload_id_available', {UploadId: UploadId, PartList: []});
- });
- });
- // UploadIds exist remotely: look for one that can be reused.
- ep.on('has_and_check_upload_id', function (UploadIdList) {
- // Serially look for an UploadId whose content matches.
- UploadIdList = UploadIdList.reverse();
- Async.eachLimit(UploadIdList, 1, function (UploadId, asyncCallback) {
- if (!self._isRunningTask(TaskId)) return;
- // Skip it if it is currently being uploaded.
- if (session.using[UploadId]) {
- asyncCallback(); // check the next UploadId
- return;
- }
- // Check whether the UploadId is usable.
- wholeMultipartListPart.call(self, {
- Bucket: Bucket,
- Region: Region,
- Key: Key,
- UploadId: UploadId,
- }, function (err, PartListData) {
- if (!self._isRunningTask(TaskId)) return;
- if (err) {
- session.removeUsing(UploadId);
- return ep.emit('error', err);
- }
- var PartList = PartListData.PartList;
- PartList.forEach(function (item) {
- item.PartNumber *= 1;
- item.Size *= 1;
- item.ETag = item.ETag || '';
- });
- isAvailableUploadList(PartList, function (err, isAvailable) {
- if (!self._isRunningTask(TaskId)) return;
- if (err) return ep.emit('error', err);
- if (isAvailable) {
- asyncCallback({
- UploadId: UploadId,
- PartList: PartList
- }); // stop immediately
- } else {
- asyncCallback(); // check the next UploadId
- }
- });
- });
- }, function (AvailableUploadData) {
- if (!self._isRunningTask(TaskId)) return;
- onHashProgress(null, true);
- if (AvailableUploadData && AvailableUploadData.UploadId) {
- ep.emit('upload_id_available', AvailableUploadData);
- } else {
- ep.emit('no_available_upload_id');
- }
- });
- });
- // Look for a usable UploadId in the local cache.
- ep.on('seek_local_avail_upload_id', function (RemoteUploadIdList) {
- // Look up the locally cached UploadIds for this file.
- var uuid = session.getFileId(params.Body, params.ChunkSize, Bucket, Key);
- var LocalUploadIdList = session.getUploadIdList.call(self, uuid);
- if (!uuid || !LocalUploadIdList) {
- ep.emit('has_and_check_upload_id', RemoteUploadIdList);
- return;
- }
- var next = function (index) {
- // Nothing usable locally: fall back to validating the remote UploadIds one by one.
- if (index >= LocalUploadIdList.length) {
- ep.emit('has_and_check_upload_id', RemoteUploadIdList);
- return;
- }
- var UploadId = LocalUploadIdList[index];
- // Not in the remote UploadId list: remove it from the cache and skip.
- if (!util.isInArray(RemoteUploadIdList, UploadId)) {
- session.removeUploadId.call(self, UploadId);
- next(index + 1);
- return;
- }
- // Skip it if it is currently being uploaded.
- if (session.using[UploadId]) {
- next(index + 1);
- return;
- }
- // Check that the UploadId still exists remotely by listing its parts.
- wholeMultipartListPart.call(self, {
- Bucket: Bucket,
- Region: Region,
- Key: Key,
- UploadId: UploadId,
- }, function (err, PartListData) {
- if (!self._isRunningTask(TaskId)) return;
- if (err) {
- // Listing its parts failed: remove it from the cache and skip.
- session.removeUploadId.call(self, UploadId);
- next(index + 1);
- } else {
- // Found a usable UploadId (its parts are not compared against the local file on this path).
- ep.emit('upload_id_available', {
- UploadId: UploadId,
- PartList: PartListData.PartList,
- });
- }
- });
- };
- next(0);
- });
- // Fetch the remote UploadId list.
- ep.on('get_remote_upload_id_list', function () {
- // Fetch every matching UploadId; the same file can have several upload tasks.
- wholeMultipartList.call(self, {
- Bucket: Bucket,
- Region: Region,
- Key: Key,
- }, function (err, data) {
- if (!self._isRunningTask(TaskId)) return;
- if (err) return ep.emit('error', err);
- // Normalise the remote UploadId list: exact Key match, matching StorageClass, reversed order.
- var RemoteUploadIdList = util.filter(data.UploadList, function (item) {
- return item.Key === Key && (!StorageClass || item.StorageClass.toUpperCase() === StorageClass.toUpperCase());
- }).reverse().map(function (item) {
- return item.UploadId || item.UploadID;
- });
- if (RemoteUploadIdList.length) {
- ep.emit('seek_local_avail_upload_id', RemoteUploadIdList);
- } else {
- // Nothing remote: purge the cached UploadIds for this file.
- var uuid = session.getFileId(params.Body, params.ChunkSize, Bucket, Key), LocalUploadIdList;
- if (uuid && (LocalUploadIdList = session.getUploadIdList.call(self, uuid))) {
- util.each(LocalUploadIdList, function (UploadId) {
- session.removeUploadId.call(self, UploadId);
- });
- }
- ep.emit('no_available_upload_id');
- }
- });
- });
- // Start looking for a usable UploadId.
- ep.emit('get_remote_upload_id_list');
- }
/**
 * Collect every unfinished multipart upload that matches the request,
 * following pagination until the listing is no longer truncated.
 *
 * Must be called with `this` bound to an object exposing `multipartList`.
 *
 * @param {Object} params - Bucket, Region and Key (Key is sent as the listing Prefix).
 * @param {Function} callback - function (err, {UploadList}).
 */
function wholeMultipartList(params, callback) {
    var cos = this;
    var collected = [];
    // One request object is reused for all pages; only the markers change.
    var query = {
        Bucket: params.Bucket,
        Region: params.Region,
        Prefix: params.Key
    };

    function fetchPage() {
        cos.multipartList(query, onPage);
    }

    function onPage(err, page) {
        if (err) return callback(err);
        Array.prototype.push.apply(collected, page.Upload || []);
        if (page.IsTruncated !== 'true') {
            // Listing is complete.
            callback(null, {UploadList: collected});
        } else {
            // Listing is incomplete: continue from the returned markers.
            query.KeyMarker = page.NextKeyMarker;
            query.UploadIdMarker = page.NextUploadIdMarker;
            fetchPage();
        }
    }

    fetchPage();
}
/**
 * Collect the complete part list of one multipart upload, following
 * pagination until the listing is no longer truncated.
 *
 * Must be called with `this` bound to an object exposing `multipartListPart`.
 *
 * @param {Object} params - Bucket, Region, Key and UploadId.
 * @param {Function} callback - function (err, {PartList}).
 */
function wholeMultipartListPart(params, callback) {
    var cos = this;
    var collected = [];
    // One request object is reused for all pages; only the marker changes.
    var query = {
        Bucket: params.Bucket,
        Region: params.Region,
        Key: params.Key,
        UploadId: params.UploadId
    };

    function fetchPage() {
        cos.multipartListPart(query, onPage);
    }

    function onPage(err, page) {
        if (err) return callback(err);
        Array.prototype.push.apply(collected, page.Part || []);
        if (page.IsTruncated !== 'true') {
            // Listing is complete.
            callback(null, {PartList: collected});
        } else {
            // Listing is incomplete: continue after the last part returned.
            query.PartNumberMarker = page.NextPartNumberMarker;
            fetchPage();
        }
    }

    fetchPage();
}
/**
 * Upload every slice of UploadData.PartList that is not yet marked Uploaded.
 *
 * params:
 *   TaskId, Bucket, Region, Key, Body, Headers, ServerSideEncryption
 *               forwarded to uploadSliceItem
 *   UploadData  {UploadId, PartList}; each PartList item carries PartNumber and Uploaded
 *   FileSize    size of the file
 *   SliceSize   size of one slice
 *   AsyncLimit  number of slices uploaded in parallel (falls back to
 *               self.options.ChunkParallelLimit, then 1; capped at 256)
 *   onProgress  called with {loaded, total} as data is sent
 * cb: function (err, {UploadId, SliceList}).
 *
 * A slice response without an ETag is reported as an Error built with
 * util.error, so callers can attach properties to it.
 */
function uploadSliceList(params, cb) {
    var self = this;
    var TaskId = params.TaskId;
    var Bucket = params.Bucket;
    var Region = params.Region;
    var Key = params.Key;
    var UploadData = params.UploadData;
    var FileSize = params.FileSize;
    var SliceSize = params.SliceSize;
    var ChunkParallel = Math.min(params.AsyncLimit || self.options.ChunkParallelLimit || 1, 256);
    var Body = params.Body;
    var SliceCount = Math.ceil(FileSize / SliceSize);
    var FinishSize = 0;
    var ServerSideEncryption = params.ServerSideEncryption;
    var Headers = params.Headers;
    var onProgress = params.onProgress;
    // Slices already uploaded count towards progress; only the rest are sent.
    var needUploadSlices = util.filter(UploadData.PartList, function (SliceItem) {
        if (SliceItem['Uploaded']) {
            // The last slice may be shorter than SliceSize.
            FinishSize += SliceItem['PartNumber'] >= SliceCount ? (FileSize % SliceSize || SliceSize) : SliceSize;
        }
        return !SliceItem['Uploaded'];
    });
    Async.eachLimit(needUploadSlices, ChunkParallel, function (SliceItem, asyncCallback) {
        if (!self._isRunningTask(TaskId)) return;
        var PartNumber = SliceItem['PartNumber'];
        var currentSize = Math.min(FileSize, SliceItem['PartNumber'] * SliceSize) - (SliceItem['PartNumber'] - 1) * SliceSize;
        // Bytes of this slice already added to FinishSize by progress events.
        var preAddSize = 0;
        uploadSliceItem.call(self, {
            TaskId: TaskId,
            Bucket: Bucket,
            Region: Region,
            Key: Key,
            SliceSize: SliceSize,
            FileSize: FileSize,
            PartNumber: PartNumber,
            ServerSideEncryption: ServerSideEncryption,
            Body: Body,
            UploadData: UploadData,
            Headers: Headers,
            onProgress: function (data) {
                FinishSize += data.loaded - preAddSize;
                preAddSize = data.loaded;
                onProgress({loaded: FinishSize, total: FileSize});
            },
        }, function (err, data) {
            if (!self._isRunningTask(TaskId)) return;
            if (!err && !data.ETag) {
                // Wrap in an Error instead of passing a bare string up the chain.
                err = util.error(new Error('get ETag error, please add "ETag" to CORS ExposeHeader setting.( 获取ETag失败,请在CORS ExposeHeader设置中添加ETag,请参考文档:https://cloud.tencent.com/document/product/436/13318 )'));
            }
            if (err) {
                // Roll back whatever this slice contributed to the progress.
                FinishSize -= preAddSize;
            } else {
                FinishSize += currentSize - preAddSize;
                SliceItem.ETag = data.ETag;
            }
            onProgress({loaded: FinishSize, total: FileSize});
            asyncCallback(err || null, data);
        });
    }, function (err) {
        if (!self._isRunningTask(TaskId)) return;
        if (err) return cb(err);
        cb(null, {
            UploadId: UploadData.UploadId,
            SliceList: UploadData.PartList
        });
    });
}
/**
 * Upload a single slice of the file, retrying up to
 * self.options.ChunkRetryTimes additional times.
 *
 * params: TaskId, Bucket, Region, Key, Body, FileSize, SliceSize, PartNumber,
 *   ServerSideEncryption, UploadData ({UploadId, PartList}), Headers, onProgress.
 * callback: function (err, data) with the multipartUpload result.
 *
 * On success the matching UploadData.PartList entry is flagged Uploaded.
 * Nothing is reported once self._isRunningTask(TaskId) is false.
 */
function uploadSliceItem(params, callback) {
    var cos = this;
    var taskId = params.TaskId;
    var bucket = params.Bucket;
    var region = params.Region;
    var key = params.Key;
    var fileSize = params.FileSize;
    var fileBody = params.Body;
    var partNumber = params.PartNumber * 1;
    var sliceSize = params.SliceSize;
    var encryption = params.ServerSideEncryption;
    var uploadData = params.UploadData;
    var sourceHeaders = params.Headers || {};
    var attempts = cos.options.ChunkRetryTimes + 1;

    // Byte range [offset, limit) of this slice; the last slice may be shorter.
    var offset = sliceSize * (partNumber - 1);
    var limit = offset + sliceSize;
    var contentLength = sliceSize;
    if (limit > fileSize) {
        limit = fileSize;
        contentLength = limit - offset;
    }

    // Only these request headers are forwarded with a part upload.
    var forwardable = ['x-cos-traffic-limit', 'x-cos-mime-limit'];
    var partHeaders = {};
    util.each(sourceHeaders, function (value, name) {
        if (forwardable.indexOf(name) !== -1) {
            partHeaders[name] = value;
        }
    });

    var partItem = uploadData.PartList[partNumber - 1];

    function attempt(done) {
        if (!cos._isRunningTask(taskId)) return;
        util.fileSlice(fileBody, offset, limit, true, function (sliceBody) {
            cos.multipartUpload({
                TaskId: taskId,
                Bucket: bucket,
                Region: region,
                Key: key,
                ContentLength: contentLength,
                PartNumber: partNumber,
                UploadId: uploadData.UploadId,
                ServerSideEncryption: encryption,
                Body: sliceBody,
                Headers: partHeaders,
                onProgress: params.onProgress,
            }, function (err, data) {
                if (!cos._isRunningTask(taskId)) return;
                if (err) return done(err);
                partItem.Uploaded = true;
                return done(null, data);
            });
        });
    }

    Async.retry(attempts, attempt, function (err, data) {
        if (!cos._isRunningTask(taskId)) return;
        return callback(err, data);
    });
}
/**
 * Finish a multipart upload by sending the list of uploaded parts.
 * The request is retried up to this.options.ChunkRetryTimes additional times.
 *
 * params: Bucket, Region, Key, UploadId, SliceList (items with PartNumber and ETag), Headers.
 * callback: function (err, data) with the multipartComplete result.
 */
function uploadSliceComplete(params, callback) {
    var cos = this;
    var attempts = this.options.ChunkRetryTimes + 1;
    // Reduce each slice to the two fields the completion request needs.
    var parts = params.SliceList.map(function (slice) {
        return {
            PartNumber: slice.PartNumber,
            ETag: slice.ETag
        };
    });
    var request = {
        Bucket: params.Bucket,
        Region: params.Region,
        Key: params.Key,
        UploadId: params.UploadId,
        Parts: parts,
        Headers: params.Headers,
    };

    // The completion request is retried as well.
    function attempt(done) {
        cos.multipartComplete({
            Bucket: request.Bucket,
            Region: request.Region,
            Key: request.Key,
            UploadId: request.UploadId,
            Parts: request.Parts,
            Headers: request.Headers,
        }, done);
    }

    Async.retry(attempts, attempt, function (err, data) {
        callback(err, data);
    });
}
/**
 * Abort unfinished multipart uploads.
 *
 * params:
 *   Bucket, Region, Headers
 *   Key         required for Level 'file' and 'task'
 *   UploadId    required for Level 'task'
 *   Level       'task' (default): abort the given UploadId;
 *               'file': abort every upload listed for Key;
 *               'bucket': abort every upload listed for the bucket
 *   AsyncLimit  concurrency used when aborting
 * callback: function (err, data); data comes from abortUploadTaskArray.
 */
function abortUploadTask(params, callback) {
    var cos = this;
    var bucket = params.Bucket;
    var region = params.Region;
    var key = params.Key;
    var uploadId = params.UploadId;
    var level = params.Level || 'task';
    var asyncLimit = params.AsyncLimit;
    var ep = new EventProxy();

    ep.on('error', function (errData) {
        return callback(errData);
    });

    // The list of uploads to abort is known: abort them.
    ep.on('get_abort_array', function (abortArray) {
        abortUploadTaskArray.call(cos, {
            Bucket: bucket,
            Region: region,
            Key: key,
            Headers: params.Headers,
            AsyncLimit: asyncLimit,
            AbortArray: abortArray
        }, callback);
    });

    // List the uploads matching `query`, then hand them to the abort step.
    function listThenAbort(query) {
        wholeMultipartList.call(cos, query, function (err, data) {
            if (err) return callback(err);
            ep.emit('get_abort_array', data.UploadList || []);
        });
    }

    switch (level) {
    case 'bucket':
        // Bucket level: every unfinished upload in the bucket.
        listThenAbort({
            Bucket: bucket,
            Region: region
        });
        break;
    case 'file':
        // File level: every unfinished upload of this file.
        if (!key) return callback(util.error(new Error('abort_upload_task_no_key')));
        listThenAbort({
            Bucket: bucket,
            Region: region,
            Key: key
        });
        break;
    case 'task':
        // Task level: only the upload with the given UploadId.
        if (!uploadId) return callback(util.error(new Error('abort_upload_task_no_id')));
        if (!key) return callback(util.error(new Error('abort_upload_task_no_key')));
        ep.emit('get_abort_array', [{
            Key: key,
            UploadId: uploadId
        }]);
        break;
    default:
        return callback(util.error(new Error('abort_unknown_level')));
    }
}
/**
 * Abort a batch of multipart uploads.
 *
 * params:
 *   Bucket, Region, Headers
 *   Key         when set, only items whose Key is identical are aborted;
 *               other items are skipped and appear in neither result list
 *   AbortArray  items with Key and UploadId (or UploadID)
 *   AsyncLimit  concurrency, defaults to 1
 * callback: function (err, {successList, errorList}); both lists hold
 *   {Bucket, Region, Key, UploadId} descriptors.
 */
function abortUploadTaskArray(params, callback) {
    var Bucket = params.Bucket;
    var Region = params.Region;
    var Key = params.Key;
    var AbortArray = params.AbortArray;
    var AsyncLimit = params.AsyncLimit || 1;
    var self = this;
    var index = 0;
    var resultList = new Array(AbortArray.length);
    Async.eachLimit(AbortArray, AsyncLimit, function (AbortItem, nextItem) {
        // Claim the slot up front so that skipped items consume their own
        // index too; otherwise later results overwrite earlier ones and
        // trailing slots stay empty.
        var eachIndex = index++;
        if (Key && Key !== AbortItem.Key) {
            resultList[eachIndex] = {error: {KeyNotMatch: true}};
            nextItem(null);
            return;
        }
        var UploadId = AbortItem.UploadId || AbortItem.UploadID;
        self.multipartAbort({
            Bucket: Bucket,
            Region: Region,
            Key: AbortItem.Key,
            Headers: params.Headers,
            UploadId: UploadId
        }, function (err) {
            var task = {
                Bucket: Bucket,
                Region: Region,
                Key: AbortItem.Key,
                UploadId: UploadId
            };
            resultList[eachIndex] = {error: err, task: task};
            // A failed abort is recorded, not propagated, so the batch continues.
            nextItem(null);
        });
    }, function (err) {
        if (err) return callback(err);
        var successList = [];
        var errorList = [];
        for (var i = 0, len = resultList.length; i < len; i++) {
            var item = resultList[i];
            // Skipped items carry no task and are left out of both lists.
            if (item && item['task']) {
                if (item['error']) {
                    errorList.push(item['task']);
                } else {
                    successList.push(item['task']);
                }
            }
        }
        return callback(null, {
            successList: successList,
            errorList: errorList
        });
    });
}
/**
 * High-level upload of one file: queues a task that uses sliceUploadFile
 * when the body is larger than the slice threshold and putObject otherwise.
 *
 * params: the upload parameters (Body plus whatever the chosen API takes), and
 *   SliceSize     threshold; defaults to self.options.SliceSize when undefined
 *   onTaskReady   function (taskId)
 *   onFileFinish  function (err, data, fileInfo)
 * callback: optional function (err, data).
 *
 * fileInfo holds TaskId plus every params value that is neither an object
 * nor a function.
 */
function uploadFile(params, callback) {
    var cos = this;

    // Threshold above which the file is uploaded in slices.
    var threshold = params.SliceSize;
    if (threshold === undefined) {
        threshold = cos.options.SliceSize;
    }

    var body = params.Body;
    var size = body.size || body.length || 0;

    // Collect the plain options so they can be handed back to onFileFinish.
    var fileInfo = {TaskId: ''};
    util.each(params, function (value, name) {
        var kind = typeof value;
        if (kind !== 'object' && kind !== 'function') {
            fileInfo[name] = value;
        }
    });

    // Record the task id before forwarding it to the caller's hook.
    var userOnTaskReady = params.onTaskReady;
    params.onTaskReady = function (tid) {
        fileInfo.TaskId = tid;
        userOnTaskReady && userOnTaskReady(tid);
    };

    // Completion notifies the per-file hook first, then the callback.
    var userOnFileFinish = params.onFileFinish;
    var finish = function (err, data) {
        userOnFileFinish && userOnFileFinish(err, data, fileInfo);
        callback && callback(err, data);
    };

    // Above the threshold use slice upload; at or below it use a simple upload.
    var api;
    if (size > threshold) {
        api = 'sliceUploadFile';
    } else {
        api = 'putObject';
    }

    cos._addTasks([{
        api: api,
        params: params,
        callback: finish,
    }]);
}
/**
 * Upload a batch of files. Each file becomes one queued task that uses
 * sliceUploadFile when its body is larger than the slice threshold and
 * putObject otherwise.
 *
 * params:
 *   files         array of per-file upload parameters; each may carry its own
 *                 onTaskReady, onProgress and onFileFinish hooks
 *   SliceSize     threshold; defaults to self.options.SliceSize when undefined
 *   onProgress    aggregated progress, called with {loaded, total} over all files
 *   onFileFinish  function (err, data, fileInfo), called once per file
 * callback: optional function (null, {files}) where files[i] is
 *   {options, error, data} for the i-th input file. Per-file failures are
 *   reported inside that list, never as the first argument.
 *
 * An empty or missing file list completes immediately with {files: []}.
 */
function uploadFiles(params, callback) {
    var self = this;
    var files = params.files || [];
    // Nothing to queue: report completion now, otherwise no task would ever
    // finish and the callback would never fire.
    if (!files.length) {
        callback && callback(null, {files: []});
        return;
    }
    // Threshold above which a file is uploaded in slices.
    var SliceSize = params.SliceSize === undefined ? self.options.SliceSize : params.SliceSize;
    // Aggregated progress over all files.
    var TotalSize = 0;
    var TotalFinish = 0;
    var onTotalProgress = util.throttleOnProgress.call(self, TotalFinish, params.onProgress);
    // Aggregated completion over all files.
    var unFinishCount = files.length;
    var _onTotalFileFinish = params.onFileFinish;
    var resultList = Array(unFinishCount);
    var onTotalFileFinish = function (err, data, options) {
        onTotalProgress(null, true);
        _onTotalFileFinish && _onTotalFileFinish(err, data, options);
        resultList[options.Index] = {
            options: options,
            error: err,
            data: data
        };
        if (--unFinishCount <= 0 && callback) {
            callback(null, {files: resultList});
        }
    };
    // Build one task per file.
    var taskList = [];
    util.each(files, function (fileParams, index) {
        var Body = fileParams.Body;
        var FileSize = Body.size || Body.length || 0;
        var fileInfo = {Index: index, TaskId: ''};
        // Grow the overall total.
        TotalSize += FileSize;
        // Collect the plain options so they can be handed back to the hooks.
        util.each(fileParams, function (v, k) {
            if (typeof v !== 'object' && typeof v !== 'function') {
                fileInfo[k] = v;
            }
        });
        // Record the task id before forwarding it to the file's own hook.
        var _onTaskReady = fileParams.onTaskReady;
        fileParams.onTaskReady = function (tid) {
            fileInfo.TaskId = tid;
            _onTaskReady && _onTaskReady(tid);
        };
        // Fold this file's progress into the overall progress.
        var PreAddSize = 0;
        var _onProgress = fileParams.onProgress;
        fileParams.onProgress = function (info) {
            TotalFinish = TotalFinish - PreAddSize + info.loaded;
            PreAddSize = info.loaded;
            _onProgress && _onProgress(info);
            onTotalProgress({loaded: TotalFinish, total: TotalSize});
        };
        // Completion notifies the file's own hook, then the aggregate.
        var _onFileFinish = fileParams.onFileFinish;
        var onFileFinish = function (err, data) {
            _onFileFinish && _onFileFinish(err, data);
            onTotalFileFinish(err, data, fileInfo);
        };
        // Queue the upload task.
        var api = FileSize > SliceSize ? 'sliceUploadFile' : 'putObject';
        taskList.push({
            api: api,
            params: fileParams,
            callback: onFileFinish,
        });
    });
    self._addTasks(taskList);
}
/**
 * Copy an object, using a multipart (sliced) copy when the source is larger
 * than the copy threshold and a single putObjectCopy otherwise.
 *
 * params:
 *   Bucket, Region, Key  destination
 *   CopySource           source location, parsed with util.getSourceParams
 *   CopySliceSize        threshold; defaults to self.options.CopySliceSize when undefined
 *   CopyChunkSize        part size; defaults to this.options.CopyChunkSize and is raised
 *                        when needed to keep the part count within self.options.MaxPartNumber
 *   Headers              optional; defaults to {}
 *   onProgress           called with {loaded, total}
 * callback: function (err, data).
 *
 * Flow: headObject on the source -> get_file_size_finish (plan parts,
 * multipartInit) -> get_copy_data_finish (copy parts in parallel) ->
 * copy_slice_complete (multipartComplete).
 */
function sliceCopyFile(params, callback) {
    var ep = new EventProxy();
    var self = this;
    var Bucket = params.Bucket;
    var Region = params.Region;
    var Key = params.Key;
    var CopySource = params.CopySource;
    var m = util.getSourceParams.call(this, CopySource);
    if (!m) {
        callback(util.error(new Error('CopySource format error')));
        return;
    }
    // Headers are read unconditionally below, so default them as sliceUploadFile does.
    !params.Headers && (params.Headers = {});
    var SourceBucket = m.Bucket;
    var SourceRegion = m.Region;
    var SourceKey = decodeURIComponent(m.Key);
    var CopySliceSize = params.CopySliceSize === undefined ? self.options.CopySliceSize : params.CopySliceSize;
    CopySliceSize = Math.max(0, CopySliceSize);
    var ChunkSize = params.CopyChunkSize || this.options.CopyChunkSize;
    var ChunkParallel = this.options.CopyChunkParallelLimit;
    var FinishSize = 0;
    var FileSize;
    var onProgress;
    // All parts copied: run multipartComplete.
    ep.on('copy_slice_complete', function (UploadData) {
        var Parts = util.map(UploadData.PartList, function (item) {
            return {
                PartNumber: item.PartNumber,
                ETag: item.ETag,
            };
        });
        self.multipartComplete({
            Bucket: Bucket,
            Region: Region,
            Key: Key,
            UploadId: UploadData.UploadId,
            Parts: Parts,
        }, function (err, data) {
            if (err) {
                onProgress(null, true);
                return callback(err);
            }
            onProgress({loaded: FileSize, total: FileSize}, true);
            callback(null, data);
        });
    });
    // UploadId and part plan are ready: copy the parts.
    ep.on('get_copy_data_finish', function (UploadData) {
        Async.eachLimit(UploadData.PartList, ChunkParallel, function (SliceItem, asyncCallback) {
            var PartNumber = SliceItem.PartNumber;
            var CopySourceRange = SliceItem.CopySourceRange;
            // start and end are both inclusive offsets, hence the + 1.
            var currentSize = SliceItem.end - SliceItem.start + 1;
            copySliceItem.call(self, {
                Bucket: Bucket,
                Region: Region,
                Key: Key,
                CopySource: CopySource,
                UploadId: UploadData.UploadId,
                PartNumber: PartNumber,
                CopySourceRange: CopySourceRange,
            }, function (err, data) {
                if (err) return asyncCallback(err);
                FinishSize += currentSize;
                onProgress({loaded: FinishSize, total: FileSize});
                SliceItem.ETag = data.ETag;
                asyncCallback(err || null, data);
            });
        }, function (err) {
            if (err) {
                onProgress(null, true);
                return callback(err);
            }
            ep.emit('copy_slice_complete', UploadData);
        });
    });
    // Source size known: plan the parts and initialise the multipart upload.
    ep.on('get_file_size_finish', function (SourceHeaders) {
        // Adjust the part size and build the part list.
        (function () {
            var SIZE = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 1024 * 2, 1024 * 4, 1024 * 5];
            var AutoChunkSize = 1024 * 1024;
            for (var i = 0; i < SIZE.length; i++) {
                AutoChunkSize = SIZE[i] * 1024 * 1024;
                if (FileSize / AutoChunkSize <= self.options.MaxPartNumber) break;
            }
            params.ChunkSize = ChunkSize = Math.max(ChunkSize, AutoChunkSize);
            var ChunkCount = Math.ceil(FileSize / ChunkSize);
            var list = [];
            for (var partNumber = 1; partNumber <= ChunkCount; partNumber++) {
                var start = (partNumber - 1) * ChunkSize;
                var end = partNumber * ChunkSize < FileSize ? (partNumber * ChunkSize - 1) : FileSize - 1;
                var item = {
                    PartNumber: partNumber,
                    start: start,
                    end: end,
                    CopySourceRange: "bytes=" + start + "-" + end,
                };
                list.push(item);
            }
            params.PartList = list;
        })();
        // With directive 'Replaced' the caller's headers are used; otherwise the source's.
        var TargetHeader;
        if (params.Headers['x-cos-metadata-directive'] === 'Replaced') {
            TargetHeader = params.Headers;
        } else {
            TargetHeader = SourceHeaders;
        }
        TargetHeader['x-cos-storage-class'] = params.Headers['x-cos-storage-class'] || SourceHeaders['x-cos-storage-class'];
        TargetHeader = util.clearKey(TargetHeader);
        // An archived object whose copy has not been restored must not be copied.
        if (SourceHeaders['x-cos-storage-class'] === 'ARCHIVE' || SourceHeaders['x-cos-storage-class'] === 'DEEP_ARCHIVE') {
            var restoreHeader = SourceHeaders['x-cos-restore'];
            if (!restoreHeader || restoreHeader === 'ongoing-request="true"') {
                callback(util.error(new Error('Unrestored archive object is not allowed to be copied')));
                return;
            }
        }
        // Drop headers that are only meaningful for putObjectCopy, to avoid
        // multipartInit failing on them.
        delete TargetHeader['x-cos-copy-source'];
        delete TargetHeader['x-cos-metadata-directive'];
        delete TargetHeader['x-cos-copy-source-If-Modified-Since'];
        delete TargetHeader['x-cos-copy-source-If-Unmodified-Since'];
        delete TargetHeader['x-cos-copy-source-If-Match'];
        delete TargetHeader['x-cos-copy-source-If-None-Match'];
        self.multipartInit({
            Bucket: Bucket,
            Region: Region,
            Key: Key,
            Headers: TargetHeader,
        }, function (err, data) {
            if (err) return callback(err);
            params.UploadId = data.UploadId;
            ep.emit('get_copy_data_finish', params);
        });
    });
    // Ask the remote side for the size of the copy source.
    self.headObject({
        Bucket: SourceBucket,
        Region: SourceRegion,
        Key: SourceKey,
    }, function (err, data) {
        if (err) {
            if (err.statusCode && err.statusCode === 404) {
                callback(util.error(err, {ErrorStatus: SourceKey + ' Not Exist'}));
            } else {
                callback(err);
            }
            return;
        }
        FileSize = params.FileSize = data.headers['content-length'];
        if (FileSize === undefined || !FileSize) {
            callback(util.error(new Error('get Content-Length error, please add "Content-Length" to CORS ExposeHeader setting.( 获取Content-Length失败,请在CORS ExposeHeader设置中添加Content-Length,请参考文档:https://cloud.tencent.com/document/product/436/13318 )')));
            return;
        }
        onProgress = util.throttleOnProgress.call(self, FileSize, params.onProgress);
        // Start the copy.
        if (FileSize <= CopySliceSize) {
            if (!params.Headers['x-cos-metadata-directive']) {
                params.Headers['x-cos-metadata-directive'] = 'Copy';
            }
            self.putObjectCopy(params, function (err, data) {
                if (err) {
                    onProgress(null, true);
                    return callback(err);
                }
                onProgress({loaded: FileSize, total: FileSize}, true);
                callback(err, data);
            });
        } else {
            var resHeaders = data.headers;
            var SourceHeaders = {
                'Cache-Control': resHeaders['cache-control'],
                'Content-Disposition': resHeaders['content-disposition'],
                'Content-Encoding': resHeaders['content-encoding'],
                'Content-Type': resHeaders['content-type'],
                'Expires': resHeaders['expires'],
                'x-cos-storage-class': resHeaders['x-cos-storage-class'],
            };
            // Carry over the source's user metadata.
            util.each(resHeaders, function (v, k) {
                var metaPrefix = 'x-cos-meta-';
                if (k.indexOf(metaPrefix) === 0 && k.length > metaPrefix.length) {
                    SourceHeaders[k] = v;
                }
            });
            ep.emit('get_file_size_finish', SourceHeaders);
        }
    });
}
/**
 * Copy one part of the source object into a multipart upload, retrying up to
 * this.options.ChunkRetryTimes additional times.
 *
 * params: TaskId, Bucket, Region, Key, CopySource, UploadId, PartNumber, CopySourceRange.
 * callback: function (err, data) with the uploadPartCopy result.
 */
function copySliceItem(params, callback) {
    var cos = this;
    var attempts = this.options.ChunkRetryTimes + 1;
    // Captured once so that every retry sends the same request values.
    var request = {
        TaskId: params.TaskId,
        Bucket: params.Bucket,
        Region: params.Region,
        Key: params.Key,
        CopySource: params.CopySource,
        UploadId: params.UploadId,
        PartNumber: params.PartNumber * 1,
        CopySourceRange: params.CopySourceRange,
    };

    function attempt(done) {
        cos.uploadPartCopy({
            TaskId: request.TaskId,
            Bucket: request.Bucket,
            Region: request.Region,
            Key: request.Key,
            CopySource: request.CopySource,
            UploadId: request.UploadId,
            PartNumber: request.PartNumber,
            CopySourceRange: request.CopySourceRange,
        }, function (err, data) {
            done(err || null, data);
        });
    }

    Async.retry(attempts, attempt, function (err, data) {
        return callback(err, data);
    });
}
// Public API name -> implementation, installed on the COS prototype by init().
var API_MAP = {
    sliceUploadFile: sliceUploadFile,
    abortUploadTask: abortUploadTask,
    uploadFile: uploadFile,
    uploadFiles: uploadFiles,
    sliceCopyFile: sliceCopyFile,
};

/**
 * Install the advanced upload/copy APIs.
 *
 * @param {Function} COS - constructor whose prototype receives the wrapped APIs.
 * @param {Object} task - object exposing transferToTaskMethod; it is handed
 *   API_MAP and the name 'sliceUploadFile' before the APIs are installed.
 */
module.exports.init = function (COS, task) {
    task.transferToTaskMethod(API_MAP, 'sliceUploadFile');
    var proto = COS.prototype;
    util.each(API_MAP, function (impl, name) {
        proto[name] = util.apiWrapper(name, impl);
    });
};
|