Skip to content

Commit 7bc42c2

Browse files
authored
notifications | lifecycle impl (#8528)
Signed-off-by: Amit Prinz Setter <[email protected]>
1 parent e22503b commit 7bc42c2

File tree

7 files changed

+168
-54
lines changed

7 files changed

+168
-54
lines changed

src/api/common_api.js

+1-2
Original file line numberDiff line numberDiff line change
@@ -1413,10 +1413,9 @@ module.exports = {
14131413
's3:ObjectTagging:*',
14141414
's3:ObjectTagging:Put',
14151415
's3:ObjectTagging:Delete',
1416-
/*We plan to support LifecycleExpiration
14171416
's3:LifecycleExpiration:*',
14181417
's3:LifecycleExpiration:Delete',
1419-
's3:LifecycleExpiration:DeleteMarkerCreated',*/
1418+
's3:LifecycleExpiration:DeleteMarkerCreated',
14201419
],
14211420
}
14221421
}

src/api/object_api.js

+9
Original file line numberDiff line numberDiff line change
@@ -1151,6 +1151,9 @@ module.exports = {
11511151
},
11521152
limit: {
11531153
type: 'integer'
1154+
},
1155+
reply_objects: {
1156+
type: 'boolean'
11541157
}
11551158
}
11561159
},
@@ -1159,6 +1162,12 @@ module.exports = {
11591162
properties: {
11601163
num_objects_deleted: {
11611164
type: 'integer'
1165+
},
1166+
deleted_objects: {
1167+
type: 'array',
1168+
items: {
1169+
$ref: '#/definitions/object_info'
1170+
}
11621171
}
11631172
}
11641173
},

src/endpoint/endpoint.js

+5-5
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ const { NamespaceMonitor } = require('../server/bg_services/namespace_monitor');
4343
const { SemaphoreMonitor } = require('../server/bg_services/semaphore_monitor');
4444
const prom_reporting = require('../server/analytic_services/prometheus_reporting');
4545
const { PersistentLogger } = require('../util/persistent_logger');
46+
const { get_notification_logger } = require('../util/notifications_util');
4647
const NoobaaEvent = require('../manage_nsfs/manage_nsfs_events_utils').NoobaaEvent;
4748
const cluster = /** @type {import('node:cluster').Cluster} */ (
4849
/** @type {unknown} */ (require('node:cluster'))
@@ -139,11 +140,10 @@ async function main(options = {}) {
139140
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
140141
});
141142

142-
notification_logger = config.NOTIFICATION_LOG_DIR &&
143-
new PersistentLogger(config.NOTIFICATION_LOG_DIR, node_name + '_' + config.NOTIFICATION_LOG_NS, {
144-
locking: 'SHARED',
145-
poll_interval: config.NSFS_GLACIER_LOGS_POLL_INTERVAL,
146-
});
143+
notification_logger = config.NOTIFICATION_LOG_DIR && get_notification_logger(
144+
'SHARED', //shared locking for endpoitns
145+
undefined, //use default namespace based on hostname
146+
config.NSFS_GLACIER_LOGS_POLL_INTERVAL);
147147

148148
process.on('warning', e => dbg.warn(e.stack));
149149

src/endpoint/s3/s3_bucket_logging.js

+2-8
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ const http_utils = require('../../util/http_utils');
66
const dgram = require('node:dgram');
77
const { Buffer } = require('node:buffer');
88
const config = require('../../../config');
9-
const {compose_notification, check_notif_relevant} = require('../../util/notifications_util');
9+
const {compose_notification_req, check_notif_relevant} = require('../../util/notifications_util');
1010

1111
async function send_bucket_op_logs(req, res) {
1212
if (req.params && req.params.bucket &&
@@ -30,13 +30,7 @@ async function send_bucket_op_logs(req, res) {
3030
if (req.notification_logger && bucket_info.notifications) {
3131
for (const notif_conf of bucket_info.notifications) {
3232
if (check_notif_relevant(notif_conf, req)) {
33-
const notif = {
34-
meta: {
35-
connect: notif_conf.Connect,
36-
name: notif_conf.name
37-
},
38-
notif: compose_notification(req, res, bucket_info, notif_conf)
39-
};
33+
const notif = compose_notification_req(req, res, bucket_info, notif_conf);
4034
dbg.log1("logging notif ", notif_conf, ", notif = ", notif);
4135
writes_aggregate.push({
4236
file: req.notification_logger,

src/server/bg_services/lifecycle.js

+50-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ const server_rpc = require('../server_rpc');
1010
const system_store = require('../system_services/system_store').get_instance();
1111
const auth_server = require('../common_services/auth_server');
1212
const config = require('../../../config');
13+
const { get_notification_logger, check_notif_relevant,
14+
OP_TO_EVENT, compose_notification_lifecycle } = require('../../util/notifications_util');
1315

1416
function get_expiration_timestamp(expiration) {
1517
if (!expiration) {
@@ -37,7 +39,18 @@ async function handle_bucket_rule(system, rule, j, bucket) {
3739
dbg.log0('LIFECYCLE SKIP bucket:', bucket.name, '(bucket id:', bucket._id, ') rule', util.inspect(rule), 'now', now, 'last_sync', rule.last_sync, 'no expiration');
3840
return;
3941
}
40-
dbg.log0('LIFECYCLE PROCESSING bucket:', bucket.name, '(bucket id:', bucket._id, ') rule', util.inspect(rule));
42+
dbg.log0('LIFECYCLE PROCESSING bucket:', bucket.name.unwrap(), '(bucket id:', bucket._id, ') rule', util.inspect(rule));
43+
44+
//we might need to send notifications for deleted objects, if
45+
//1. notifications are enabled AND
46+
//2. bucket has notifications at all AND
47+
//3. bucket has a relevant notification, either
48+
//3.1. notification is without event filtering OR
49+
//3.2. notification is for LifecycleExpiration event
50+
//if so, we need the metadata of the deleted objects from the object server
51+
const reply_objects = config.NOTIFICATION_LOG_DIR && bucket.notifications &&
52+
_.some(bucket.notifications, notif =>
53+
(!notif.Events || _.some(notif.Events, event => event.includes(OP_TO_EVENT.lifecycle_delete.name))));
4154

4255
const res = await server_rpc.client.object.delete_multiple_objects_by_filter({
4356
bucket: bucket.name,
@@ -47,6 +60,7 @@ async function handle_bucket_rule(system, rule, j, bucket) {
4760
size_greater: rule.filter.object_size_greater_than,
4861
tags: rule.filter.tags,
4962
limit: config.LIFECYCLE_BATCH_SIZE,
63+
reply_objects,
5064
}, {
5165
auth_token: auth_server.make_auth_token({
5266
system_id: system._id,
@@ -55,6 +69,40 @@ async function handle_bucket_rule(system, rule, j, bucket) {
5569
})
5670
});
5771

72+
//dbg.log0("LIFECYCLE PROCESSING res =", res);
73+
74+
if (res.deleted_objects) {
75+
76+
const writes = [];
77+
78+
for (const deleted_obj of res.deleted_objects) {
79+
for (const notif of bucket.notifications) {
80+
if (check_notif_relevant(notif, {
81+
op_name: 'lifecycle_delete',
82+
s3_event_method: deleted_obj.created_delete_marker ? 'DeleteMarkerCreated' : 'Delete',
83+
})) {
84+
//remember that this deletion needs a notif for this specific notification conf
85+
writes.push({notif, deleted_obj});
86+
}
87+
}
88+
}
89+
90+
//if any notifications are needed, write them in notification log file
91+
//(otherwise don't do any unnecessary filesystem actions)
92+
if (writes.length > 0) {
93+
let logger;
94+
try {
95+
logger = get_notification_logger('SHARED');
96+
await P.map_with_concurrency(100, writes, async write => {
97+
const notif = compose_notification_lifecycle(write.deleted_obj, write.notif, bucket);
98+
logger.append(JSON.stringify(notif));
99+
});
100+
} finally {
101+
if (logger) logger.close();
102+
}
103+
}
104+
}
105+
58106
bucket.lifecycle_configuration_rules[j].last_sync = Date.now();
59107
if (res.num_objects_deleted >= config.LIFECYCLE_BATCH_SIZE) should_rerun = true;
60108
dbg.log0('LIFECYCLE Done bucket:', bucket.name, '(bucket id:', bucket._id, ') done deletion of objects per rule',
@@ -78,7 +126,7 @@ async function background_worker() {
78126

79127
const results = await P.all(_.map(bucket.lifecycle_configuration_rules,
80128
async (lifecycle_rule, j) => {
81-
dbg.log0('LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket.name, "rule", lifecycle_rule, 'j', j);
129+
dbg.log0('LIFECYCLE READ BUCKETS configuration handle_bucket_rule bucket name:', bucket.name.unwrap(), "rule", lifecycle_rule, 'j', j);
82130
return handle_bucket_rule(system, lifecycle_rule, j, bucket);
83131
}
84132
));

src/server/object_services/object_server.js

+11-1
Original file line numberDiff line numberDiff line change
@@ -949,6 +949,7 @@ async function delete_multiple_objects_by_filter(req) {
949949
dbg.log1(`delete_multiple_objects_by_filter: bucket=${req.bucket.name} filter=${util.inspect(req.rpc_params)}`);
950950
const key = new RegExp('^' + _.escapeRegExp(req.rpc_params.prefix));
951951
const bucket_id = req.bucket._id;
952+
const reply_objects = req.rpc_params.reply_objects;
952953
// TODO: change it to perform changes in batch. Won't scale.
953954
const query = {
954955
bucket_id,
@@ -972,7 +973,16 @@ async function delete_multiple_objects_by_filter(req) {
972973
}))
973974
}
974975
}));
975-
return { num_objects_deleted: objects.length };
976+
977+
const reply = { num_objects_deleted: objects.length };
978+
if (reply_objects) {
979+
//reply needs to include deleted objects
980+
//(this is used for LifecycleExpiratoin event notifications)
981+
//so map the md into (api friendly) object info
982+
reply.deleted_objects = _.map(objects, get_object_info);
983+
}
984+
985+
return reply;
976986
}
977987

978988
/**

src/util/notifications_util.js

+90-36
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ const OP_TO_EVENT = Object.freeze({
2222
put_object_acl: { name: 'ObjectAcl' },
2323
put_object_tagging: { name: 'ObjectTagging' },
2424
delete_object_tagging: { name: 'ObjectTagging' },
25+
lifecycle_delete: { name: 'LifecycleExpiration' },
2526
});
2627

2728
class Notificator {
@@ -86,7 +87,7 @@ class Notificator {
8687
seen_nodes.add(node_namespace);
8788
}
8889
dbg.log1("process_notification_files node_namespace =", node_namespace, ", file =", entry.name);
89-
const log = new PersistentLogger(config.NOTIFICATION_LOG_DIR, node_namespace, { locking: 'EXCLUSIVE' });
90+
const log = get_notification_logger('EXCLUSIVE', node_namespace);
9091
try {
9192
await log.process(async (file, failure_append) => await this._notify(this.fs_context, file, failure_append));
9293
} catch (err) {
@@ -310,52 +311,66 @@ async function test_notifications(bucket) {
310311
}
311312
}
312313

313-
//see https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
314-
function compose_notification(req, res, bucket, notif_conf) {
315-
let eTag = res.getHeader('ETag');
316-
//eslint-disable-next-line
317-
if (eTag && eTag.startsWith('\"') && eTag.endsWith('\"')) {
318-
eTag = eTag.substring(2, eTag.length - 2);
319-
}
314+
function compose_notification_base(notif_conf, bucket, req) {
320315

321-
const event = OP_TO_EVENT[req.op_name];
322-
const http_verb_capitalized = req.method.charAt(0).toUpperCase() + req.method.slice(1).toLowerCase();
323316
const event_time = new Date();
324317

325318
const notif = {
326319
eventVersion: '2.3',
327320
eventSource: _get_system_name(req) + ':s3',
328321
eventTime: event_time.toISOString(),
329-
eventName: event.name + ':' + (event.method || req.s3_event_method || http_verb_capitalized),
330-
userIdentity: {
331-
principalId: req.object_sdk.requesting_account.name,
332-
},
333-
requestParameters: {
334-
sourceIPAddress: http_utils.parse_client_ip(req),
335-
},
336-
responseElements: {
337-
"x-amz-request-id": req.request_id,
338-
"x-amz-id-2": req.request_id,
339-
},
322+
340323
s3: {
341324
s3SchemaVersion: "1.0",
342325
configurationId: notif_conf.name,
326+
object: {
327+
//default for sequencer, overriden in compose_notification_req for noobaa ns
328+
sequencer: event_time.getTime().toString(16),
329+
},
343330
bucket: {
344331
name: bucket.name,
345332
ownerIdentity: {
346-
principalId: bucket.bucket_owner.unwrap(),
333+
//buckets from s3 reqs are sdk-style, from lifcycle are "raw" system store object
334+
principalId: bucket.bucket_owner ? bucket.bucket_owner.unwrap() : bucket.owner_account.name.unwrap(),
347335
},
348336
arn: "arn:aws:s3:::" + bucket.name,
349337
},
350-
object: {
351-
key: req.params.key,
352-
size: res.getHeader('content-length'),
353-
eTag,
354-
versionId: res.getHeader('x-amz-version-id'),
355-
},
356338
}
357339
};
358340

341+
return notif;
342+
343+
}
344+
345+
//see https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html
346+
function compose_notification_req(req, res, bucket, notif_conf) {
347+
let eTag = res.getHeader('ETag');
348+
//eslint-disable-next-line
349+
if (eTag && eTag.startsWith('\"') && eTag.endsWith('\"')) {
350+
eTag = eTag.substring(2, eTag.length - 2);
351+
}
352+
353+
const event = OP_TO_EVENT[req.op_name];
354+
const http_verb_capitalized = req.method.charAt(0).toUpperCase() + req.method.slice(1).toLowerCase();
355+
356+
const notif = compose_notification_base(notif_conf, bucket, req);
357+
358+
notif.eventName = event.name + ':' + (event.method || req.s3_event_method || http_verb_capitalized);
359+
notif.userIdentity = {
360+
principalId: req.object_sdk.requesting_account.name,
361+
};
362+
notif.requestParameters = {
363+
sourceIPAddress: http_utils.parse_client_ip(req),
364+
};
365+
notif.responseElements = {
366+
"x-amz-request-id": req.request_id,
367+
"x-amz-id-2": req.request_id,
368+
};
369+
notif.s3.object.key = req.params.key;
370+
notif.s3.object.size = res.getHeader('content-length');
371+
notif.s3.object.eTag = eTag;
372+
notif.s3.object.versionId = res.getHeader('x-amz-version-id');
373+
359374
//handle glacierEventData
360375
if (res.restore_object_result) {
361376
notif.glacierEventData = {
@@ -370,21 +385,41 @@ function compose_notification(req, res, bucket, notif_conf) {
370385
if (res.seq) {
371386
//in noobaa-ns we have a sequence from db
372387
notif.s3.object.sequencer = res.seq;
373-
} else {
374-
//fallback to time-based sequence
375-
notif.s3.object.sequencer = event_time.getTime().toString(16);
376388
}
377389

378-
const records = [notif];
379-
380-
return {Records: records};
390+
return compose_meta(notif, notif_conf);
381391
}
382392

393+
function compose_notification_lifecycle(deleted_obj, notif_conf, bucket) {
383394

395+
const notif = compose_notification_base(notif_conf, bucket);
396+
397+
notif.eventName = OP_TO_EVENT.lifecycle_delete.name + ':' +
398+
(deleted_obj.created_delete_marker ? 'DeleteMarkerCreated' : 'Delete');
399+
notif.s3.object.key = deleted_obj.key;
400+
notif.s3.object.size = deleted_obj.size;
401+
notif.s3.object.eTag = deleted_obj.etag;
402+
notif.s3.object.versionId = deleted_obj.version_id;
403+
404+
return compose_meta(notif, notif_conf);
405+
406+
}
407+
408+
function compose_meta(record, notif_conf) {
409+
return {
410+
meta: {
411+
connect: notif_conf.Connect,
412+
name: notif_conf.Id
413+
},
414+
notif: {
415+
Records: [record],
416+
}
417+
};
418+
}
384419

385420
function _get_system_name(req) {
386421

387-
if (req.object_sdk.nsfs_config_root) {
422+
if (req && req.object_sdk && req.object_sdk.nsfs_system) {
388423
const name = Object.keys(req.object_sdk.nsfs_system)[0];
389424
return name;
390425
} else {
@@ -427,7 +462,26 @@ function check_notif_relevant(notif, req) {
427462
return false;
428463
}
429464

465+
/**
466+
*
467+
* @param {"SHARED" | "EXCLUSIVE"} locking counterintuitively, either 'SHARED' for writing or 'EXCLUSIVE' for reading
468+
*/
469+
function get_notification_logger(locking, namespace, poll_interval) {
470+
if (!namespace) {
471+
const node_name = process.env.NODE_NAME || os.hostname();
472+
namespace = node_name + '_' + config.NOTIFICATION_LOG_NS;
473+
}
474+
475+
return new PersistentLogger(config.NOTIFICATION_LOG_DIR, namespace, {
476+
locking,
477+
poll_interval,
478+
});
479+
}
480+
430481
exports.Notificator = Notificator;
431482
exports.test_notifications = test_notifications;
432-
exports.compose_notification = compose_notification;
483+
exports.compose_notification_req = compose_notification_req;
484+
exports.compose_notification_lifecycle = compose_notification_lifecycle;
433485
exports.check_notif_relevant = check_notif_relevant;
486+
exports.get_notification_logger = get_notification_logger;
487+
exports.OP_TO_EVENT = OP_TO_EVENT;

0 commit comments

Comments
 (0)