advance.js 42 KB


  1. var session = require('./session');
  2. var Async = require('./async');
  3. var EventProxy = require('./event').EventProxy;
  4. var util = require('./util');
  /**
   * Upload one file in slices (multipart upload); this is the public slice-upload entry point.
   *
   * The work is chained through events on a local EventProxy:
   *   get_file_size_finish   -> reuse params.UploadData.UploadId, or resolve one via getUploadIdAndPartList
   *   get_upload_data_finish -> cache and mark the UploadId, then upload every slice
   *   upload_slice_complete  -> complete the multipart upload
   *   upload_complete        -> report success
   *   error                  -> report failure
   * A zero-byte file skips all of that and is sent through self.putObject instead.
   *
   * @param {Object} params - Reads TaskId, Bucket, Region, Key, Body, ContentLength, ChunkSize / SliceSize,
   *   AsyncLimit, StorageClass, ServerSideEncryption, Headers, UploadData, onProgress and onHashProgress.
   *   The object is mutated: ContentLength is removed, any content-length header is stripped, and
   *   Headers / ChunkSize / SliceSize / UploadData are filled in.
   * @param {Function} callback - callback(err) on failure (object errors get an UploadId property);
   *   callback(null, data) on success, where data is util.extend({UploadId: ...}, completeResult).
   */
  function sliceUploadFile(params, callback) {
    var self = this;
    var ep = new EventProxy();
    var TaskId = params.TaskId;
    var Bucket = params.Bucket;
    var Region = params.Region;
    var Key = params.Key;
    var Body = params.Body;
    var ChunkSize = params.ChunkSize || params.SliceSize || self.options.ChunkSize;
    var AsyncLimit = params.AsyncLimit;
    var StorageClass = params.StorageClass;
    var ServerSideEncryption = params.ServerSideEncryption;
    var onHashProgress = params.onHashProgress;
    var FileSize;
    var onProgress;

    // Something failed during the upload: hand the error back to the caller.
    ep.on('error', function (err) {
      if (!self._isRunningTask(TaskId)) return;
      // Slice uploads can fail with a plain string. Writing a property onto a primitive is a
      // silent no-op in sloppy mode and a TypeError in strict mode, so only tag real objects.
      if (err !== null && (typeof err === 'object' || typeof err === 'function')) {
        err.UploadId = params.UploadData.UploadId || '';
      }
      return callback(err);
    });

    // The multipart upload has been completed: report success together with the UploadId.
    ep.on('upload_complete', function (UploadCompleteData) {
      var result = util.extend({
        UploadId: params.UploadData.UploadId || ''
      }, UploadCompleteData);
      callback(null, result);
    });

    // Every slice is uploaded: send the "complete multipart upload" request.
    ep.on('upload_slice_complete', function (UploadData) {
      // Only user metadata headers and Pic-Operations are forwarded to the complete request.
      var metaHeaders = {};
      util.each(params.Headers, function (val, k) {
        var lowerKey = k.toLowerCase();
        if (lowerKey.indexOf('x-cos-meta-') === 0 || lowerKey === 'pic-operations') metaHeaders[k] = val;
      });
      uploadSliceComplete.call(self, {
        Bucket: Bucket,
        Region: Region,
        Key: Key,
        UploadId: UploadData.UploadId,
        SliceList: UploadData.SliceList,
        Headers: metaHeaders,
      }, function (err, data) {
        if (!self._isRunningTask(TaskId)) return;
        session.removeUsing(UploadData.UploadId);
        if (err) {
          onProgress(null, true);
          return ep.emit('error', err);
        }
        session.removeUploadId.call(self, UploadData.UploadId);
        onProgress({loaded: FileSize, total: FileSize}, true);
        ep.emit('upload_complete', data);
      });
    });

    // An UploadId (and its part list) is known: upload every slice that is still missing.
    ep.on('get_upload_data_finish', function (UploadData) {
      // Remember the UploadId for this file so a later attempt can resume it.
      var uuid = session.getFileId(Body, params.ChunkSize, Bucket, Key);
      uuid && session.saveUploadId.call(self, uuid, UploadData.UploadId, self.options.UploadIdCacheLimit);
      session.setUsing(UploadData.UploadId); // mark the UploadId as in use
      onProgress(null, true); // the task now enters the uploading state
      uploadSliceList.call(self, {
        TaskId: TaskId,
        Bucket: Bucket,
        Region: Region,
        Key: Key,
        Body: Body,
        FileSize: FileSize,
        SliceSize: ChunkSize,
        AsyncLimit: AsyncLimit,
        ServerSideEncryption: ServerSideEncryption,
        UploadData: UploadData,
        Headers: params.Headers,
        onProgress: onProgress
      }, function (err, data) {
        if (!self._isRunningTask(TaskId)) return;
        if (err) {
          onProgress(null, true);
          return ep.emit('error', err);
        }
        ep.emit('upload_slice_complete', data);
      });
    });

    // The file size is known: reuse the caller's UploadId, or look one up / create one.
    ep.on('get_file_size_finish', function () {
      onProgress = util.throttleOnProgress.call(self, FileSize, params.onProgress);
      if (params.UploadData.UploadId) {
        ep.emit('get_upload_data_finish', params.UploadData);
        return;
      }
      var lookupParams = util.extend({
        TaskId: TaskId,
        Bucket: Bucket,
        Region: Region,
        Key: Key,
        Headers: params.Headers,
        StorageClass: StorageClass,
        Body: Body,
        FileSize: FileSize,
        SliceSize: ChunkSize,
        onHashProgress: onHashProgress,
      }, params);
      getUploadIdAndPartList.call(self, lookupParams, function (err, UploadData) {
        if (!self._isRunningTask(TaskId)) return;
        if (err) return ep.emit('error', err);
        // Update the existing object in place so anyone holding params.UploadData sees the result.
        params.UploadData.UploadId = UploadData.UploadId;
        params.UploadData.PartList = UploadData.PartList;
        ep.emit('get_upload_data_finish', params.UploadData);
      });
    });

    // Take the file size from ContentLength and make sure no content-length header is forwarded.
    FileSize = params.ContentLength;
    delete params.ContentLength;
    !params.Headers && (params.Headers = {});
    util.each(params.Headers, function (item, key) {
      if (key.toLowerCase() === 'content-length') {
        delete params.Headers[key];
      }
    });

    // Pick the smallest listed chunk size (in MB) that keeps the part count within MaxPartNumber,
    // and never go below the requested chunk size.
    var SIZE = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 1024 * 2, 1024 * 4, 1024 * 5];
    var AutoChunkSize = 1024 * 1024;
    for (var i = 0; i < SIZE.length; i++) {
      AutoChunkSize = SIZE[i] * 1024 * 1024;
      if (FileSize / AutoChunkSize <= self.options.MaxPartNumber) break;
    }
    params.ChunkSize = params.SliceSize = ChunkSize = Math.max(ChunkSize, AutoChunkSize);

    // Start the upload.
    if (FileSize === 0) {
      // An empty file cannot be sliced; send it as a simple upload.
      params.Body = '';
      params.ContentLength = 0;
      params.SkipTask = true;
      self.putObject(params, callback);
    } else {
      // The handlers above dereference params.UploadData; default it so a call that did not
      // supply one does not throw. An object passed by the caller is kept as-is.
      if (!params.UploadData) params.UploadData = {};
      ep.emit('get_file_size_finish');
    }
  }
  /**
   * Resolve the UploadId to use for a slice upload, together with the state of every part.
   *
   * Steps, chained through events on a local EventProxy:
   *   1. get_remote_upload_id_list  - list the unfinished uploads for this Key (and StorageClass, if given).
   *   2. seek_local_avail_upload_id - prefer a locally cached UploadId that still exists remotely.
   *   3. has_and_check_upload_id    - otherwise check the remote ones one at a time, comparing each listed
   *                                   part's ETag and Size with the MD5 and size of the local chunk.
   *   4. no_available_upload_id     - nothing reusable: start a new multipart upload.
   *   5. upload_id_available        - expand the part list to one entry per slice and call back.
   *
   * @param {Object} params - Reads TaskId, Bucket, Region, Key, StorageClass, Body, FileSize, SliceSize,
   *   ChunkSize, Headers, Query and onHashProgress.
   * @param {Function} callback - callback(err) or callback(null, {UploadId, PartList}), where PartList has
   *   one item per slice carrying PartNumber and an Uploaded flag.
   */
  function getUploadIdAndPartList(params, callback) {
    var self = this;
    var TaskId = params.TaskId;
    var Bucket = params.Bucket;
    var Region = params.Region;
    var Key = params.Key;
    var StorageClass = params.StorageClass;
    var FileSize = params.FileSize;
    var SliceSize = params.SliceSize;
    var SliceCount = Math.ceil(FileSize / SliceSize);
    var etagByPartNumber = {}; // ETags already computed for local chunks
    var hashedSize = 0; // bytes hashed so far, reported through onHashProgress
    var onHashProgress = util.throttleOnProgress.call(self, FileSize, params.onHashProgress);
    var ep = new EventProxy();

    var isRunning = function () {
      return self._isRunningTask(TaskId);
    };

    // Compute (or reuse) the quoted-MD5 ETag of the local chunk behind a part number.
    var getChunkETag = function (PartNumber, done) {
      var start = SliceSize * (PartNumber - 1);
      var end = Math.min(start + SliceSize, FileSize);
      var chunkSize = end - start;
      var known = etagByPartNumber[PartNumber];
      if (known) {
        done(null, {PartNumber: PartNumber, ETag: known, Size: chunkSize});
        return;
      }
      util.fileSlice(params.Body, start, end, false, function (chunkItem) {
        util.getFileMd5(chunkItem, function (err, md5) {
          if (err) return done(util.error(err));
          var etag = '"' + md5 + '"';
          etagByPartNumber[PartNumber] = etag;
          hashedSize += chunkSize;
          onHashProgress({loaded: hashedSize, total: FileSize});
          done(null, {PartNumber: PartNumber, ETag: etag, Size: chunkSize});
        });
      });
    };

    // Decide whether the parts already stored under an UploadId match the local file.
    var isAvailableUploadList = function (PartList, done) {
      var partTotal = PartList.length;
      // Nothing uploaded yet: trivially usable.
      if (partTotal === 0) return done(null, true);
      // More parts than this file has slices: cannot be the same upload.
      if (partTotal > SliceCount) return done(null, false);
      // With at least two parts, the larger of the first two must equal the slice size.
      if (partTotal > 1 && Math.max(PartList[0].Size, PartList[1].Size) !== SliceSize) {
        return done(null, false);
      }
      // Compare the parts one by one; a hashing failure counts as a mismatch.
      var checkFrom = function (index) {
        if (index >= partTotal) {
          done(null, true);
          return;
        }
        var Part = PartList[index];
        getChunkETag(Part.PartNumber, function (err, chunk) {
          var matches = chunk && chunk.ETag === Part.ETag && chunk.Size === Part.Size;
          if (matches) {
            checkFrom(index + 1);
          } else {
            done(null, false);
          }
        });
      };
      checkFrom(0);
    };

    ep.on('error', function (errData) {
      if (!isRunning()) return;
      return callback(errData);
    });

    // A usable UploadId is known: build the complete per-slice list and finish.
    ep.on('upload_id_available', function (UploadData) {
      var uploadedParts = {};
      util.each(UploadData.PartList, function (part) {
        uploadedParts[part.PartNumber] = part;
      });
      var fullList = [];
      for (var number = 1; number <= SliceCount; number++) {
        var entry = uploadedParts[number];
        if (entry) {
          entry.PartNumber = number;
          entry.Uploaded = true;
        } else {
          entry = {PartNumber: number, ETag: null, Uploaded: false};
        }
        fullList.push(entry);
      }
      UploadData.PartList = fullList;
      callback(null, UploadData);
    });

    // No reusable UploadId: initiate a new multipart upload.
    ep.on('no_available_upload_id', function () {
      if (!isRunning()) return;
      var initParams = util.extend({
        Bucket: Bucket,
        Region: Region,
        Key: Key,
        Query: util.clone(params.Query),
        StorageClass: StorageClass,
        Body: params.Body,
      }, params);
      // x-cos-mime-limit is not sent with the init request.
      var initHeaders = util.clone(params.Headers);
      delete initHeaders['x-cos-mime-limit'];
      initParams.Headers = initHeaders;
      self.multipartInit(initParams, function (err, data) {
        if (!isRunning()) return;
        if (err) return ep.emit('error', err);
        var newUploadId = data.UploadId;
        if (!newUploadId) {
          return callback(util.error(new Error('no such upload id')));
        }
        ep.emit('upload_id_available', {UploadId: newUploadId, PartList: []});
      });
    });

    // Remote UploadIds exist: serially look for one whose parts match the local file.
    ep.on('has_and_check_upload_id', function (UploadIdList) {
      UploadIdList = UploadIdList.reverse();
      Async.eachLimit(UploadIdList, 1, function (UploadId, asyncCallback) {
        if (!isRunning()) return;
        // Skip an UploadId that another task is currently using.
        if (session.using[UploadId]) {
          asyncCallback();
          return;
        }
        wholeMultipartListPart.call(self, {
          Bucket: Bucket,
          Region: Region,
          Key: Key,
          UploadId: UploadId,
        }, function (err, PartListData) {
          if (!isRunning()) return;
          if (err) {
            session.removeUsing(UploadId);
            return ep.emit('error', err);
          }
          var PartList = PartListData.PartList;
          // Coerce the listed values to numbers and default a missing ETag to ''.
          PartList.forEach(function (part) {
            part.PartNumber = part.PartNumber * 1;
            part.Size = part.Size * 1;
            part.ETag = part.ETag || '';
          });
          isAvailableUploadList(PartList, function (err, isAvailable) {
            if (!isRunning()) return;
            if (err) return ep.emit('error', err);
            if (!isAvailable) {
              asyncCallback(); // try the next UploadId
              return;
            }
            // Passing a value as the first argument ends the iteration immediately.
            asyncCallback({UploadId: UploadId, PartList: PartList});
          });
        });
      }, function (AvailableUploadData) {
        if (!isRunning()) return;
        onHashProgress(null, true);
        var found = AvailableUploadData && AvailableUploadData.UploadId;
        if (found) {
          ep.emit('upload_id_available', AvailableUploadData);
        } else {
          ep.emit('no_available_upload_id');
        }
      });
    });

    // Look through the locally cached UploadIds before checking the remote ones.
    ep.on('seek_local_avail_upload_id', function (RemoteUploadIdList) {
      var uuid = session.getFileId(params.Body, params.ChunkSize, Bucket, Key);
      var LocalUploadIdList = session.getUploadIdList.call(self, uuid);
      if (!uuid || !LocalUploadIdList) {
        ep.emit('has_and_check_upload_id', RemoteUploadIdList);
        return;
      }
      var tryLocal = function (index) {
        // Local candidates exhausted: fall back to checking the remote list one by one.
        if (index >= LocalUploadIdList.length) {
          ep.emit('has_and_check_upload_id', RemoteUploadIdList);
          return;
        }
        var UploadId = LocalUploadIdList[index];
        // No longer listed remotely: forget it and move on.
        if (!util.isInArray(RemoteUploadIdList, UploadId)) {
          session.removeUploadId.call(self, UploadId);
          tryLocal(index + 1);
          return;
        }
        // Currently in use: move on.
        if (session.using[UploadId]) {
          tryLocal(index + 1);
          return;
        }
        wholeMultipartListPart.call(self, {
          Bucket: Bucket,
          Region: Region,
          Key: Key,
          UploadId: UploadId,
        }, function (err, PartListData) {
          if (!isRunning()) return;
          if (err) {
            // Listing its parts failed: forget it and move on.
            session.removeUploadId.call(self, UploadId);
            tryLocal(index + 1);
            return;
          }
          ep.emit('upload_id_available', {
            UploadId: UploadId,
            PartList: PartListData.PartList,
          });
        });
      };
      tryLocal(0);
    });

    // List the unfinished uploads stored remotely; one object key can have several of them.
    ep.on('get_remote_upload_id_list', function () {
      wholeMultipartList.call(self, {
        Bucket: Bucket,
        Region: Region,
        Key: Key,
      }, function (err, data) {
        if (!isRunning()) return;
        if (err) return ep.emit('error', err);
        // Keep exact Key matches (and matching StorageClass when one was requested).
        var sameObject = util.filter(data.UploadList, function (upload) {
          if (upload.Key !== Key) return false;
          return !StorageClass || upload.StorageClass.toUpperCase() === StorageClass.toUpperCase();
        });
        var RemoteUploadIdList = sameObject.reverse().map(function (upload) {
          return upload.UploadId || upload.UploadID;
        });
        if (RemoteUploadIdList.length) {
          ep.emit('seek_local_avail_upload_id', RemoteUploadIdList);
          return;
        }
        // Nothing exists remotely, so every cached UploadId for this file is stale.
        var uuid = session.getFileId(params.Body, params.ChunkSize, Bucket, Key);
        var staleList = uuid ? session.getUploadIdList.call(self, uuid) : null;
        if (staleList) {
          util.each(staleList, function (UploadId) {
            session.removeUploadId.call(self, UploadId);
          });
        }
        ep.emit('no_available_upload_id');
      });
    });

    // Kick off the search.
    ep.emit('get_remote_upload_id_list');
  }
  /**
   * Collect every unfinished multipart upload that matches Bucket / Region / Prefix, following
   * the pagination markers until the listing is no longer truncated.
   *
   * @param {Object} params - Bucket, Region and Key (Key is sent as the listing Prefix).
   * @param {Function} callback - callback(err) or callback(null, {UploadList: Array}).
   */
  function wholeMultipartList(params, callback) {
    var self = this;
    var collected = [];
    var query = {
      Bucket: params.Bucket,
      Region: params.Region,
      Prefix: params.Key
    };
    (function fetchPage() {
      self.multipartList(query, function (err, data) {
        if (err) return callback(err);
        Array.prototype.push.apply(collected, data.Upload || []);
        // IsTruncated is compared against the string 'true'; any other value ends the listing.
        if (data.IsTruncated !== 'true') {
          callback(null, {UploadList: collected});
          return;
        }
        query.KeyMarker = data.NextKeyMarker;
        query.UploadIdMarker = data.NextUploadIdMarker;
        fetchPage();
      });
    })();
  }
  /**
   * Collect every uploaded part of one multipart upload, following PartNumberMarker until the
   * listing is no longer truncated.
   *
   * @param {Object} params - Bucket, Region, Key and UploadId.
   * @param {Function} callback - callback(err) or callback(null, {PartList: Array}).
   */
  function wholeMultipartListPart(params, callback) {
    var self = this;
    var collected = [];
    var query = {
      Bucket: params.Bucket,
      Region: params.Region,
      Key: params.Key,
      UploadId: params.UploadId
    };
    (function fetchPage() {
      self.multipartListPart(query, function (err, data) {
        if (err) return callback(err);
        Array.prototype.push.apply(collected, data.Part || []);
        // IsTruncated is compared against the string 'true'; any other value ends the listing.
        if (data.IsTruncated !== 'true') {
          callback(null, {PartList: collected});
          return;
        }
        query.PartNumberMarker = data.NextPartNumberMarker;
        fetchPage();
      });
    })();
  }
  /**
   * Upload every slice of a file that is not yet marked as uploaded, with bounded concurrency.
   *
   * @param {Object} params
   *   TaskId               - id checked with self._isRunningTask before each step
   *   Bucket, Region, Key  - target object
   *   Body                 - the file content the slices are cut from
   *   FileSize, SliceSize  - total size and size of one slice
   *   AsyncLimit           - concurrency; falls back to options.ChunkParallelLimit, then 1; capped at 256
   *   UploadData           - {UploadId, PartList}; pending PartList items receive their ETag
   *   ServerSideEncryption, Headers - passed through to each slice upload
   *   onProgress           - called with {loaded, total} as bytes are sent and after each slice settles
   * @param {Function} cb - cb(err) or cb(null, {UploadId, SliceList}).
   */
  function uploadSliceList(params, cb) {
    var self = this;
    var TaskId = params.TaskId;
    var Bucket = params.Bucket;
    var Region = params.Region;
    var Key = params.Key;
    var Body = params.Body;
    var UploadData = params.UploadData;
    var FileSize = params.FileSize;
    var SliceSize = params.SliceSize;
    var ServerSideEncryption = params.ServerSideEncryption;
    var Headers = params.Headers;
    var onProgress = params.onProgress;
    var ChunkParallel = Math.min(params.AsyncLimit || self.options.ChunkParallelLimit || 1, 256);
    var SliceCount = Math.ceil(FileSize / SliceSize);
    var FinishSize = 0;

    // Split off the slices that still need uploading; already-uploaded ones count towards progress.
    var pendingSlices = util.filter(UploadData.PartList, function (SliceItem) {
      if (!SliceItem.Uploaded) return true;
      // The last slice may be shorter than SliceSize.
      var isLast = SliceItem.PartNumber >= SliceCount;
      FinishSize += isLast ? (FileSize % SliceSize || SliceSize) : SliceSize;
      return false;
    });

    var reportProgress = function () {
      onProgress({loaded: FinishSize, total: FileSize});
    };

    Async.eachLimit(pendingSlices, ChunkParallel, function (SliceItem, asyncCallback) {
      if (!self._isRunningTask(TaskId)) return;
      var PartNumber = SliceItem.PartNumber;
      var currentSize = Math.min(FileSize, PartNumber * SliceSize) - (PartNumber - 1) * SliceSize;
      var countedBytes = 0; // bytes of this slice already added to FinishSize
      uploadSliceItem.call(self, {
        TaskId: TaskId,
        Bucket: Bucket,
        Region: Region,
        Key: Key,
        SliceSize: SliceSize,
        FileSize: FileSize,
        PartNumber: PartNumber,
        ServerSideEncryption: ServerSideEncryption,
        Body: Body,
        UploadData: UploadData,
        Headers: Headers,
        onProgress: function (data) {
          FinishSize += data.loaded - countedBytes;
          countedBytes = data.loaded;
          reportProgress();
        },
      }, function (err, data) {
        if (!self._isRunningTask(TaskId)) return;
        if (!err && !data.ETag) err = 'get ETag error, please add "ETag" to CORS ExposeHeader setting.( 获取ETag失败,请在CORS ExposeHeader设置中添加ETag,请参考文档:https://cloud.tencent.com/document/product/436/13318 )';
        if (err) {
          // Roll back whatever this slice had contributed to the progress.
          FinishSize -= countedBytes;
        } else {
          // Settle the slice at exactly its full size.
          FinishSize += currentSize - countedBytes;
          SliceItem.ETag = data.ETag;
        }
        reportProgress();
        asyncCallback(err || null, data);
      });
    }, function (err) {
      if (!self._isRunningTask(TaskId)) return;
      if (err) return cb(err);
      cb(null, {
        UploadId: UploadData.UploadId,
        SliceList: UploadData.PartList
      });
    });
  }
  /**
   * Upload a single slice of the file, retrying up to options.ChunkRetryTimes + 1 attempts.
   *
   * @param {Object} params - TaskId, Bucket, Region, Key, FileSize, SliceSize, Body, PartNumber (1-based),
   *   ServerSideEncryption, UploadData ({UploadId, PartList}), Headers and onProgress.
   *   Only the x-cos-traffic-limit and x-cos-mime-limit headers are forwarded with the part.
   * @param {Function} callback - callback(err, data) with the outcome of the retried part upload.
   *   On success the matching UploadData.PartList entry is marked Uploaded.
   */
  function uploadSliceItem(params, callback) {
    var self = this;
    var TaskId = params.TaskId;
    var Bucket = params.Bucket;
    var Region = params.Region;
    var Key = params.Key;
    var FileSize = params.FileSize;
    var FileBody = params.Body;
    var PartNumber = params.PartNumber * 1;
    var SliceSize = params.SliceSize;
    var ServerSideEncryption = params.ServerSideEncryption;
    var UploadData = params.UploadData;
    var retryLimit = self.options.ChunkRetryTimes + 1;

    // Byte range of this slice; the last slice is cut off at the end of the file.
    var start = SliceSize * (PartNumber - 1);
    var pastEnd = start + SliceSize > FileSize;
    var end = pastEnd ? FileSize : start + SliceSize;
    var ContentLength = pastEnd ? end - start : SliceSize;

    // Forward only the headers that apply to a part upload (exact, case-sensitive names).
    var forwardedHeaders = {};
    util.each(params.Headers || {}, function (value, name) {
      if (name === 'x-cos-traffic-limit' || name === 'x-cos-mime-limit') {
        forwardedHeaders[name] = value;
      }
    });

    var PartItem = UploadData.PartList[PartNumber - 1];

    var attempt = function (tryCallback) {
      if (!self._isRunningTask(TaskId)) return;
      util.fileSlice(FileBody, start, end, true, function (Body) {
        self.multipartUpload({
          TaskId: TaskId,
          Bucket: Bucket,
          Region: Region,
          Key: Key,
          ContentLength: ContentLength,
          PartNumber: PartNumber,
          UploadId: UploadData.UploadId,
          ServerSideEncryption: ServerSideEncryption,
          Body: Body,
          Headers: forwardedHeaders,
          onProgress: params.onProgress,
        }, function (err, data) {
          if (!self._isRunningTask(TaskId)) return;
          if (err) return tryCallback(err);
          PartItem.Uploaded = true;
          return tryCallback(null, data);
        });
      });
    };

    Async.retry(retryLimit, attempt, function (err, data) {
      if (!self._isRunningTask(TaskId)) return;
      return callback(err, data);
    });
  }
  /**
   * Send the "complete multipart upload" request, retrying up to options.ChunkRetryTimes + 1 attempts.
   *
   * @param {Object} params - Bucket, Region, Key, UploadId, SliceList (items with PartNumber and ETag)
   *   and Headers.
   * @param {Function} callback - callback(err, data) with the outcome of the retried request.
   */
  function uploadSliceComplete(params, callback) {
    var self = this;
    var retryLimit = this.options.ChunkRetryTimes + 1;
    var Bucket = params.Bucket;
    var Region = params.Region;
    var Key = params.Key;
    var UploadId = params.UploadId;
    var Headers = params.Headers;

    // Only the part number and ETag of each slice go into the request.
    var Parts = params.SliceList.map(function (slice) {
      return {PartNumber: slice.PartNumber, ETag: slice.ETag};
    });

    // The completion request is retried as well; each attempt gets a fresh parameter object.
    var attempt = function (tryCallback) {
      self.multipartComplete({
        Bucket: Bucket,
        Region: Region,
        Key: Key,
        UploadId: UploadId,
        Parts: Parts,
        Headers: Headers,
      }, tryCallback);
    };

    Async.retry(retryLimit, attempt, function (err, data) {
      callback(err, data);
    });
  }
  /**
   * Abort unfinished multipart uploads at one of three levels.
   *
   * @param {Object} params
   *   Bucket, Region - target bucket
   *   Level          - 'task' (default): abort the upload identified by Key + UploadId;
   *                    'file': abort every upload listed for Key;
   *                    'bucket': abort every upload listed for the bucket;
   *                    any other value fails with 'abort_unknown_level'
   *   Key            - required for 'task' and 'file'
   *   UploadId       - required for 'task'
   *   AsyncLimit     - concurrency passed on to abortUploadTaskArray
   *   Headers        - passed on to abortUploadTaskArray
   * @param {Function} callback - callback(err), or whatever abortUploadTaskArray reports.
   */
  function abortUploadTask(params, callback) {
    var self = this;
    var Bucket = params.Bucket;
    var Region = params.Region;
    var Key = params.Key;
    var UploadId = params.UploadId;
    var Level = params.Level || 'task';
    var AsyncLimit = params.AsyncLimit;
    var ep = new EventProxy();

    ep.on('error', function (errData) {
      return callback(errData);
    });

    // The list of uploads to abort is known: abort them.
    ep.on('get_abort_array', function (AbortArray) {
      abortUploadTaskArray.call(self, {
        Bucket: Bucket,
        Region: Region,
        Key: Key,
        Headers: params.Headers,
        AsyncLimit: AsyncLimit,
        AbortArray: AbortArray
      }, callback);
    });

    var fail = function (message) {
      return callback(util.error(new Error(message)));
    };

    // Shared handler for both listing-based levels.
    var onListed = function (err, data) {
      if (err) return callback(err);
      ep.emit('get_abort_array', data.UploadList || []);
    };

    switch (Level) {
      case 'bucket':
        // Every unfinished upload in the bucket.
        wholeMultipartList.call(self, {
          Bucket: Bucket,
          Region: Region
        }, onListed);
        break;
      case 'file':
        // Every unfinished upload listed for this key.
        if (!Key) return fail('abort_upload_task_no_key');
        wholeMultipartList.call(self, {
          Bucket: Bucket,
          Region: Region,
          Key: Key
        }, onListed);
        break;
      case 'task':
        // Exactly one upload, identified by Key + UploadId.
        if (!UploadId) return fail('abort_upload_task_no_id');
        if (!Key) return fail('abort_upload_task_no_key');
        ep.emit('get_abort_array', [{
          Key: Key,
          UploadId: UploadId
        }]);
        break;
      default:
        return fail('abort_unknown_level');
    }
  }
  /**
   * Abort a batch of multipart uploads with bounded concurrency and summarize the outcome.
   *
   * @param {Object} params
   *   Bucket, Region - target bucket
   *   Key            - optional; when set, items whose Key differs are skipped and reported in neither list
   *   AbortArray     - items with Key and UploadId (or UploadID)
   *   AsyncLimit     - concurrency, defaults to 1
   *   Headers        - passed to every abort request
   * @param {Function} callback - callback(null, {successList, errorList}); each entry is
   *   {Bucket, Region, Key, UploadId}. A failed abort lands in errorList rather than failing the batch.
   */
  function abortUploadTaskArray(params, callback) {
    var Bucket = params.Bucket;
    var Region = params.Region;
    var Key = params.Key;
    var AbortArray = params.AbortArray;
    var AsyncLimit = params.AsyncLimit || 1;
    var self = this;
    var index = 0;
    var resultList = new Array(AbortArray.length);
    Async.eachLimit(AbortArray, AsyncLimit, function (AbortItem, nextItem) {
      // Claim this item's result slot up front. Incrementing only after the abort request was
      // issued skipped the increment on the key-mismatch return below, so later results
      // overwrote earlier slots and trailing slots were left undefined.
      var eachIndex = index++;
      if (Key && Key !== AbortItem.Key) {
        resultList[eachIndex] = {error: {KeyNotMatch: true}};
        nextItem(null);
        return;
      }
      var UploadId = AbortItem.UploadId || AbortItem.UploadID;
      self.multipartAbort({
        Bucket: Bucket,
        Region: Region,
        Key: AbortItem.Key,
        Headers: params.Headers,
        UploadId: UploadId
      }, function (err) {
        var task = {
          Bucket: Bucket,
          Region: Region,
          Key: AbortItem.Key,
          UploadId: UploadId
        };
        // An individual failure is recorded, not propagated, so the rest of the batch still runs.
        resultList[eachIndex] = {error: err, task: task};
        nextItem(null);
      });
    }, function (err) {
      if (err) return callback(err);
      var successList = [];
      var errorList = [];
      for (var i = 0, len = resultList.length; i < len; i++) {
        var item = resultList[i];
        // Skipped items carry no task; also tolerate a slot that was never filled.
        if (!item || !item.task) continue;
        if (item.error) {
          errorList.push(item.task);
        } else {
          successList.push(item.task);
        }
      }
      return callback(null, {
        successList: successList,
        errorList: errorList
      });
    });
  }
  /**
   * High-level upload of one file: queue it as a slice upload when the body is larger than
   * SliceSize, otherwise as a simple putObject.
   *
   * @param {Object} params - Upload parameters. Reads Body (its size or length decides the API),
   *   SliceSize (threshold; falls back to options.SliceSize), onTaskReady and onFileFinish.
   *   params.onTaskReady is replaced by a wrapper that records the task id.
   * @param {Function} [callback] - callback(err, data) when the task finishes.
   *   params.onFileFinish, if given, is called first as onFileFinish(err, data, fileInfo), where
   *   fileInfo holds TaskId plus every params value that is neither an object nor a function.
   */
  function uploadFile(params, callback) {
    var self = this;
    // Size threshold above which the slice upload is used.
    var SliceSize = params.SliceSize === undefined ? self.options.SliceSize : params.SliceSize;
    var Body = params.Body;
    var FileSize = Body.size || Body.length || 0;

    // Snapshot the plain option values so they can be handed back to onFileFinish.
    var fileInfo = {TaskId: ''};
    util.each(params, function (value, name) {
      var kind = typeof value;
      if (kind === 'object' || kind === 'function') return;
      fileInfo[name] = value;
    });

    // Record the task id once the task has been created.
    var userOnTaskReady = params.onTaskReady;
    params.onTaskReady = function (tid) {
      fileInfo.TaskId = tid;
      userOnTaskReady && userOnTaskReady(tid);
    };

    // Completion: notify onFileFinish first, then the callback.
    var userOnFileFinish = params.onFileFinish;
    var finish = function (err, data) {
      userOnFileFinish && userOnFileFinish(err, data, fileInfo);
      callback && callback(err, data);
    };

    // Larger than the threshold -> slice upload; otherwise -> simple upload.
    self._addTasks([{
      api: FileSize > SliceSize ? 'sliceUploadFile' : 'putObject',
      params: params,
      callback: finish,
    }]);
  }
  /**
   * Upload a batch of files: each file is queued as a slice upload when its body is larger than
   * SliceSize, otherwise as a simple putObject. Progress and completion are aggregated.
   *
   * @param {Object} params
   *   files        - array of per-file upload parameters; each may carry its own onTaskReady,
   *                  onProgress and onFileFinish, which are wrapped (the file objects are mutated)
   *   SliceSize    - threshold; falls back to options.SliceSize
   *   onProgress   - receives the aggregated {loaded, total} over all files
   *   onFileFinish - called as onFileFinish(err, data, fileInfo) after each file; fileInfo holds
   *                  Index, TaskId and every per-file value that is neither an object nor a function
   * @param {Function} [callback] - callback(null, {files}) once every file has finished; files[i] is
   *   {options: fileInfo, error, data}. Per-file failures are reported there, never as err.
   *   An empty files list calls back right away with {files: []}.
   */
  function uploadFiles(params, callback) {
    var self = this;
    // Size threshold above which the slice upload is used.
    var SliceSize = params.SliceSize === undefined ? self.options.SliceSize : params.SliceSize;

    // Aggregated progress over all files.
    var TotalSize = 0;
    var TotalFinish = 0;
    var onTotalProgress = util.throttleOnProgress.call(self, TotalFinish, params.onProgress);

    // Aggregated completion.
    var unFinishCount = params.files.length;
    var _onTotalFileFinish = params.onFileFinish;
    var resultList = Array(unFinishCount);
    var onTotalFileFinish = function (err, data, options) {
      onTotalProgress(null, true);
      _onTotalFileFinish && _onTotalFileFinish(err, data, options);
      resultList[options.Index] = {
        options: options,
        error: err,
        data: data
      };
      if (--unFinishCount <= 0 && callback) {
        callback(null, {files: resultList});
      }
    };

    // Build one task per file.
    var taskList = [];
    util.each(params.files, function (fileParams, index) {
      var Body = fileParams.Body;
      var FileSize = Body.size || Body.length || 0;
      var fileInfo = {Index: index, TaskId: ''};

      TotalSize += FileSize;

      // Snapshot the plain option values so they can be handed back to the finish callbacks.
      util.each(fileParams, function (v, k) {
        if (typeof v !== 'object' && typeof v !== 'function') {
          fileInfo[k] = v;
        }
      });

      // Record the task id once the task has been created.
      var _onTaskReady = fileParams.onTaskReady;
      fileParams.onTaskReady = function (tid) {
        fileInfo.TaskId = tid;
        _onTaskReady && _onTaskReady(tid);
      };

      // Per-file progress feeds the aggregate: replace this file's previous contribution.
      var PreAddSize = 0;
      var _onProgress = fileParams.onProgress;
      fileParams.onProgress = function (info) {
        TotalFinish = TotalFinish - PreAddSize + info.loaded;
        PreAddSize = info.loaded;
        _onProgress && _onProgress(info);
        onTotalProgress({loaded: TotalFinish, total: TotalSize});
      };

      // Per-file completion: the file's own hook first, then the aggregate.
      var _onFileFinish = fileParams.onFileFinish;
      var onFileFinish = function (err, data) {
        _onFileFinish && _onFileFinish(err, data);
        onTotalFileFinish(err, data, fileInfo);
      };

      // Larger than the threshold -> slice upload; otherwise -> simple upload.
      taskList.push({
        api: FileSize > SliceSize ? 'sliceUploadFile' : 'putObject',
        params: fileParams,
        callback: onFileFinish,
      });
    });
    self._addTasks(taskList);

    // With nothing queued no task will ever finish, so the aggregate callback would never
    // fire; report the empty result here instead of leaving the caller waiting.
    if (taskList.length === 0 && callback) {
      callback(null, {files: resultList});
    }
  }
  851. // 分片复制文件
  852. function sliceCopyFile(params, callback) {
  853. var ep = new EventProxy();
  854. var self = this;
  855. var Bucket = params.Bucket;
  856. var Region = params.Region;
  857. var Key = params.Key;
  858. var CopySource = params.CopySource;
  859. var m = util.getSourceParams.call(this, CopySource);
  860. if (!m) {
  861. callback(util.error(new Error('CopySource format error')));
  862. return;
  863. }
  864. var SourceBucket = m.Bucket;
  865. var SourceRegion = m.Region;
  866. var SourceKey = decodeURIComponent(m.Key);
  867. var CopySliceSize = params.CopySliceSize === undefined ? self.options.CopySliceSize : params.CopySliceSize;
  868. CopySliceSize = Math.max(0, CopySliceSize);
  869. var ChunkSize = params.CopyChunkSize || this.options.CopyChunkSize;
  870. var ChunkParallel = this.options.CopyChunkParallelLimit;
  871. var FinishSize = 0;
  872. var FileSize;
  873. var onProgress;
  874. // 分片复制完成,开始 multipartComplete 操作
  875. ep.on('copy_slice_complete', function (UploadData) {
  876. var metaHeaders = {};
  877. util.each(params.Headers, function (val, k) {
  878. if (k.toLowerCase().indexOf('x-cos-meta-') === 0) metaHeaders[k] = val;
  879. });
  880. var Parts = util.map(UploadData.PartList, function (item) {
  881. return {
  882. PartNumber: item.PartNumber,
  883. ETag: item.ETag,
  884. };
  885. });
  886. self.multipartComplete({
  887. Bucket: Bucket,
  888. Region: Region,
  889. Key: Key,
  890. UploadId: UploadData.UploadId,
  891. Parts: Parts,
  892. },function (err, data) {
  893. if (err) {
  894. onProgress(null, true);
  895. return callback(err);
  896. }
  897. onProgress({loaded: FileSize, total: FileSize}, true);
  898. callback(null, data);
  899. });
  900. });
  901. ep.on('get_copy_data_finish',function (UploadData) {
  902. Async.eachLimit(UploadData.PartList, ChunkParallel, function (SliceItem, asyncCallback) {
  903. var PartNumber = SliceItem.PartNumber;
  904. var CopySourceRange = SliceItem.CopySourceRange;
  905. var currentSize = SliceItem.end - SliceItem.start;
  906. copySliceItem.call(self, {
  907. Bucket: Bucket,
  908. Region: Region,
  909. Key: Key,
  910. CopySource: CopySource,
  911. UploadId: UploadData.UploadId,
  912. PartNumber: PartNumber,
  913. CopySourceRange: CopySourceRange,
  914. },function (err,data) {
  915. if (err) return asyncCallback(err);
  916. FinishSize += currentSize;
  917. onProgress({loaded: FinishSize, total: FileSize});
  918. SliceItem.ETag = data.ETag;
  919. asyncCallback(err || null, data);
  920. });
  921. }, function (err) {
  922. if (err) {
  923. onProgress(null, true);
  924. return callback(err);
  925. }
  926. ep.emit('copy_slice_complete', UploadData);
  927. });
  928. });
  929. ep.on('get_file_size_finish', function (SourceHeaders) {
  930. // 控制分片大小
  931. (function () {
  932. var SIZE = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 1024 * 2, 1024 * 4, 1024 * 5];
  933. var AutoChunkSize = 1024 * 1024;
  934. for (var i = 0; i < SIZE.length; i++) {
  935. AutoChunkSize = SIZE[i] * 1024 * 1024;
  936. if (FileSize / AutoChunkSize <= self.options.MaxPartNumber) break;
  937. }
  938. params.ChunkSize = ChunkSize = Math.max(ChunkSize, AutoChunkSize);
  939. var ChunkCount = Math.ceil(FileSize / ChunkSize);
  940. var list = [];
  941. for (var partNumber = 1; partNumber <= ChunkCount; partNumber++) {
  942. var start = (partNumber - 1) * ChunkSize;
  943. var end = partNumber * ChunkSize < FileSize ? (partNumber * ChunkSize - 1) : FileSize - 1;
  944. var item = {
  945. PartNumber: partNumber,
  946. start: start,
  947. end: end,
  948. CopySourceRange: "bytes=" + start + "-" + end,
  949. };
  950. list.push(item);
  951. }
  952. params.PartList = list;
  953. })();
  954. var TargetHeader;
  955. if (params.Headers['x-cos-metadata-directive'] === 'Replaced') {
  956. TargetHeader = params.Headers;
  957. } else {
  958. TargetHeader = SourceHeaders;
  959. }
  960. TargetHeader['x-cos-storage-class'] = params.Headers['x-cos-storage-class'] || SourceHeaders['x-cos-storage-class'];
  961. TargetHeader = util.clearKey(TargetHeader);
  962. /**
  963. * 对于归档存储的对象,如果未恢复副本,则不允许 Copy
  964. */
  965. if (SourceHeaders['x-cos-storage-class'] === 'ARCHIVE' || SourceHeaders['x-cos-storage-class'] === 'DEEP_ARCHIVE') {
  966. var restoreHeader = SourceHeaders['x-cos-restore'];
  967. if (!restoreHeader || restoreHeader === 'ongoing-request="true"') {
  968. callback(util.error(new Error('Unrestored archive object is not allowed to be copied')));
  969. return;
  970. }
  971. }
  972. /**
  973. * 去除一些无用的头部,规避 multipartInit 出错
  974. * 这些头部通常是在 putObjectCopy 时才使用
  975. */
  976. delete TargetHeader['x-cos-copy-source'];
  977. delete TargetHeader['x-cos-metadata-directive'];
  978. delete TargetHeader['x-cos-copy-source-If-Modified-Since'];
  979. delete TargetHeader['x-cos-copy-source-If-Unmodified-Since'];
  980. delete TargetHeader['x-cos-copy-source-If-Match'];
  981. delete TargetHeader['x-cos-copy-source-If-None-Match'];
  982. self.multipartInit({
  983. Bucket: Bucket,
  984. Region: Region,
  985. Key: Key,
  986. Headers: TargetHeader,
  987. },function (err,data) {
  988. if (err) return callback(err);
  989. params.UploadId = data.UploadId;
  990. ep.emit('get_copy_data_finish', params);
  991. });
  992. });
  993. // 获取远端复制源文件的大小
  994. self.headObject({
  995. Bucket: SourceBucket,
  996. Region: SourceRegion,
  997. Key: SourceKey,
  998. },function(err, data) {
  999. if (err) {
  1000. if (err.statusCode && err.statusCode === 404) {
  1001. callback(util.error(err, {ErrorStatus: SourceKey + ' Not Exist'}));
  1002. } else {
  1003. callback(err);
  1004. }
  1005. return;
  1006. }
  1007. FileSize = params.FileSize = data.headers['content-length'];
  1008. if (FileSize === undefined || !FileSize) {
  1009. callback(util.error(new Error('get Content-Length error, please add "Content-Length" to CORS ExposeHeader setting.( 获取Content-Length失败,请在CORS ExposeHeader设置中添加Content-Length,请参考文档:https://cloud.tencent.com/document/product/436/13318 )')));
  1010. return;
  1011. }
  1012. onProgress = util.throttleOnProgress.call(self, FileSize, params.onProgress);
  1013. // 开始上传
  1014. if (FileSize <= CopySliceSize) {
  1015. if (!params.Headers['x-cos-metadata-directive']) {
  1016. params.Headers['x-cos-metadata-directive'] = 'Copy';
  1017. }
  1018. self.putObjectCopy(params, function (err, data) {
  1019. if (err) {
  1020. onProgress(null, true);
  1021. return callback(err);
  1022. }
  1023. onProgress({loaded: FileSize, total: FileSize}, true);
  1024. callback(err, data);
  1025. });
  1026. } else {
  1027. var resHeaders = data.headers;
  1028. var SourceHeaders = {
  1029. 'Cache-Control': resHeaders['cache-control'],
  1030. 'Content-Disposition': resHeaders['content-disposition'],
  1031. 'Content-Encoding': resHeaders['content-encoding'],
  1032. 'Content-Type': resHeaders['content-type'],
  1033. 'Expires': resHeaders['expires'],
  1034. 'x-cos-storage-class': resHeaders['x-cos-storage-class'],
  1035. };
  1036. util.each(resHeaders, function (v, k) {
  1037. var metaPrefix = 'x-cos-meta-';
  1038. if (k.indexOf(metaPrefix) === 0 && k.length > metaPrefix.length) {
  1039. SourceHeaders[k] = v;
  1040. }
  1041. });
  1042. ep.emit('get_file_size_finish', SourceHeaders);
  1043. }
  1044. });
  1045. }
  /**
   * Copy one part (slice) of a source object into a multipart upload.
   *
   * Must be called with `this` bound to an object that exposes
   * `options.ChunkRetryTimes` and an `uploadPartCopy(params, callback)` method.
   * Attempts are driven by `Async.retry`, which is given an attempt budget of
   * `ChunkRetryTimes + 1`.
   *
   * @param {Object} params
   * @param {*} params.TaskId - forwarded unchanged to uploadPartCopy
   * @param {*} params.Bucket - forwarded unchanged to uploadPartCopy
   * @param {*} params.Region - forwarded unchanged to uploadPartCopy
   * @param {*} params.Key - forwarded unchanged to uploadPartCopy
   * @param {*} params.CopySource - forwarded unchanged to uploadPartCopy
   * @param {*} params.UploadId - forwarded unchanged to uploadPartCopy
   * @param {*} params.PartNumber - coerced to a number before being forwarded
   * @param {*} params.CopySourceRange - forwarded unchanged to uploadPartCopy
   * @param {Function} callback - invoked as callback(err, data) with whatever
   *   Async.retry reports as the final outcome
   */
  function copySliceItem(params, callback) {
    // Snapshot the inputs once so every attempt sends the same values.
    var taskId = params.TaskId;
    var bucket = params.Bucket;
    var region = params.Region;
    var key = params.Key;
    var copySource = params.CopySource;
    var uploadId = params.UploadId;
    var partNumber = params.PartNumber * 1; // numeric coercion, e.g. '3' -> 3
    var copySourceRange = params.CopySourceRange;
    var maxAttempts = this.options.ChunkRetryTimes + 1;
    var cos = this;

    // A single copy attempt; a fresh request object is built for each attempt.
    function attemptCopy(done) {
      var request = {
        TaskId: taskId,
        Bucket: bucket,
        Region: region,
        Key: key,
        CopySource: copySource,
        UploadId: uploadId,
        PartNumber: partNumber,
        CopySourceRange: copySourceRange,
      };
      cos.uploadPartCopy(request, function (err, data) {
        // Normalise a falsy error to null before reporting back to Async.retry.
        done(err || null, data);
      });
    }

    // Hand the final result to the caller with exactly (err, data).
    function finish(err, data) {
      return callback(err, data);
    }

    Async.retry(maxAttempts, attemptCopy, finish);
  }
  // Table of the functions this module exposes, keyed by the method name under
  // which init() installs each one on COS.prototype.
  var API_MAP = {};
  API_MAP.sliceUploadFile = sliceUploadFile;
  API_MAP.abortUploadTask = abortUploadTask;
  API_MAP.uploadFile = uploadFile;
  API_MAP.uploadFiles = uploadFiles;
  API_MAP.sliceCopyFile = sliceCopyFile;
  /**
   * Install every entry of API_MAP as a method on COS.prototype.
   *
   * @param {Function} COS - constructor whose prototype receives the methods
   * @param {Object} task - object providing transferToTaskMethod(map, name);
   *   it is called for 'sliceUploadFile' before the methods are installed
   */
  module.exports.init = function (COS, task) {
    // Wrap one API_MAP entry with util.apiWrapper and attach it under its name.
    function install(handler, name) {
      COS.prototype[name] = util.apiWrapper(name, handler);
    }

    task.transferToTaskMethod(API_MAP, 'sliceUploadFile');
    util.each(API_MAP, install);
  };