advance.js 41 KB


  1. var session = require('./session');
  2. var Async = require('./async');
  3. var EventProxy = require('./event').EventProxy;
  4. var util = require('./util');
  5. // 文件分块上传全过程,暴露的分块上传接口
  6. function sliceUploadFile(params, callback) {
  7. var self = this;
  8. // 如果小程序版本不支持获取文件分片内容,统一转到 postObject 接口上传
  9. if (!util.canFileSlice()) {
  10. params.SkipTask = true;
  11. self.postObject(params, callback);
  12. return;
  13. }
  14. var ep = new EventProxy();
  15. var TaskId = params.TaskId;
  16. var Bucket = params.Bucket;
  17. var Region = params.Region;
  18. var Key = params.Key;
  19. var FilePath = params.FilePath;
  20. var ChunkSize = params.ChunkSize || params.SliceSize || self.options.ChunkSize;
  21. var AsyncLimit = params.AsyncLimit;
  22. var StorageClass = params.StorageClass;
  23. var ServerSideEncryption = params.ServerSideEncryption;
  24. var FileSize;
  25. var onProgress;
  26. var onHashProgress = params.onHashProgress;
  27. // 上传过程中出现错误,返回错误
  28. ep.on('error', function (err) {
  29. if (!self._isRunningTask(TaskId)) return;
  30. var _err = {
  31. UploadId: params.UploadData.UploadId || '',
  32. err: err,
  33. };
  34. return callback(_err);
  35. });
  36. // 上传分块完成,开始 uploadSliceComplete 操作
  37. ep.on('upload_complete', function (UploadCompleteData) {
  38. var _UploadCompleteData = util.extend({
  39. UploadId: params.UploadData.UploadId || ''
  40. }, UploadCompleteData);
  41. callback(null, _UploadCompleteData);
  42. });
  43. // 上传分块完成,开始 uploadSliceComplete 操作
  44. ep.on('upload_slice_complete', function (UploadData) {
  45. uploadSliceComplete.call(self, {
  46. Bucket: Bucket,
  47. Region: Region,
  48. Key: Key,
  49. UploadId: UploadData.UploadId,
  50. SliceList: UploadData.SliceList,
  51. }, function (err, data) {
  52. if (!self._isRunningTask(TaskId)) return;
  53. session.removeUsing(UploadData.UploadId);
  54. if (err) {
  55. onProgress(null, true);
  56. return ep.emit('error', err);
  57. }
  58. session.removeUploadId(UploadData.UploadId);
  59. onProgress({loaded: FileSize, total: FileSize}, true);
  60. ep.emit('upload_complete', data);
  61. });
  62. });
  63. // 获取 UploadId 完成,开始上传每个分片
  64. ep.on('get_upload_data_finish', function (UploadData) {
  65. // 处理 UploadId 缓存
  66. var uuid = session.getFileId(params.FileStat, params.ChunkSize, Bucket, Key);
  67. uuid && session.saveUploadId(uuid, UploadData.UploadId, self.options.UploadIdCacheLimit); // 缓存 UploadId
  68. session.setUsing(UploadData.UploadId); // 标记 UploadId 为正在使用
  69. // 获取 UploadId
  70. onProgress(null, true); // 任务状态开始 uploading
  71. uploadSliceList.call(self, {
  72. TaskId: TaskId,
  73. Bucket: Bucket,
  74. Region: Region,
  75. Key: Key,
  76. FilePath: FilePath,
  77. FileSize: FileSize,
  78. SliceSize: ChunkSize,
  79. AsyncLimit: AsyncLimit,
  80. ServerSideEncryption: ServerSideEncryption,
  81. UploadData: UploadData,
  82. onProgress: onProgress
  83. }, function (err, data) {
  84. if (!self._isRunningTask(TaskId)) return;
  85. if (err) {
  86. onProgress(null, true);
  87. return ep.emit('error', err);
  88. }
  89. ep.emit('upload_slice_complete', data);
  90. });
  91. });
  92. // 开始获取文件 UploadId,里面会视情况计算 ETag,并比对,保证文件一致性,也优化上传
  93. ep.on('get_file_size_finish', function () {
  94. onProgress = util.throttleOnProgress.call(self, FileSize, params.onProgress);
  95. if (params.UploadData.UploadId) {
  96. ep.emit('get_upload_data_finish', params.UploadData);
  97. } else {
  98. var _params = util.extend({
  99. TaskId: TaskId,
  100. Bucket: Bucket,
  101. Region: Region,
  102. Key: Key,
  103. Headers: params.Headers,
  104. StorageClass: StorageClass,
  105. FilePath: FilePath,
  106. FileSize: FileSize,
  107. SliceSize: ChunkSize,
  108. onHashProgress: onHashProgress,
  109. }, params);
  110. getUploadIdAndPartList.call(self, _params, function (err, UploadData) {
  111. if (!self._isRunningTask(TaskId)) return;
  112. if (err) return ep.emit('error', err);
  113. params.UploadData.UploadId = UploadData.UploadId;
  114. params.UploadData.PartList = UploadData.PartList;
  115. ep.emit('get_upload_data_finish', params.UploadData);
  116. });
  117. }
  118. });
  119. // 获取上传文件大小
  120. FileSize = params.ContentLength;
  121. delete params.ContentLength;
  122. !params.Headers && (params.Headers = {});
  123. util.each(params.Headers, function (item, key) {
  124. if (key.toLowerCase() === 'content-length') {
  125. delete params.Headers[key];
  126. }
  127. });
  128. // 控制分片大小
  129. (function () {
  130. var SIZE = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 1024 * 2, 1024 * 4, 1024 * 5];
  131. var AutoChunkSize = 1024 * 1024;
  132. for (var i = 0; i < SIZE.length; i++) {
  133. AutoChunkSize = SIZE[i] * 1024 * 1024;
  134. if (FileSize / AutoChunkSize <= self.options.MaxPartNumber) break;
  135. }
  136. params.ChunkSize = params.SliceSize = ChunkSize = Math.max(ChunkSize, AutoChunkSize);
  137. })();
  138. // 开始上传
  139. if (FileSize === 0) {
  140. params.Body = '';
  141. params.ContentLength = 0;
  142. params.SkipTask = true;
  143. self.putObject(params, function (err, data) {
  144. if (err) {
  145. return callback(err);
  146. }
  147. callback(null, data);
  148. });
  149. } else {
  150. ep.emit('get_file_size_finish');
  151. }
  152. }
  153. // 获取上传任务的 UploadId
  154. function getUploadIdAndPartList(params, callback) {
  155. var TaskId = params.TaskId;
  156. var Bucket = params.Bucket;
  157. var Region = params.Region;
  158. var Key = params.Key;
  159. var StorageClass = params.StorageClass;
  160. var self = this;
  161. // 计算 ETag
  162. var ETagMap = {};
  163. var FileSize = params.FileSize;
  164. var SliceSize = params.SliceSize;
  165. var SliceCount = Math.ceil(FileSize / SliceSize);
  166. var FinishSliceCount = 0;
  167. var FinishSize = 0;
  168. var onHashProgress = util.throttleOnProgress.call(self, FileSize, params.onHashProgress);
  169. var getChunkETag = function (PartNumber, callback) {
  170. var start = SliceSize * (PartNumber - 1);
  171. var end = Math.min(start + SliceSize, FileSize);
  172. var ChunkSize = end - start;
  173. if (ETagMap[PartNumber]) {
  174. callback(null, {
  175. PartNumber: PartNumber,
  176. ETag: ETagMap[PartNumber],
  177. Size: ChunkSize
  178. });
  179. } else {
  180. util.fileSlice(params.FilePath, start, end, function (chunkItem) {
  181. try {
  182. var md5 = util.getFileMd5(chunkItem);
  183. } catch (err) {
  184. return callback(err);
  185. }
  186. var ETag = '"' + md5 + '"';
  187. ETagMap[PartNumber] = ETag;
  188. FinishSliceCount += 1;
  189. FinishSize += ChunkSize;
  190. callback(null, {
  191. PartNumber: PartNumber,
  192. ETag: ETag,
  193. Size: ChunkSize
  194. });
  195. onHashProgress({loaded: FinishSize, total: FileSize});
  196. });
  197. }
  198. };
  199. // 通过和文件的 md5 对比,判断 UploadId 是否可用
  200. var isAvailableUploadList = function (PartList, callback) {
  201. var PartCount = PartList.length;
  202. // 如果没有分片,通过
  203. if (PartCount === 0) {
  204. return callback(null, true);
  205. }
  206. // 检查分片数量
  207. if (PartCount > SliceCount) {
  208. return callback(null, false);
  209. }
  210. // 检查分片大小
  211. if (PartCount > 1) {
  212. var PartSliceSize = Math.max(PartList[0].Size, PartList[1].Size);
  213. if (PartSliceSize !== SliceSize) {
  214. return callback(null, false);
  215. }
  216. }
  217. // 逐个分片计算并检查 ETag 是否一致
  218. var next = function (index) {
  219. if (index < PartCount) {
  220. var Part = PartList[index];
  221. getChunkETag(Part.PartNumber, function (err, chunk) {
  222. if (chunk && chunk.ETag === Part.ETag && chunk.Size === Part.Size) {
  223. next(index + 1);
  224. } else {
  225. callback(null, false);
  226. }
  227. });
  228. } else {
  229. callback(null, true);
  230. }
  231. };
  232. next(0);
  233. };
  234. var ep = new EventProxy();
  235. ep.on('error', function (errData) {
  236. if (!self._isRunningTask(TaskId)) return;
  237. return callback(errData);
  238. });
  239. // 存在 UploadId
  240. ep.on('upload_id_available', function (UploadData) {
  241. // 转换成 map
  242. var map = {};
  243. var list = [];
  244. util.each(UploadData.PartList, function (item) {
  245. map[item.PartNumber] = item;
  246. });
  247. for (var PartNumber = 1; PartNumber <= SliceCount; PartNumber++) {
  248. var item = map[PartNumber];
  249. if (item) {
  250. item.PartNumber = PartNumber;
  251. item.Uploaded = true;
  252. } else {
  253. item = {
  254. PartNumber: PartNumber,
  255. ETag: null,
  256. Uploaded: false
  257. };
  258. }
  259. list.push(item);
  260. }
  261. UploadData.PartList = list;
  262. callback(null, UploadData);
  263. });
  264. // 不存在 UploadId, 初始化生成 UploadId
  265. ep.on('no_available_upload_id', function () {
  266. if (!self._isRunningTask(TaskId)) return;
  267. var _params = util.extend({
  268. Bucket: Bucket,
  269. Region: Region,
  270. Key: Key,
  271. Headers: util.clone(params.Headers),
  272. Query: util.clone(params.Query),
  273. StorageClass: StorageClass,
  274. }, params);
  275. self.multipartInit(_params, function (err, data) {
  276. if (!self._isRunningTask(TaskId)) return;
  277. if (err) return ep.emit('error', err);
  278. var UploadId = data.UploadId;
  279. if (!UploadId) {
  280. return callback({Message: 'no upload id'});
  281. }
  282. ep.emit('upload_id_available', {UploadId: UploadId, PartList: []});
  283. });
  284. });
  285. // 如果已存在 UploadId,找一个可以用的 UploadId
  286. ep.on('has_and_check_upload_id', function (UploadIdList) {
  287. // 串行地,找一个内容一致的 UploadId
  288. UploadIdList = UploadIdList.reverse();
  289. Async.eachLimit(UploadIdList, 1, function (UploadId, asyncCallback) {
  290. if (!self._isRunningTask(TaskId)) return;
  291. // 如果正在上传,跳过
  292. if (session.using[UploadId]) {
  293. asyncCallback(); // 检查下一个 UploadId
  294. return;
  295. }
  296. // 判断 UploadId 是否可用
  297. wholeMultipartListPart.call(self, {
  298. Bucket: Bucket,
  299. Region: Region,
  300. Key: Key,
  301. UploadId: UploadId,
  302. }, function (err, PartListData) {
  303. if (!self._isRunningTask(TaskId)) return;
  304. if (err) {
  305. session.removeUsing(UploadId);
  306. return ep.emit('error', err);
  307. }
  308. var PartList = PartListData.PartList;
  309. PartList.forEach(function (item) {
  310. item.PartNumber *= 1;
  311. item.Size *= 1;
  312. item.ETag = item.ETag || '';
  313. });
  314. isAvailableUploadList(PartList, function (err, isAvailable) {
  315. if (!self._isRunningTask(TaskId)) return;
  316. if (err) return ep.emit('error', err);
  317. if (isAvailable) {
  318. asyncCallback({
  319. UploadId: UploadId,
  320. PartList: PartList
  321. }); // 马上结束
  322. } else {
  323. asyncCallback(); // 检查下一个 UploadId
  324. }
  325. });
  326. });
  327. }, function (AvailableUploadData) {
  328. if (!self._isRunningTask(TaskId)) return;
  329. onHashProgress(null, true);
  330. if (AvailableUploadData && AvailableUploadData.UploadId) {
  331. ep.emit('upload_id_available', AvailableUploadData);
  332. } else {
  333. ep.emit('no_available_upload_id');
  334. }
  335. });
  336. });
  337. // 在本地缓存找可用的 UploadId
  338. ep.on('seek_local_avail_upload_id', function (RemoteUploadIdList) {
  339. // 在本地找可用的 UploadId
  340. var uuid = session.getFileId(params.FileStat, params.ChunkSize, Bucket, Key);
  341. var LocalUploadIdList = session.getUploadIdList(uuid);
  342. if (!uuid || !LocalUploadIdList) {
  343. ep.emit('has_and_check_upload_id', RemoteUploadIdList);
  344. return;
  345. }
  346. var next = function (index) {
  347. // 如果本地找不到可用 UploadId,再一个个遍历校验远端
  348. if (index >= LocalUploadIdList.length) {
  349. ep.emit('has_and_check_upload_id', RemoteUploadIdList);
  350. return;
  351. }
  352. var UploadId = LocalUploadIdList[index];
  353. // 如果不在远端 UploadId 列表里,跳过并删除
  354. if (!util.isInArray(RemoteUploadIdList, UploadId)) {
  355. session.removeUploadId(UploadId);
  356. next(index + 1);
  357. return;
  358. }
  359. // 如果正在上传,跳过
  360. if (session.using[UploadId]) {
  361. next(index + 1);
  362. return;
  363. }
  364. // 判断 UploadId 是否存在线上
  365. wholeMultipartListPart.call(self, {
  366. Bucket: Bucket,
  367. Region: Region,
  368. Key: Key,
  369. UploadId: UploadId,
  370. }, function (err, PartListData) {
  371. if (!self._isRunningTask(TaskId)) return;
  372. if (err) {
  373. // 如果 UploadId 获取会出错,跳过并删除
  374. session.removeUploadId(UploadId);
  375. next(index + 1);
  376. } else {
  377. // 找到可用 UploadId
  378. ep.emit('upload_id_available', {
  379. UploadId: UploadId,
  380. PartList: PartListData.PartList,
  381. });
  382. }
  383. });
  384. };
  385. next(0);
  386. });
  387. // 获取线上 UploadId 列表
  388. ep.on('get_remote_upload_id_list', function () {
  389. // 获取符合条件的 UploadId 列表,因为同一个文件可以有多个上传任务。
  390. wholeMultipartList.call(self, {
  391. Bucket: Bucket,
  392. Region: Region,
  393. Key: Key,
  394. }, function (err, data) {
  395. if (!self._isRunningTask(TaskId)) return;
  396. if (err) {
  397. return ep.emit('error', err);
  398. }
  399. // 整理远端 UploadId 列表
  400. var RemoteUploadIdList = util.filter(data.UploadList, function (item) {
  401. return item.Key === Key && (!StorageClass || item.StorageClass.toUpperCase() === StorageClass.toUpperCase());
  402. }).reverse().map(function (item) {
  403. return item.UploadId || item.UploadID;
  404. });
  405. if (RemoteUploadIdList.length) {
  406. ep.emit('seek_local_avail_upload_id', RemoteUploadIdList);
  407. } else {
  408. // 远端没有 UploadId,清理缓存的 UploadId
  409. var uuid = session.getFileId(params.FileStat, params.ChunkSize, Bucket, Key), LocalUploadIdList;
  410. if (uuid && (LocalUploadIdList = session.getUploadIdList(uuid))) {
  411. util.each(LocalUploadIdList, function (UploadId) {
  412. session.removeUploadId(UploadId);
  413. });
  414. }
  415. ep.emit('no_available_upload_id');
  416. }
  417. });
  418. });
  419. // 开始找可用 UploadId
  420. ep.emit('get_remote_upload_id_list');
  421. }
  422. // 获取符合条件的全部上传任务 (条件包括 Bucket, Region, Prefix)
  423. function wholeMultipartList(params, callback) {
  424. var self = this;
  425. var UploadList = [];
  426. var sendParams = {
  427. Bucket: params.Bucket,
  428. Region: params.Region,
  429. Prefix: params.Key
  430. };
  431. var next = function () {
  432. self.multipartList(sendParams, function (err, data) {
  433. if (err) return callback(err);
  434. UploadList.push.apply(UploadList, data.Upload || []);
  435. if (data.IsTruncated === 'true') { // 列表不完整
  436. sendParams.KeyMarker = data.NextKeyMarker;
  437. sendParams.UploadIdMarker = data.NextUploadIdMarker;
  438. next();
  439. } else {
  440. callback(null, {UploadList: UploadList});
  441. }
  442. });
  443. };
  444. next();
  445. }
  446. // 获取指定上传任务的分块列表
  447. function wholeMultipartListPart(params, callback) {
  448. var self = this;
  449. var PartList = [];
  450. var sendParams = {
  451. Bucket: params.Bucket,
  452. Region: params.Region,
  453. Key: params.Key,
  454. UploadId: params.UploadId
  455. };
  456. var next = function () {
  457. self.multipartListPart(sendParams, function (err, data) {
  458. if (err) return callback(err);
  459. PartList.push.apply(PartList, data.Part || []);
  460. if (data.IsTruncated === 'true') { // 列表不完整
  461. sendParams.PartNumberMarker = data.NextPartNumberMarker;
  462. next();
  463. } else {
  464. callback(null, {PartList: PartList});
  465. }
  466. });
  467. };
  468. next();
  469. }
  470. // 上传文件分块,包括
  471. /*
  472. UploadId (上传任务编号)
  473. AsyncLimit (并发量),
  474. SliceList (上传的分块数组),
  475. FilePath (本地文件的位置),
  476. SliceSize (文件分块大小)
  477. FileSize (文件大小)
  478. onProgress (上传成功之后的回调函数)
  479. */
  480. function uploadSliceList(params, cb) {
  481. var self = this;
  482. var TaskId = params.TaskId;
  483. var Bucket = params.Bucket;
  484. var Region = params.Region;
  485. var Key = params.Key;
  486. var UploadData = params.UploadData;
  487. var FileSize = params.FileSize;
  488. var SliceSize = params.SliceSize;
  489. var ChunkParallel = Math.min(params.AsyncLimit || self.options.ChunkParallelLimit || 1, 256);
  490. var FilePath = params.FilePath;
  491. var SliceCount = Math.ceil(FileSize / SliceSize);
  492. var FinishSize = 0;
  493. var ServerSideEncryption = params.ServerSideEncryption;
  494. var needUploadSlices = util.filter(UploadData.PartList, function (SliceItem) {
  495. if (SliceItem['Uploaded']) {
  496. FinishSize += SliceItem['PartNumber'] >= SliceCount ? (FileSize % SliceSize || SliceSize) : SliceSize;
  497. }
  498. return !SliceItem['Uploaded'];
  499. });
  500. var onProgress = params.onProgress;
  501. Async.eachLimit(needUploadSlices, ChunkParallel, function (SliceItem, asyncCallback) {
  502. if (!self._isRunningTask(TaskId)) return;
  503. var PartNumber = SliceItem['PartNumber'];
  504. var currentSize = Math.min(FileSize, SliceItem['PartNumber'] * SliceSize) - (SliceItem['PartNumber'] - 1) * SliceSize;
  505. var preAddSize = 0;
  506. uploadSliceItem.call(self, {
  507. TaskId: TaskId,
  508. Bucket: Bucket,
  509. Region: Region,
  510. Key: Key,
  511. SliceSize: SliceSize,
  512. FileSize: FileSize,
  513. PartNumber: PartNumber,
  514. ServerSideEncryption: ServerSideEncryption,
  515. FilePath: FilePath,
  516. UploadData: UploadData,
  517. onProgress: function (data) {
  518. FinishSize += data.loaded - preAddSize;
  519. preAddSize = data.loaded;
  520. onProgress({loaded: FinishSize, total: FileSize});
  521. },
  522. }, function (err, data) {
  523. if (!self._isRunningTask(TaskId)) return;
  524. if (err) {
  525. FinishSize -= preAddSize;
  526. } else {
  527. FinishSize += currentSize - preAddSize;
  528. SliceItem.ETag = data.ETag;
  529. }
  530. onProgress({loaded: FinishSize, total: FileSize});
  531. asyncCallback(err || null, data);
  532. });
  533. }, function (err) {
  534. if (!self._isRunningTask(TaskId)) return;
  535. if (err) return cb(err);
  536. cb(null, {
  537. UploadId: UploadData.UploadId,
  538. SliceList: UploadData.PartList
  539. });
  540. });
  541. }
  542. // 上传指定分片
  543. function uploadSliceItem(params, callback) {
  544. var self = this;
  545. var TaskId = params.TaskId;
  546. var Bucket = params.Bucket;
  547. var Region = params.Region;
  548. var Key = params.Key;
  549. var FileSize = params.FileSize;
  550. var FilePath = params.FilePath;
  551. var PartNumber = params.PartNumber * 1;
  552. var SliceSize = params.SliceSize;
  553. var ServerSideEncryption = params.ServerSideEncryption;
  554. var UploadData = params.UploadData;
  555. var ChunkRetryTimes = self.options.ChunkRetryTimes + 1;
  556. var Headers = params.Headers || {};
  557. var start = SliceSize * (PartNumber - 1);
  558. var ContentLength = SliceSize;
  559. var end = start + SliceSize;
  560. if (end > FileSize) {
  561. end = FileSize;
  562. ContentLength = end - start;
  563. }
  564. var headersWhiteList = ['x-cos-traffic-limit', 'x-cos-mime-limit'];
  565. var headers = {};
  566. util.each(Headers, function(v, k) {
  567. if (headersWhiteList.indexOf(k) > -1) {
  568. headers[k] = v;
  569. }
  570. });
  571. util.fileSlice(FilePath, start, end, function (Body) {
  572. var md5 = util.getFileMd5(Body);
  573. var contentMd5 = md5 ? util.binaryBase64(md5) : null;
  574. var PartItem = UploadData.PartList[PartNumber - 1];
  575. Async.retry(ChunkRetryTimes, function (tryCallback) {
  576. if (!self._isRunningTask(TaskId)) return;
  577. self.multipartUpload({
  578. TaskId: TaskId,
  579. Bucket: Bucket,
  580. Region: Region,
  581. Key: Key,
  582. ContentLength: ContentLength,
  583. PartNumber: PartNumber,
  584. UploadId: UploadData.UploadId,
  585. ServerSideEncryption: ServerSideEncryption,
  586. Body: Body,
  587. Headers: headers,
  588. onProgress: params.onProgress,
  589. ContentMD5: contentMd5,
  590. }, function (err, data) {
  591. if (!self._isRunningTask(TaskId)) return;
  592. if (err) {
  593. return tryCallback(err);
  594. } else {
  595. PartItem.Uploaded = true;
  596. return tryCallback(null, data);
  597. }
  598. });
  599. }, function (err, data) {
  600. if (!self._isRunningTask(TaskId)) return;
  601. return callback(err, data);
  602. });
  603. });
  604. }
  605. // 完成分块上传
  606. function uploadSliceComplete(params, callback) {
  607. var Bucket = params.Bucket;
  608. var Region = params.Region;
  609. var Key = params.Key;
  610. var UploadId = params.UploadId;
  611. var SliceList = params.SliceList;
  612. var self = this;
  613. var ChunkRetryTimes = this.options.ChunkRetryTimes + 1;
  614. var Parts = SliceList.map(function (item) {
  615. return {
  616. PartNumber: item.PartNumber,
  617. ETag: item.ETag
  618. };
  619. });
  620. // 完成上传的请求也做重试
  621. Async.retry(ChunkRetryTimes, function (tryCallback) {
  622. self.multipartComplete({
  623. Bucket: Bucket,
  624. Region: Region,
  625. Key: Key,
  626. UploadId: UploadId,
  627. Parts: Parts
  628. }, tryCallback);
  629. }, function (err, data) {
  630. callback(err, data);
  631. });
  632. }
  633. // 抛弃分块上传任务
  634. /*
  635. AsyncLimit (抛弃上传任务的并发量),
  636. UploadId (上传任务的编号,当 Level 为 task 时候需要)
  637. Level (抛弃分块上传任务的级别,task : 抛弃指定的上传任务,file : 抛弃指定的文件对应的上传任务,其他值 :抛弃指定Bucket 的全部上传任务)
  638. */
  639. function abortUploadTask(params, callback) {
  640. var Bucket = params.Bucket;
  641. var Region = params.Region;
  642. var Key = params.Key;
  643. var UploadId = params.UploadId;
  644. var Level = params.Level || 'task';
  645. var AsyncLimit = params.AsyncLimit;
  646. var self = this;
  647. var ep = new EventProxy();
  648. ep.on('error', function (errData) {
  649. return callback(errData);
  650. });
  651. // 已经获取到需要抛弃的任务列表
  652. ep.on('get_abort_array', function (AbortArray) {
  653. abortUploadTaskArray.call(self, {
  654. Bucket: Bucket,
  655. Region: Region,
  656. Key: Key,
  657. Headers: params.Headers,
  658. AsyncLimit: AsyncLimit,
  659. AbortArray: AbortArray
  660. }, function (err, data) {
  661. if (err) {
  662. return callback(err);
  663. }
  664. callback(null, data);
  665. });
  666. });
  667. if (Level === 'bucket') {
  668. // Bucket 级别的任务抛弃,抛弃该 Bucket 下的全部上传任务
  669. wholeMultipartList.call(self, {
  670. Bucket: Bucket,
  671. Region: Region
  672. }, function (err, data) {
  673. if (err) {
  674. return callback(err);
  675. }
  676. ep.emit('get_abort_array', data.UploadList || []);
  677. });
  678. } else if (Level === 'file') {
  679. // 文件级别的任务抛弃,抛弃该文件的全部上传任务
  680. if (!Key) return callback({error: 'abort_upload_task_no_key'});
  681. wholeMultipartList.call(self, {
  682. Bucket: Bucket,
  683. Region: Region,
  684. Key: Key
  685. }, function (err, data) {
  686. if (err) {
  687. return callback(err);
  688. }
  689. ep.emit('get_abort_array', data.UploadList || []);
  690. });
  691. } else if (Level === 'task') {
  692. // 单个任务级别的任务抛弃,抛弃指定 UploadId 的上传任务
  693. if (!UploadId) return callback({error: 'abort_upload_task_no_id'});
  694. if (!Key) return callback({error: 'abort_upload_task_no_key'});
  695. ep.emit('get_abort_array', [{
  696. Key: Key,
  697. UploadId: UploadId
  698. }]);
  699. } else {
  700. return callback({error: 'abort_unknown_level'});
  701. }
  702. }
  703. // 批量抛弃分块上传任务
  704. function abortUploadTaskArray(params, callback) {
  705. var Bucket = params.Bucket;
  706. var Region = params.Region;
  707. var Key = params.Key;
  708. var AbortArray = params.AbortArray;
  709. var AsyncLimit = params.AsyncLimit || 1;
  710. var self = this;
  711. var index = 0;
  712. var resultList = new Array(AbortArray.length);
  713. Async.eachLimit(AbortArray, AsyncLimit, function (AbortItem, callback) {
  714. var eachIndex = index;
  715. if (Key && Key !== AbortItem.Key) {
  716. resultList[eachIndex] = {error: {KeyNotMatch: true}};
  717. callback(null);
  718. return;
  719. }
  720. var UploadId = AbortItem.UploadId || AbortItem.UploadID;
  721. self.multipartAbort({
  722. Bucket: Bucket,
  723. Region: Region,
  724. Key: AbortItem.Key,
  725. Headers: params.Headers,
  726. UploadId: UploadId
  727. }, function (err) {
  728. var task = {
  729. Bucket: Bucket,
  730. Region: Region,
  731. Key: AbortItem.Key,
  732. UploadId: UploadId
  733. };
  734. resultList[eachIndex] = {error: err, task: task};
  735. callback(null);
  736. });
  737. index++;
  738. }, function (err) {
  739. if (err) {
  740. return callback(err);
  741. }
  742. var successList = [];
  743. var errorList = [];
  744. for (var i = 0, len = resultList.length; i < len; i++) {
  745. var item = resultList[i];
  746. if (item['task']) {
  747. if (item['error']) {
  748. errorList.push(item['task']);
  749. } else {
  750. successList.push(item['task']);
  751. }
  752. }
  753. }
  754. return callback(null, {
  755. successList: successList,
  756. errorList: errorList
  757. });
  758. });
  759. }
  760. // 高级上传
  761. function uploadFile(params, callback) {
  762. var self = this;
  763. // 判断多大的文件使用分片上传
  764. var SliceSize = params.SliceSize === undefined ? self.options.SliceSize : params.SliceSize;
  765. var taskList = [];
  766. var FileSize = params.FileSize;
  767. var fileInfo = {TaskId: ''};
  768. // 整理 option,用于返回给回调
  769. util.each(params, function (v, k) {
  770. if (typeof v !== 'object' && typeof v !== 'function') {
  771. fileInfo[k] = v;
  772. }
  773. });
  774. // 处理文件 TaskReady
  775. var _onTaskReady = params.onTaskReady;
  776. params.onTaskReady = function (tid) {
  777. fileInfo.TaskId = tid;
  778. _onTaskReady && _onTaskReady(tid);
  779. };
  780. // 处理文件完成
  781. var _onFileFinish = params.onFileFinish;
  782. var onFileFinish = function (err, data) {
  783. _onFileFinish && _onFileFinish(err, data, fileInfo);
  784. callback && callback(err, data);
  785. };
  786. // 添加上传任务
  787. var api = FileSize > SliceSize ? 'sliceUploadFile' : 'postObject';
  788. taskList.push({
  789. api: api,
  790. params: params,
  791. callback: onFileFinish,
  792. });
  793. self._addTasks(taskList);
  794. }
  795. // 批量上传文件
  796. function uploadFiles(params, callback) {
  797. var self = this;
  798. // 判断多大的文件使用分片上传
  799. var SliceSize = params.SliceSize === undefined ? self.options.SliceSize : params.SliceSize;
  800. // 汇总返回进度
  801. var TotalSize = 0;
  802. var TotalFinish = 0;
  803. var onTotalProgress = util.throttleOnProgress.call(self, TotalFinish, params.onProgress);
  804. // 汇总返回回调
  805. var unFinishCount = params.files.length;
  806. var _onTotalFileFinish = params.onFileFinish;
  807. var resultList = Array(unFinishCount);
  808. var onTotalFileFinish = function (err, data, options) {
  809. onTotalProgress(null, true);
  810. _onTotalFileFinish && _onTotalFileFinish(err, data, options);
  811. resultList[options.Index] = {
  812. options: options,
  813. error: err,
  814. data: data
  815. };
  816. if (--unFinishCount <= 0 && callback) {
  817. callback(null, {
  818. files: resultList,
  819. });
  820. }
  821. };
  822. // 开始处理每个文件
  823. var taskList = [];
  824. util.each(params.files, function (fileParams, index) {
  825. var FileSize = fileParams.FileSize;
  826. var fileInfo = {Index: index, TaskId: ''};
  827. // 更新文件总大小
  828. TotalSize += FileSize;
  829. // 整理 option,用于返回给回调
  830. util.each(fileParams, function (v, k) {
  831. if (typeof v !== 'object' && typeof v !== 'function') {
  832. fileInfo[k] = v;
  833. }
  834. });
  835. // 处理单个文件 TaskReady
  836. var _onTaskReady = fileParams.onTaskReady;
  837. fileParams.onTaskReady = function (tid) {
  838. fileInfo.TaskId = tid;
  839. _onTaskReady && _onTaskReady(tid);
  840. };
  841. // 处理单个文件进度
  842. var PreAddSize = 0;
  843. var _onProgress = fileParams.onProgress;
  844. fileParams.onProgress = function (info) {
  845. TotalFinish = TotalFinish - PreAddSize + info.loaded;
  846. PreAddSize = info.loaded;
  847. _onProgress && _onProgress(info);
  848. onTotalProgress({loaded: TotalFinish, total: TotalSize});
  849. };
  850. // 处理单个文件完成
  851. var _onFileFinish = fileParams.onFileFinish;
  852. var onFileFinish = function (err, data) {
  853. _onFileFinish && _onFileFinish(err, data);
  854. onTotalFileFinish && onTotalFileFinish(err, data, fileInfo);
  855. };
  856. // 添加上传任务
  857. var api = FileSize > SliceSize ? 'sliceUploadFile' : 'postObject';
  858. taskList.push({
  859. api: api,
  860. params: fileParams,
  861. callback: onFileFinish,
  862. });
  863. });
  864. self._addTasks(taskList);
  865. }
  866. // 分片复制文件
  867. function sliceCopyFile(params, callback) {
  868. var ep = new EventProxy();
  869. var self = this;
  870. var Bucket = params.Bucket;
  871. var Region = params.Region;
  872. var Key = params.Key;
  873. var CopySource = params.CopySource;
  874. var m = CopySource.match(/^([^.]+-\d+)\.cos(v6)?\.([^.]+)\.[^/]+\/(.+)$/);
  875. if (!m) {
  876. callback({error: 'CopySource format error'});
  877. return;
  878. }
  879. var SourceBucket = m[1];
  880. var SourceRegion = m[3];
  881. var SourceKey = decodeURIComponent(m[4]);
  882. var CopySliceSize = params.CopySliceSize === undefined ? self.options.CopySliceSize : params.CopySliceSize;
  883. CopySliceSize = Math.max(0, CopySliceSize);
  884. var ChunkSize = params.CopyChunkSize || this.options.CopyChunkSize;
  885. var ChunkParallel = this.options.CopyChunkParallelLimit;
  886. var FinishSize = 0;
  887. var FileSize;
  888. var onProgress;
  889. // 分片复制完成,开始 multipartComplete 操作
  890. ep.on('copy_slice_complete', function (UploadData) {
  891. self.multipartComplete({
  892. Bucket: Bucket,
  893. Region: Region,
  894. Key: Key,
  895. UploadId: UploadData.UploadId,
  896. Parts: UploadData.PartList,
  897. },function (err, data) {
  898. if (err) {
  899. onProgress(null, true);
  900. return callback(err);
  901. }
  902. onProgress({loaded: FileSize, total: FileSize}, true);
  903. callback(null, data);
  904. });
  905. });
  906. ep.on('get_copy_data_finish',function (UploadData) {
  907. Async.eachLimit(UploadData.PartList, ChunkParallel, function (SliceItem, asyncCallback) {
  908. var PartNumber = SliceItem.PartNumber;
  909. var CopySourceRange = SliceItem.CopySourceRange;
  910. var currentSize = SliceItem.end - SliceItem.start;
  911. var preAddSize = 0;
  912. copySliceItem.call(self, {
  913. Bucket: Bucket,
  914. Region: Region,
  915. Key: Key,
  916. CopySource: CopySource,
  917. UploadId: UploadData.UploadId,
  918. PartNumber: PartNumber,
  919. CopySourceRange: CopySourceRange,
  920. onProgress: function (data) {
  921. FinishSize += data.loaded - preAddSize;
  922. preAddSize = data.loaded;
  923. onProgress({loaded: FinishSize, total: FileSize});
  924. }
  925. },function (err,data) {
  926. if (err) {
  927. return asyncCallback(err);
  928. }
  929. onProgress({loaded: FinishSize, total: FileSize});
  930. FinishSize += currentSize - preAddSize;
  931. SliceItem.ETag = data.ETag;
  932. asyncCallback(err || null, data);
  933. });
  934. }, function (err) {
  935. if (err) {
  936. onProgress(null, true);
  937. return callback(err);
  938. }
  939. ep.emit('copy_slice_complete', UploadData);
  940. });
  941. });
  942. ep.on('get_file_size_finish', function (SourceHeaders) {
  943. // 控制分片大小
  944. (function () {
  945. var SIZE = [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 1024 * 2, 1024 * 4, 1024 * 5];
  946. var AutoChunkSize = 1024 * 1024;
  947. for (var i = 0; i < SIZE.length; i++) {
  948. AutoChunkSize = SIZE[i] * 1024 * 1024;
  949. if (FileSize / AutoChunkSize <= self.options.MaxPartNumber) break;
  950. }
  951. params.ChunkSize = ChunkSize = Math.max(ChunkSize, AutoChunkSize);
  952. var ChunkCount = Math.ceil(FileSize / ChunkSize);
  953. var list = [];
  954. for (var partNumber = 1; partNumber <= ChunkCount; partNumber++) {
  955. var start = (partNumber - 1) * ChunkSize;
  956. var end = partNumber * ChunkSize < FileSize ? (partNumber * ChunkSize - 1) : FileSize - 1;
  957. var item = {
  958. PartNumber: partNumber,
  959. start: start,
  960. end: end,
  961. CopySourceRange: "bytes=" + start + "-" + end,
  962. };
  963. list.push(item);
  964. }
  965. params.PartList = list;
  966. })();
  967. var TargetHeader;
  968. if (params.Headers['x-cos-metadata-directive'] === 'Replaced') {
  969. TargetHeader = params.Headers;
  970. } else {
  971. TargetHeader = SourceHeaders;
  972. }
  973. TargetHeader['x-cos-storage-class'] = params.Headers['x-cos-storage-class'] || SourceHeaders['x-cos-storage-class'];
  974. TargetHeader = util.clearKey(TargetHeader);
  975. /**
  976. * 对于归档存储的对象,如果未恢复副本,则不允许 Copy
  977. */
  978. if (SourceHeaders['x-cos-storage-class'] === 'ARCHIVE' || SourceHeaders['x-cos-storage-class'] === 'DEEP_ARCHIVE') {
  979. var restoreHeader = SourceHeaders['x-cos-restore'];
  980. if (!restoreHeader || restoreHeader === 'ongoing-request="true"') {
  981. callback({ error: 'Unrestored archive object is not allowed to be copied' });
  982. return;
  983. }
  984. }
  985. /**
  986. * 去除一些无用的头部,规避 multipartInit 出错
  987. * 这些头部通常是在 putObjectCopy 时才使用
  988. */
  989. delete TargetHeader['x-cos-copy-source'];
  990. delete TargetHeader['x-cos-metadata-directive'];
  991. delete TargetHeader['x-cos-copy-source-If-Modified-Since'];
  992. delete TargetHeader['x-cos-copy-source-If-Unmodified-Since'];
  993. delete TargetHeader['x-cos-copy-source-If-Match'];
  994. delete TargetHeader['x-cos-copy-source-If-None-Match'];
  995. self.multipartInit({
  996. Bucket: Bucket,
  997. Region: Region,
  998. Key: Key,
  999. Headers: TargetHeader,
  1000. },function (err,data) {
  1001. if (err) {
  1002. return callback(err);
  1003. }
  1004. params.UploadId = data.UploadId;
  1005. ep.emit('get_copy_data_finish', params);
  1006. });
  1007. });
  1008. // 获取远端复制源文件的大小
  1009. self.headObject({
  1010. Bucket: SourceBucket,
  1011. Region: SourceRegion,
  1012. Key: SourceKey,
  1013. },function(err, data) {
  1014. if (err) {
  1015. if (err.statusCode && err.statusCode === 404) {
  1016. callback({ErrorStatus: SourceKey + ' Not Exist'});
  1017. } else {
  1018. callback(err);
  1019. }
  1020. return;
  1021. }
  1022. FileSize = params.FileSize = data.headers['content-length'];
  1023. if (FileSize === undefined || !FileSize) {
  1024. callback({error: 'get Content-Length error, please add "Content-Length" to CORS ExposeHeader setting.'});
  1025. return;
  1026. }
  1027. onProgress = util.throttleOnProgress.call(self, FileSize, params.onProgress);
  1028. // 开始上传
  1029. if (FileSize <= CopySliceSize) {
  1030. if (!params.Headers['x-cos-metadata-directive']) {
  1031. params.Headers['x-cos-metadata-directive'] = 'Copy';
  1032. }
  1033. self.putObjectCopy(params, function (err, data) {
  1034. if (err) {
  1035. onProgress(null, true);
  1036. return callback(err);
  1037. }
  1038. onProgress({loaded: FileSize, total: FileSize}, true);
  1039. callback(err, data);
  1040. });
  1041. } else {
  1042. var resHeaders = data.headers;
  1043. var SourceHeaders = {
  1044. 'Cache-Control': resHeaders['cache-control'],
  1045. 'Content-Disposition': resHeaders['content-disposition'],
  1046. 'Content-Encoding': resHeaders['content-encoding'],
  1047. 'Content-Type': resHeaders['content-type'],
  1048. 'Expires': resHeaders['expires'],
  1049. 'x-cos-storage-class': resHeaders['x-cos-storage-class'],
  1050. };
  1051. util.each(resHeaders, function (v, k) {
  1052. var metaPrefix = 'x-cos-meta-';
  1053. if (k.indexOf(metaPrefix) === 0 && k.length > metaPrefix.length) {
  1054. SourceHeaders[k] = v;
  1055. }
  1056. });
  1057. ep.emit('get_file_size_finish', SourceHeaders);
  1058. }
  1059. });
  1060. }
  1061. // 复制指定分片
  1062. function copySliceItem(params, callback) {
  1063. var TaskId = params.TaskId;
  1064. var Bucket = params.Bucket;
  1065. var Region = params.Region;
  1066. var Key = params.Key;
  1067. var CopySource = params.CopySource;
  1068. var UploadId = params.UploadId;
  1069. var PartNumber = params.PartNumber * 1;
  1070. var CopySourceRange = params.CopySourceRange;
  1071. var ChunkRetryTimes = this.options.ChunkRetryTimes + 1;
  1072. var self = this;
  1073. Async.retry(ChunkRetryTimes, function (tryCallback) {
  1074. self.uploadPartCopy({
  1075. TaskId: TaskId,
  1076. Bucket: Bucket,
  1077. Region: Region,
  1078. Key: Key,
  1079. CopySource: CopySource,
  1080. UploadId: UploadId,
  1081. PartNumber:PartNumber,
  1082. CopySourceRange:CopySourceRange,
  1083. onProgress:params.onProgress,
  1084. },function (err,data) {
  1085. tryCallback(err || null, data);
  1086. })
  1087. }, function (err, data) {
  1088. return callback(err, data);
  1089. });
  1090. }
  1091. var API_MAP = {
  1092. sliceUploadFile: sliceUploadFile,
  1093. abortUploadTask: abortUploadTask,
  1094. uploadFile: uploadFile,
  1095. uploadFiles: uploadFiles,
  1096. sliceCopyFile: sliceCopyFile,
  1097. };
  1098. module.exports.init = function (COS, task) {
  1099. task.transferToTaskMethod(API_MAP, 'sliceUploadFile');
  1100. util.each(API_MAP, function (fn, apiName) {
  1101. COS.prototype[apiName] = util.apiWrapper(apiName, fn);
  1102. });
  1103. };