@@ -9,7 +9,12 @@ const ObjectQueueEntry = require('../../../lib/models/ObjectQueueEntry');
99const ReplicateObject = require ( './ReplicateObject' ) ;
1010const {
1111 GetObjectCommand,
12- MultipleBackendPutObjectCommand
12+ MultipleBackendPutObjectCommand,
13+ MultipleBackendDeleteObjectCommand,
14+ MultipleBackendInitiateMPUCommand,
15+ MultipleBackendAbortMPUCommand,
16+ MultipleBackendPutObjectTaggingCommand,
17+ MultipleBackendDeleteObjectTaggingCommand
1318} = require ( '../../../lib/clients/smithy/build/smithy/source/typescript-codegen' ) ;
1419const { attachReqUids } = require ( '../../../lib/clients/utils' ) ;
1520const getExtMetrics = require ( '../utils/getExtMetrics' ) ;
@@ -304,16 +309,20 @@ class MultipleBackendTask extends ReplicateObject {
304309 uploadId,
305310 } ) ;
306311 const doneOnce = jsutil . once ( cb ) ;
307- const destReq = this . backbeatSource . multipleBackendAbortMPU ( {
312+ const command = new MultipleBackendAbortMPUCommand ( {
308313 Bucket : sourceEntry . getBucket ( ) ,
309314 Key : sourceEntry . getObjectKey ( ) ,
310315 StorageType : sourceEntry . getReplicationStorageType ( ) ,
311316 StorageClass : this . site ,
312317 UploadId : uploadId ,
318+ RequestUids : log . getSerializedUids ( ) ,
313319 } ) ;
314- attachReqUids ( destReq , log ) ;
315- return destReq . send ( err => {
316- if ( err ) {
320+
321+ return this . backbeatSource . send ( command )
322+ . then ( ( ) => {
323+ return doneOnce ( ) ;
324+ } )
325+ . catch ( err => {
317326 // eslint-disable-next-line no-param-reassign
318327 err . origin = 'source' ;
319328 log . error ( 'an error occurred aborting multipart upload' , {
@@ -324,9 +333,7 @@ class MultipleBackendTask extends ReplicateObject {
324333 error : err . message ,
325334 } ) ;
326335 return doneOnce ( err ) ;
327- }
328- return doneOnce ( ) ;
329- } ) ;
336+ } ) ;
330337 }
331338
332339 /**
@@ -450,7 +457,7 @@ class MultipleBackendTask extends ReplicateObject {
450457 const uploadId = uuid ( ) . replace ( / - / g, '' ) ;
451458 return setImmediate ( ( ) => cb ( null , uploadId ) ) ;
452459 }
453- const destReq = this . backbeatSource . multipleBackendInitiateMPU ( {
460+ const command = new MultipleBackendInitiateMPUCommand ( {
454461 Bucket : sourceEntry . getBucket ( ) ,
455462 Key : sourceEntry . getObjectKey ( ) ,
456463 StorageType : sourceEntry . getReplicationStorageType ( ) ,
@@ -463,10 +470,14 @@ class MultipleBackendTask extends ReplicateObject {
463470 sourceEntry . getContentDisposition ( ) || undefined ,
464471 ContentEncoding : sourceEntry . getContentEncoding ( ) || undefined ,
465472 Tags : JSON . stringify ( sourceEntry . getTags ( ) ) ,
473+ RequestUids : log . getSerializedUids ( ) ,
466474 } ) ;
467- attachReqUids ( destReq , log ) ;
468- return destReq . send ( ( err , data ) => {
469- if ( err ) {
475+
476+ return this . backbeatSource . send ( command )
477+ . then ( data => {
478+ return cb ( null , data . uploadId ) ;
479+ } )
480+ . catch ( err => {
470481 // eslint-disable-next-line no-param-reassign
471482 err . origin = 'source' ;
472483 log . error ( 'an error occurred on initating MPU to S3' , {
@@ -477,9 +488,7 @@ class MultipleBackendTask extends ReplicateObject {
477488 error : err . message ,
478489 } ) ;
479490 return cb ( err ) ;
480- }
481- return cb ( null , data . uploadId ) ;
482- } ) ;
491+ } ) ;
483492 }
484493
485494 /**
@@ -907,36 +916,38 @@ class MultipleBackendTask extends ReplicateObject {
907916 log . debug ( 'replicating object tags' , {
908917 entry : sourceEntry . getLogInfo ( ) ,
909918 } ) ;
910- const destReq = this . backbeatSource
911- . multipleBackendPutObjectTagging ( {
912- Bucket : sourceEntry . getBucket ( ) ,
913- Key : sourceEntry . getObjectKey ( ) ,
914- StorageType : sourceEntry . getReplicationStorageType ( ) ,
915- StorageClass : this . site ,
916- DataStoreVersionId :
917- sourceEntry . getReplicationSiteDataStoreVersionId ( this . site ) ,
918- Tags : JSON . stringify ( sourceEntry . getTags ( ) ) ,
919- SourceBucket : sourceEntry . getBucket ( ) ,
920- SourceVersionId : sourceEntry . getVersionId ( ) ,
921- ReplicationEndpointSite : this . site ,
922- } ) ;
923- attachReqUids ( destReq , log ) ;
919+ const command = new MultipleBackendPutObjectTaggingCommand ( {
920+ Bucket : sourceEntry . getBucket ( ) ,
921+ Key : sourceEntry . getObjectKey ( ) ,
922+ StorageType : sourceEntry . getReplicationStorageType ( ) ,
923+ StorageClass : this . site ,
924+ DataStoreVersionId :
925+ sourceEntry . getReplicationSiteDataStoreVersionId ( this . site ) ,
926+ Tags : JSON . stringify ( sourceEntry . getTags ( ) ) ,
927+ SourceBucket : sourceEntry . getBucket ( ) ,
928+ SourceVersionId : sourceEntry . getVersionId ( ) ,
929+ ReplicationEndpointSite : this . site ,
930+ RequestUids : log . getSerializedUids ( ) ,
931+ } ) ;
932+
924933 const writeStartTime = Date . now ( ) ;
925- return destReq . send ( ( err , data ) => {
926- if ( err ) {
934+ return this . backbeatSource . send ( command )
935+ . then ( data => {
936+ sourceEntry . setReplicationSiteDataStoreVersionId ( this . site ,
937+ data . versionId ) ;
938+ // TODO : review metadata metrics
939+ this . _publishMetadataWriteMetrics ( JSON . stringify ( command . input ) , writeStartTime ) ;
940+ return doneOnce ( ) ;
941+ } )
942+ . catch ( err => {
927943 log . error ( 'an error occurred putting object tagging to S3' , {
928944 method : 'MultipleBackendTask._putObjectTaggingOnce' ,
929945 entry : sourceEntry . getLogInfo ( ) ,
930946 origin : 'target' ,
931947 error : err . message ,
932948 } ) ;
933949 return doneOnce ( err ) ;
934- }
935- sourceEntry . setReplicationSiteDataStoreVersionId ( this . site ,
936- data . versionId ) ;
937- this . _publishMetadataWriteMetrics ( destReq . httpRequest . body , writeStartTime ) ;
938- return doneOnce ( ) ;
939- } ) ;
950+ } ) ;
940951 }
941952
942953 _deleteObjectTagging ( sourceEntry , log , cb ) {
@@ -961,7 +972,7 @@ class MultipleBackendTask extends ReplicateObject {
961972 log . debug ( 'replicating delete object tagging' , {
962973 entry : sourceEntry . getLogInfo ( ) ,
963974 } ) ;
964- const destReq = this . backbeatSource . multipleBackendDeleteObjectTagging ( {
975+ const command = new MultipleBackendDeleteObjectTaggingCommand ( {
965976 Bucket : sourceEntry . getBucket ( ) ,
966977 Key : sourceEntry . getObjectKey ( ) ,
967978 StorageType : sourceEntry . getReplicationStorageType ( ) ,
@@ -971,11 +982,19 @@ class MultipleBackendTask extends ReplicateObject {
971982 SourceBucket : sourceEntry . getBucket ( ) ,
972983 SourceVersionId : sourceEntry . getVersionId ( ) ,
973984 ReplicationEndpointSite : this . site ,
985+ RequestUids : log . getSerializedUids ( ) ,
974986 } ) ;
975- attachReqUids ( destReq , log ) ;
987+
976988 const writeStartTime = Date . now ( ) ;
977- return destReq . send ( ( err , data ) => {
978- if ( err ) {
989+ return this . backbeatSource . send ( command )
990+ . then ( data => {
991+ sourceEntry . setReplicationSiteDataStoreVersionId ( this . site ,
992+ data . versionId ) ;
993+ // TODO : review metadata metrics
994+ this . _publishMetadataWriteMetrics ( JSON . stringify ( command . input ) , writeStartTime ) ;
995+ return doneOnce ( ) ;
996+ } )
997+ . catch ( err => {
979998 log . error ( 'an error occurred on deleting object tagging' , {
980999 method : 'MultipleBackendTask._deleteObjectTaggingOnce' ,
9811000 entry : sourceEntry . getLogInfo ( ) ,
@@ -984,12 +1003,7 @@ class MultipleBackendTask extends ReplicateObject {
9841003 error : err . message ,
9851004 } ) ;
9861005 return doneOnce ( err ) ;
987- }
988- sourceEntry . setReplicationSiteDataStoreVersionId ( this . site ,
989- data . versionId ) ;
990- this . _publishMetadataWriteMetrics ( destReq . httpRequest . body , writeStartTime ) ;
991- return doneOnce ( ) ;
992- } ) ;
1006+ } ) ;
9931007 }
9941008
9951009 _putDeleteMarker ( sourceEntry , log , cb ) {
@@ -1038,16 +1052,22 @@ class MultipleBackendTask extends ReplicateObject {
10381052 * @return {undefined }
10391053 */
10401054 _sendMultipleBackendDeleteObject ( sourceEntry , log , doneOnce ) {
1041- const destReq = this . backbeatSource . multipleBackendDeleteObject ( {
1055+ const command = new MultipleBackendDeleteObjectCommand ( {
10421056 Bucket : sourceEntry . getBucket ( ) ,
10431057 Key : sourceEntry . getObjectKey ( ) ,
10441058 StorageType : sourceEntry . getReplicationStorageType ( ) ,
10451059 StorageClass : this . site ,
1060+ RequestUids : log . getSerializedUids ( ) ,
10461061 } ) ;
1047- attachReqUids ( destReq , log ) ;
1062+
10481063 const writeStartTime = Date . now ( ) ;
1049- return destReq . send ( err => {
1050- if ( err ) {
1064+ return this . backbeatSource . send ( command )
1065+ . then ( ( ) => {
1066+ // TODO : This metric is changed, needs double checking
1067+ this . _publishMetadataWriteMetrics ( JSON . stringify ( command . input ) , writeStartTime ) ;
1068+ return doneOnce ( ) ;
1069+ } )
1070+ . catch ( err => {
10511071 // eslint-disable-next-line no-param-reassign
10521072 err . origin = 'source' ;
10531073 log . error ( 'an error occurred on putting delete marker to S3' , {
@@ -1058,10 +1078,7 @@ class MultipleBackendTask extends ReplicateObject {
10581078 error : err . message ,
10591079 } ) ;
10601080 return doneOnce ( err ) ;
1061- }
1062- this . _publishMetadataWriteMetrics ( destReq . httpRequest . body , writeStartTime ) ;
1063- return doneOnce ( ) ;
1064- } ) ;
1081+ } ) ;
10651082 }
10661083
10671084 /**
0 commit comments