Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 25 additions & 13 deletions extensions/lifecycle/LifecycleMetrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,11 @@ const lifecycleKafkaPublish = {
};

class LifecycleMetrics {
static handleError(log, err, method) {
static handleError(log, err, method, params = {}) {
if (log) {
log.error('failed to update prometheus metrics', { error: Object.assign({}, err), method });
log.error('failed to update prometheus metrics', {
error: err.toString(), method, ...params
});
}
}

Expand Down Expand Up @@ -156,15 +158,15 @@ class LifecycleMetrics {
try {
lifecycleActiveIndexingJobs.set({ origin: 'conductor' }, count);
} catch (err) {
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onActiveIndexingJobs');
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onActiveIndexingJobs', { count });
}
}

static onLegacyTask(log, status) {
try {
lifecycleLegacyTask.inc({ origin: 'conductor', status });
} catch (err) {
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onLegacyTask');
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onLegacyTask', { status });
}
}

Expand All @@ -176,7 +178,9 @@ class LifecycleMetrics {
[LIFECYCLE_LABEL_LOCATION]: location,
}, latencyMs / 1000);
} catch (err) {
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onLifecycleTriggered');
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onLifecycleTriggered', {
process, type, location, latencyMs,
});
}
}

Expand All @@ -187,7 +191,9 @@ class LifecycleMetrics {
[LIFECYCLE_LABEL_LOCATION]: location,
}, durationMs / 1000);
} catch (err) {
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onLifecycleStarted');
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onLifecycleStarted', {
type, location, durationMs,
});
}
}

Expand All @@ -203,31 +209,37 @@ class LifecycleMetrics {
[LIFECYCLE_LABEL_LOCATION]: location,
}, new Date().getTime());
} catch (err) {
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onLifecycleCompleted');
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onLifecycleCompleted', {
type, location, durationMs,
});
}
}

static onS3Request(log, op, process, err) {
const statusCode = err && err.statusCode ? err.statusCode : '200';
static onS3Request(log, op, process, s3Err) {
const statusCode = s3Err && s3Err.statusCode ? s3Err.statusCode : '200';
try {
lifecycleS3Operations.inc({
[LIFECYCLE_LABEL_ORIGIN]: process,
[LIFECYCLE_LABEL_OP]: op,
[LIFECYCLE_LABEL_STATUS]: statusCode,
});
} catch (err) {
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onS3Request');
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onS3Request', {
op, process, statusCode,
});
}
}

static onKafkaPublish(log, op, process, err, count) {
static onKafkaPublish(log, op, process, kafkaErr, count) {
try {
lifecycleKafkaPublish[err ? 'error' : 'success'].inc({
lifecycleKafkaPublish[kafkaErr ? 'error' : 'success'].inc({
[LIFECYCLE_LABEL_ORIGIN]: process,
[LIFECYCLE_LABEL_OP]: op,
}, count);
} catch (err) {
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onKafkaPublish');
LifecycleMetrics.handleError(log, err, 'LifecycleMetrics.onKafkaPublish', {
op, process, count, kafkaErr,
});
}
}
}
Expand Down
21 changes: 9 additions & 12 deletions extensions/lifecycle/tasks/LifecycleTask.js
Original file line number Diff line number Diff line change
Expand Up @@ -1425,19 +1425,14 @@ class LifecycleTask extends BackbeatTask {
new Date(deleteMarker.LastModified)
);

const eodm = rules.Expiration &&
rules.Expiration.ExpiredObjectDeleteMarker;

// Backbeat performs automatic ExpiredObjectDeleteMarker cleanup
// for compatibility with Amazon S3,
// - either when the delete markers meet the age criteria
// - or when the ExpiredObjectDeleteMarker tag is set to true.
const applicableExpRule = rules.Expiration && (
(rules.Expiration.Days !== undefined
&& daysSinceInitiated >= rules.Expiration.Days)
|| (rules.Expiration.Date !== undefined
&& rules.Expiration.Date < Date.now())
|| eodm === true
(rules.Expiration.Days !== undefined && daysSinceInitiated >= rules.Expiration.Days)
|| (rules.Expiration.Date !== undefined && rules.Expiration.Date < Date.now())
|| rules.Expiration.ExpiredObjectDeleteMarker
);

// if there are no other versions with the same Key as this DM and
Expand All @@ -1454,10 +1449,11 @@ class LifecycleTask extends BackbeatTask {
.setAttribute('target.key', deleteMarker.Key)
.setAttribute('target.accountId', bucketData.target.accountId)
.setAttribute('target.version', deleteMarker.VersionId)
.setAttribute('transitionTime',
this._lifecycleDateTime.getTransitionTimestamp(
rules.Expiration, deleteMarker.LastModified)
);
// EODM applies only once all other versions have been deleted, so transition
// time is not `lastModified`, but the time when the "last" version was
// removed. We still need to pass a transitionTime for metrics though, so
// just use the current time to avoid alert for slow/stale lifecycle.
.setAttribute('transitionTime', Date.now());
this._sendObjectAction(entry, err => {
if (!err) {
log.debug('sent object entry for consumption',
Expand Down Expand Up @@ -1520,6 +1516,7 @@ class LifecycleTask extends BackbeatTask {
.setAttribute('target.key', verToExpire.Key)
.setAttribute('target.version', verToExpire.VersionId)
.setAttribute('details.dataStoreName', verToExpire.StorageClass || '')
// details.lastModified is not set for NCVE...

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is NCVE? Non-current version expiration?

Copy link
Contributor

@SylvainSenechal SylvainSenechal Oct 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Guess so, but is this comment a TODO?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it just (tries to) make it explicit that we do not set details.lastModified in this specific case, contrary to all other similar places where we send a message to the object processor.

.setAttribute('transitionTime',
this._lifecycleDateTime.getTransitionTimestamp(
{ Days: rules[ncve][ncd] }, staleDate)
Expand Down
173 changes: 173 additions & 0 deletions tests/unit/lifecycle/LifecycleMetrics.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
const assert = require('assert');
const sinon = require('sinon');
const { LifecycleMetrics } = require('../../../extensions/lifecycle/LifecycleMetrics');
const { ZenkoMetrics } = require('arsenal').metrics;

// Unit tests for LifecycleMetrics: every public hook must swallow prometheus
// failures (logging them with context) instead of letting them propagate, and
// onS3Request must derive its status label from the S3 error it is given.
describe('LifecycleMetrics', () => {
    let log;

    beforeEach(() => {
        log = { error: sinon.stub() };
    });

    afterEach(() => {
        sinon.restore();
    });

    describe('error handling', () => {
        // Replace one method of a registered prometheus metric with a stub
        // that throws, forcing the LifecycleMetrics hook down its error path.
        const makeMetricThrow = (metricName, method, message = 'Metric error') =>
            sinon.stub(ZenkoMetrics.getMetric(metricName), method)
                .throws(new Error(message));

        it('should catch errors in onProcessBuckets', () => {
            makeMetricThrow('s3_lifecycle_latest_batch_start_time', 'set');

            LifecycleMetrics.onProcessBuckets(log);

            assert(log.error.calledOnce);
            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
                method: 'LifecycleMetrics.onProcessBuckets',
            }));
        });

        it('should catch errors in onBucketListing', () => {
            makeMetricThrow('s3_lifecycle_conductor_bucket_list_success_total', 'inc');

            LifecycleMetrics.onBucketListing(log, null);

            assert(log.error.calledOnce);
            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
                method: 'LifecycleMetrics.onBucketListing',
            }));
        });

        it('should catch errors in onActiveIndexingJobsFailed', () => {
            makeMetricThrow('s3_lifecycle_active_indexing_jobs', 'reset');

            LifecycleMetrics.onActiveIndexingJobsFailed(log);

            assert(log.error.calledOnce);
            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
                method: 'LifecycleMetrics.onActiveIndexingJobsFailed',
            }));
        });

        it('should catch errors in onActiveIndexingJobs', () => {
            makeMetricThrow('s3_lifecycle_active_indexing_jobs', 'set');

            LifecycleMetrics.onActiveIndexingJobs(log, 12);

            assert(log.error.calledOnce);
            // The failing count must be forwarded to the error log context.
            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
                method: 'LifecycleMetrics.onActiveIndexingJobs',
                count: 12,
            }));
        });

        it('should catch errors in onLegacyTask', () => {
            makeMetricThrow('s3_lifecycle_legacy_tasks_total', 'inc');

            LifecycleMetrics.onLegacyTask(log, 'success');

            assert(log.error.calledOnce);
            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
                method: 'LifecycleMetrics.onLegacyTask',
                status: 'success',
            }));
        });

        it('should catch errors in onLifecycleTriggered', () => {
            // No stubbing: presumably the NaN latency itself makes the
            // underlying metric update throw — TODO confirm against prom-client.
            LifecycleMetrics.onLifecycleTriggered(log, 'conductor', 'expiration', 'us-east-1', NaN);

            assert(log.error.calledOnce);
            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
                method: 'LifecycleMetrics.onLifecycleTriggered',
                process: 'conductor',
                type: 'expiration',
                location: 'us-east-1',
                latencyMs: NaN,
            }));
        });

        it('should catch errors in onLifecycleStarted', () => {
            LifecycleMetrics.onLifecycleStarted(log, 'transition', 'us-west-2', NaN);

            assert(log.error.calledOnce);
            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
                method: 'LifecycleMetrics.onLifecycleStarted',
                type: 'transition',
                location: 'us-west-2',
                durationMs: NaN,
            }));
        });

        it('should catch errors in onLifecycleCompleted', () => {
            makeMetricThrow('s3_lifecycle_duration_seconds', 'observe');

            LifecycleMetrics.onLifecycleCompleted(log, 'expiration', 'eu-west-1', NaN);

            assert(log.error.calledOnce);
            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
                method: 'LifecycleMetrics.onLifecycleCompleted',
            }));
        });

        it('should catch errors in onS3Request', () => {
            makeMetricThrow('s3_lifecycle_s3_operations_total', 'inc');

            LifecycleMetrics.onS3Request(log, 'deleteObject', 'processor', null);

            assert(log.error.calledOnce);
            // statusCode defaults to '200' when no S3 error is given.
            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
                method: 'LifecycleMetrics.onS3Request',
                op: 'deleteObject',
                process: 'processor',
                statusCode: '200',
            }));
        });

        it('should pass err.statusCode in onS3Request', () => {
            const incStub = sinon.stub(
                ZenkoMetrics.getMetric('s3_lifecycle_s3_operations_total'), 'inc');
            const fakeError = new Error('S3 Error');
            fakeError.statusCode = '503';

            LifecycleMetrics.onS3Request(log, 'putObject', 'conductor', fakeError);

            assert(incStub.calledOnce);
            // The S3 error's statusCode becomes the metric's status label.
            assert(incStub.calledWithMatch({ op: 'putObject', origin: 'conductor', status: '503' }));
            assert(log.error.notCalled);
        });

        it('should pass 200 in onS3Request when no error', () => {
            const incStub = sinon.stub(
                ZenkoMetrics.getMetric('s3_lifecycle_s3_operations_total'), 'inc');

            LifecycleMetrics.onS3Request(log, 'putObject', 'conductor', null);

            assert(incStub.calledOnce);
            assert(incStub.calledWithMatch({ op: 'putObject', origin: 'conductor', status: '200' }));
            assert(log.error.notCalled);
        });

        it('should catch errors in onKafkaPublish with NaN count', () => {
            makeMetricThrow('s3_lifecycle_kafka_publish_success_total', 'inc', 'Invalid value');

            LifecycleMetrics.onKafkaPublish(log, 'publish', 'conductor', null, 1);

            assert(log.error.calledOnce);
            assert(log.error.calledWithMatch('failed to update prometheus metrics', {
                method: 'LifecycleMetrics.onKafkaPublish',
                op: 'publish',
                process: 'conductor',
            }));
        });
    });
});
10 changes: 10 additions & 0 deletions tests/unit/lifecycle/LifecycleTask.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,12 @@ describe('lifecycle task helper methods', () => {
});
});

// Assert that `time` is set and falls within the last second, i.e. it was
// produced by a Date.now() call made just before this check ran.
const assertNow = time => {
    assert(time, 'transitionTime is not set');
    const current = Date.now();
    const isRecent = time <= current && time >= current - 1000;
    assert.ok(isRecent, 'transitionTime is not the current time');
};

it('should send any entry to Kafka when delete marker meets the age criteria and ' +
'ExpiredObjectDeleteMarker is not set', () => {
const rules = {
Expand All @@ -545,6 +551,7 @@ describe('lifecycle task helper methods', () => {
assert.strictEqual(latestEntry.getActionType(), 'deleteObject');
assert.deepStrictEqual(
latestEntry.getAttribute('target'), expectedTarget);
assertNow(latestEntry.getAttribute('transitionTime'));
});
});

Expand All @@ -567,6 +574,7 @@ describe('lifecycle task helper methods', () => {
assert.strictEqual(latestEntry.getActionType(), 'deleteObject');
assert.deepStrictEqual(
latestEntry.getAttribute('target'), expectedTarget);
assertNow(latestEntry.getAttribute('transitionTime'));
});
});

Expand All @@ -588,6 +596,7 @@ describe('lifecycle task helper methods', () => {
assert.strictEqual(latestEntry.getActionType(), 'deleteObject');
assert.deepStrictEqual(
latestEntry.getAttribute('target'), expectedTarget);
assertNow(latestEntry.getAttribute('transitionTime'));
});
});

Expand All @@ -610,6 +619,7 @@ describe('lifecycle task helper methods', () => {
assert.strictEqual(latestEntry.getActionType(), 'deleteObject');
assert.deepStrictEqual(
latestEntry.getAttribute('target'), expectedTarget);
assertNow(latestEntry.getAttribute('transitionTime'));
});
});

Expand Down
Loading