Skip to content

Commit 4b0c3b9

Browse files
committed
Complete GCP namespace support
Signed-off-by: Ben <[email protected]>
1 parent bedd272 commit 4b0c3b9

File tree

1 file changed

+144
-8
lines changed

1 file changed

+144
-8
lines changed

src/sdk/namespace_gcp.js

+144-8
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ const util = require('util');
77
const P = require('../util/promise');
88
const stream_utils = require('../util/stream_utils');
99
const dbg = require('../util/debug_module')(__filename);
10+
const s3_utils = require('../endpoint/s3/s3_utils');
1011
const S3Error = require('../endpoint/s3/s3_errors').S3Error;
1112
// we use this wrapper to set a custom user agent
1213
const GoogleCloudStorage = require('../util/google_storage_wrap');
14+
const aws_sdk = require('aws-sdk');
1315

1416
/**
1517
* @implements {nb.Namespace}
@@ -47,6 +49,16 @@ class NamespaceGCP {
4749
private_key: this.private_key,
4850
}
4951
});
52+
this.gcs.createHmacKey(client_email).then((res) => {
53+
this.hmac_key = res[0];
54+
this.hmac_secret = res[1];
55+
});
56+
this.s3_client = new aws_sdk.S3({
57+
endpoint: 'https://storage.googleapis.com',
58+
accessKeyId: this.hmac_key,
59+
secretAccessKey: this.hmac_secret
60+
});
61+
5062
this.bucket = target_bucket;
5163
this.access_mode = access_mode;
5264
this.stats = stats;
@@ -289,27 +301,129 @@ class NamespaceGCP {
289301

290302
async create_object_upload(params, object_sdk) {
291303
dbg.log0('NamespaceGCP.create_object_upload:', this.bucket, inspect(params));
292-
throw new S3Error(S3Error.NotImplemented);
304+
const Tagging = params.tagging && params.tagging.map(tag => tag.key + '=' + tag.value).join('&');
305+
/** @type {AWS.S3.CreateMultipartUploadRequest} */
306+
const request = {
307+
Bucket: this.bucket,
308+
Key: params.key,
309+
ContentType: params.content_type,
310+
StorageClass: params.storage_class,
311+
Metadata: params.xattr,
312+
Tagging
313+
};
314+
const res = await this.s3_client.createMultipartUpload(request).promise();
315+
316+
dbg.log0('NamespaceGCP.create_object_upload:', this.bucket, inspect(params), 'res', inspect(res));
317+
return { obj_id: res.UploadId };
293318
}
294319

295320
async upload_multipart(params, object_sdk) {
296321
dbg.log0('NamespaceGCP.upload_multipart:', this.bucket, inspect(params));
297-
throw new S3Error(S3Error.NotImplemented);
322+
let res;
323+
if (params.copy_source) {
324+
const { copy_source, copy_source_range } = s3_utils.format_copy_source(params.copy_source);
325+
326+
/** @type {AWS.S3.UploadPartCopyRequest} */
327+
const request = {
328+
Bucket: this.bucket,
329+
Key: params.key,
330+
UploadId: params.obj_id,
331+
PartNumber: params.num,
332+
CopySource: copy_source,
333+
CopySourceRange: copy_source_range,
334+
};
335+
336+
res = await this.s3_client.uploadPartCopy(request).promise();
337+
} else {
338+
let count = 1;
339+
const count_stream = stream_utils.get_tap_stream(data => {
340+
this.stats?.update_namespace_write_stats({
341+
namespace_resource_id: this.namespace_resource_id,
342+
size: data.length,
343+
count
344+
});
345+
// clear count for next updates
346+
count = 0;
347+
});
348+
/** @type {AWS.S3.UploadPartRequest} */
349+
const request = {
350+
Bucket: this.bucket,
351+
Key: params.key,
352+
UploadId: params.obj_id,
353+
PartNumber: params.num,
354+
Body: params.source_stream.pipe(count_stream),
355+
ContentMD5: params.md5_b64,
356+
ContentLength: params.size,
357+
};
358+
try {
359+
res = await this.s3_client.uploadPart(request).promise();
360+
} catch (err) {
361+
object_sdk.rpc_client.pool.update_issues_report({
362+
namespace_resource_id: this.namespace_resource_id,
363+
error_code: String(err.code),
364+
time: Date.now(),
365+
});
366+
throw err;
367+
}
368+
}
369+
dbg.log0('NamespaceGCP.upload_multipart:', this.bucket, inspect(params), 'res', inspect(res));
370+
const etag = s3_utils.parse_etag(res.ETag);
371+
return { etag };
298372
}
299373

300374
async list_multiparts(params, object_sdk) {
301375
dbg.log0('NamespaceGCP.list_multiparts:', this.bucket, inspect(params));
302-
throw new S3Error(S3Error.NotImplemented);
376+
377+
const res = await this.s3_client.listParts({
378+
Bucket: this.bucket,
379+
Key: params.key,
380+
UploadId: params.obj_id,
381+
MaxParts: params.max,
382+
PartNumberMarker: params.num_marker,
383+
}).promise();
384+
385+
dbg.log0('NamespaceGCP.list_multiparts:', this.bucket, inspect(params), 'res', inspect(res));
386+
return {
387+
is_truncated: res.IsTruncated,
388+
next_num_marker: res.NextPartNumberMarker,
389+
multiparts: _.map(res.Parts, p => ({
390+
num: p.PartNumber,
391+
size: p.Size,
392+
etag: s3_utils.parse_etag(p.ETag),
393+
last_modified: p.LastModified,
394+
}))
395+
};
303396
}
304397

305398
async complete_object_upload(params, object_sdk) {
306399
dbg.log0('NamespaceGCP.complete_object_upload:', this.bucket, inspect(params));
307-
throw new S3Error(S3Error.NotImplemented);
400+
401+
const res = await this.s3_client.completeMultipartUpload({
402+
Bucket: this.bucket,
403+
Key: params.key,
404+
UploadId: params.obj_id,
405+
MultipartUpload: {
406+
Parts: _.map(params.multiparts, p => ({
407+
PartNumber: p.num,
408+
ETag: `"${p.etag}"`,
409+
}))
410+
}
411+
}).promise();
412+
413+
dbg.log0('NamespaceGCP.complete_object_upload:', this.bucket, inspect(params), 'res', inspect(res));
414+
const etag = s3_utils.parse_etag(res.ETag);
415+
return { etag, version_id: res.VersionId };
308416
}
309417

310418
async abort_object_upload(params, object_sdk) {
311419
dbg.log0('NamespaceGCP.abort_object_upload:', this.bucket, inspect(params));
312-
throw new S3Error(S3Error.NotImplemented);
420+
const res = await this.s3_client.abortMultipartUpload({
421+
Bucket: this.bucket,
422+
Key: params.key,
423+
UploadId: params.obj_id,
424+
}).promise();
425+
426+
dbg.log0('NamespaceGCP.abort_object_upload:', this.bucket, inspect(params), 'res', inspect(res));
313427
}
314428

315429
//////////
@@ -364,13 +478,35 @@ class NamespaceGCP {
364478
////////////////////
365479

366480
async get_object_tagging(params, object_sdk) {
367-
throw new Error('TODO');
481+
dbg.log0('NamespaceGCP.get_object_tagging:', this.bucket, inspect(params));
482+
const obj_tags = (await this.read_object_md(params, object_sdk)).xattr
483+
// Converting tag dictionary to array of key-value object pairs
484+
const tags = Object.entries(obj_tags).map(([key, value]) => ({ key, value }));
485+
return {
486+
tagging: tags
487+
};
368488
}
369489
async delete_object_tagging(params, object_sdk) {
370-
throw new Error('TODO');
490+
dbg.log0('NamespaceGCP.delete_object_tagging:', this.bucket, inspect(params));
491+
try {
492+
// Set an empty metadata object to remove all tags
493+
const res = await this.gcs.bucket(this.bucket).file(params.key).setMetadata({});
494+
dbg.log0('NamespaceGCP.delete_object_tagging:', this.bucket, inspect(params), 'res', inspect(res));
495+
} catch (err) {
496+
dbg.error('NamespaceGCP.delete_object_tagging error:', err);
497+
}
371498
}
372499
async put_object_tagging(params, object_sdk) {
373-
throw new Error('TODO');
500+
dbg.log0('NamespaceGCP.put_object_tagging:', this.bucket, inspect(params));
501+
try {
502+
// Convert the array of key-value object pairs to a dictionary
503+
const tags_to_put = Object.fromEntries(params.tagging.map(tag => ([tag.key, tag.value])));
504+
const res = await this.gcs.bucket(this.bucket).file(params.key).setMetadata({ metadata: tags_to_put });
505+
dbg.log0('NamespaceGCP.put_object_tagging:', this.bucket, inspect(params), 'res', inspect(res));
506+
} catch (err) {
507+
dbg.error('NamespaceGCP.put_object_tagging error:', err);
508+
}
509+
374510
}
375511

376512
///////////////////

0 commit comments

Comments
 (0)