Skip to content

Commit 033938e

Browse files
raft apis
1 parent e0c6297 commit 033938e

File tree

2 files changed

+116
-77
lines changed

2 files changed

+116
-77
lines changed

extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ const ObjectMD = require('arsenal').models.ObjectMD;
55
const BackbeatTask = require('../../../lib/tasks/BackbeatTask');
66
const { attachReqUids } = require('../../../lib/clients/utils');
77
const { LifecycleMetrics } = require('../LifecycleMetrics');
8+
const {
9+
DeleteObjectFromExpirationCommand
10+
} = require('../../../lib/clients/smithy/build/smithy/source/typescript-codegen');
811
/** @typedef { import('../objectProcessor/LifecycleObjectProcessor.js') } LifecycleObjectProcessor */
912

1013
class ObjectLockedError extends Error {}
@@ -140,26 +143,33 @@ class LifecycleDeleteObjectTask extends BackbeatTask {
140143
LifecycleMetrics.onLifecycleStarted(log,
141144
'expiration',
142145
location, startTime - transitionTime);
143-
const req = client.deleteObjectFromExpiration(reqParams);
144-
attachReqUids(req, log);
145-
req.send(err => {
146-
if (err?.statusCode === errors.MethodNotAllowed.code) {
147-
log.warn('deleteObjectFromExpiration API not supported, falling back to deleteObject',
148-
logDetails);
149-
// fallback to s3 deleteObject when using a cloudserver that
150-
// doesn't support deleteObjectFromExpiration
151-
const s3Client = this.getS3Client(accountId);
152-
if (!s3Client) {
153-
log.error('failed to get s3 client', logDetails);
154-
return done(errors.InternalError
155-
.customizeDescription('Unable to obtain client'));
156-
}
157-
const s3Req = s3Client.deleteObject(reqParams);
158-
attachReqUids(s3Req, log);
159-
return s3Req.send(done);
160-
}
161-
return done(err);
146+
const command = new DeleteObjectFromExpirationCommand({
147+
...reqParams,
148+
RequestUids: log.getSerializedUids(),
162149
});
150+
151+
return client.send(command)
152+
.then(() => {
153+
done(null);
154+
})
155+
.catch(err => {
156+
if (err?.$metadata?.httpStatusCode === errors.MethodNotAllowed.code) {
157+
log.warn('deleteObjectFromExpiration API not supported, falling back to deleteObject',
158+
logDetails);
159+
// fallback to s3 deleteObject when using a cloudserver that
160+
// doesn't support deleteObjectFromExpiration
161+
const s3Client = this.getS3Client(accountId);
162+
if (!s3Client) {
163+
log.error('failed to get s3 client', logDetails);
164+
return done(errors.InternalError
165+
.customizeDescription('Unable to obtain client'));
166+
}
167+
const s3Req = s3Client.deleteObject(reqParams);
168+
attachReqUids(s3Req, log);
169+
return s3Req.send(done);
170+
}
171+
return done(err);
172+
});
163173
}
164174

165175
_executeDelete(entry, startTime, log, done) {

lib/queuePopulator/IngestionProducer.js

Lines changed: 87 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,14 @@ const { attachReqUids } = require('../clients/utils');
1414
const RaftLogEntry = require('../models/RaftLogEntry');
1515
const IngestionPopulatorMetrics = require('./IngestionPopulatorMetrics');
1616
const { http: HttpAgent, https: HttpsAgent } = require('httpagent');
17+
const {
18+
CloudserverClient,
19+
CloudserverClientConfig,
20+
GetRaftIdCommand,
21+
GetRaftLogCommand,
22+
GetRaftBucketsCommand,
23+
GetBucketCseqCommand
24+
} = require('../clients/smithy/build/smithy/source/typescript-codegen');
1725

1826
class ListRecordStream extends stream.Transform {
1927
constructor(logger) {
@@ -90,7 +98,6 @@ class IngestionProducer {
9098
httpAgent: this._createHTTPAgent(protocol),
9199
requestTimeout: 0,
92100
};
93-
const creds = this._createCredentials(log)
94101
const config = new CloudserverClientConfig({
95102
endpoint: endpoint,
96103
credentials: {
@@ -127,13 +134,23 @@ class IngestionProducer {
127134
* @return {number} the raftId that has logs for the bucket
128135
*/
129136
getRaftId(bucketName, done) {
130-
const req = this._ringReader.getRaftId({
137+
const command = new GetRaftIdCommand({
131138
Bucket: bucketName,
139+
RequestUids: this.requestLogger.getSerializedUids(),
132140
});
133141

134-
attachReqUids(req, this.requestLogger);
135-
req.send((err, data) => {
136-
if (err) {
142+
return this._ringReader.send(command)
143+
.then(data => {
144+
if (data && data[0]) {
145+
IngestionPopulatorMetrics.onIngestionSourceOp('getRaftId', 'success');
146+
return done(null, data[0]);
147+
}
148+
this.log.error(`empty response for raftid of ${bucketName}`,
149+
{ method: 'getRaftId', bucketName });
150+
IngestionPopulatorMetrics.onIngestionSourceOp('getRaftId', 'error');
151+
return done(errors.InternalError);
152+
})
153+
.catch(err => {
137154
this.log.error(`could not find bucket ${bucketName} in any` +
138155
' raft session', {
139156
method: 'IngestionProducer.getRaftId',
@@ -142,15 +159,7 @@ class IngestionProducer {
142159
});
143160
IngestionPopulatorMetrics.onIngestionSourceOp('getRaftId', 'error');
144161
return done(err);
145-
} else if (data && data[0]) {
146-
IngestionPopulatorMetrics.onIngestionSourceOp('getRaftId', 'success');
147-
return done(null, data[0]);
148-
}
149-
this.log.error(`empty response for raftid of ${bucketName}`,
150-
{ method: 'getRaftId', bucketName });
151-
IngestionPopulatorMetrics.onIngestionSourceOp('getRaftId', 'error');
152-
return done(errors.InternalError);
153-
});
162+
});
154163
}
155164

156165
/**
@@ -212,18 +221,20 @@ class IngestionProducer {
212221
], done);
213222
}
214223

224+
// TODO : review streaming logic..
215225
getRaftLog(raftId, begin, limit, targetLeader, done) {
216226
const recordStream = new ListRecordStream(this.log);
227+
// TODO : double check recordStream
217228
recordStream.on('error', err => {
218-
if (err.statusCode === 404) {
229+
if (err.$metadata?.httpStatusCode === 404) {
219230
// no such raft session, log and ignore
220231
this.log.warn('raft session does not exist',
221232
{ raftId: this.raftId, method:
222233
'IngestionProducer.getRaftLog' });
223234
return done(null, { info: { start: null,
224235
end: null } });
225236
}
226-
if (err.statusCode === 416) {
237+
if (err.$metadata?.httpStatusCode === 416) {
227238
// requested range not satisfiable
228239
this.log.debug('no new log records to ' +
229240
'process', {
@@ -237,28 +248,40 @@ class IngestionProducer {
237248
{ error: err.message });
238249
return done(errors.InternalError);
239250
});
240-
const req = this._ringReader.getRaftLog({
251+
252+
const command = new GetRaftLogCommand({
241253
LogId: raftId.toString(),
242254
Begin: begin,
243255
Limit: limit,
244256
TargetLeader: targetLeader,
257+
RequestUids: this.requestLogger.getSerializedUids(),
245258
});
246-
attachReqUids(req, this.requestLogger);
247-
// TODO : review all createReadStream calls
248-
const readStream = req.createReadStream();
249-
const jsonResponse = readStream.pipe(jsonStream.parse('log.*'));
250-
jsonResponse.pipe(recordStream);
251-
readStream.on('error', err => recordStream.emit('error', err));
252-
jsonResponse
253-
.on('header', header => {
254-
recordStream.removeAllListeners('error');
255-
return done(null, {
256-
info: header.info,
257-
log: recordStream,
258-
});
259+
260+
// Note: Smithy client streaming might work differently than AWS SDK v2
261+
// This may need adjustment based on the actual implementation
262+
return this._ringReader.send(command)
263+
.then(response => {
264+
// The response should contain a stream or similar data
265+
// This is a simplified version - actual streaming implementation may differ
266+
if (response && response.Body) {
267+
const readStream = response.Body;
268+
const jsonResponse = readStream.pipe(jsonStream.parse('log.*'));
269+
jsonResponse.pipe(recordStream);
270+
readStream.on('error', err => recordStream.emit('error', err));
271+
jsonResponse
272+
.on('header', header => {
273+
recordStream.removeAllListeners('error');
274+
return done(null, {
275+
info: header.info,
276+
log: recordStream,
277+
});
278+
})
279+
.on('error', err => recordStream.emit('error', err));
280+
} else {
281+
return done(null, { info: { start: null, end: null } });
282+
}
259283
})
260-
.on('error', err => recordStream.emit('error', err));
261-
return undefined;
284+
.catch(err => recordStream.emit('error', err));
262285
}
263286

264287
/**
@@ -270,22 +293,23 @@ class IngestionProducer {
270293
* @return {Object} list of keys that correspond to list of buckets
271294
*/
272295
_getBuckets(raftId, done) {
273-
const req = this._ringReader.getRaftBuckets({
296+
const command = new GetRaftBucketsCommand({
274297
LogId: raftId,
298+
RequestUids: this.requestLogger.getSerializedUids(),
275299
});
276300

277-
attachReqUids(req, this.requestLogger);
278-
req.send((err, data) => {
279-
if (err) {
301+
return this._ringReader.send(command)
302+
.then(data => {
303+
const bucketList = Object.keys(data).map(index => data[index]);
304+
IngestionPopulatorMetrics.onIngestionSourceOp('getBuckets', 'success');
305+
return done(null, bucketList);
306+
})
307+
.catch(err => {
280308
this.log.error('error getting list of buckets', {
281309
method: 'IngestionProducer._getBuckets', err });
282310
IngestionPopulatorMetrics.onIngestionSourceOp('getBuckets', 'error');
283311
return done(err);
284-
}
285-
const bucketList = Object.keys(data).map(index => data[index]);
286-
IngestionPopulatorMetrics.onIngestionSourceOp('getBuckets', 'success');
287-
return done(null, bucketList);
288-
});
312+
});
289313
}
290314

291315
/**
@@ -459,30 +483,35 @@ class IngestionProducer {
459483
* @return {undefined}
460484
*/
461485
_getBucketCseq(bucket, done) {
462-
return this._ringReader.getBucketCseq({ Bucket: bucket },
463-
(err, data) => {
464-
if (err) {
486+
const command = new GetBucketCseqCommand({
487+
Bucket: bucket
488+
});
489+
490+
return this._ringReader.send(command)
491+
.then(data => {
492+
// TODO : double check, data.CseqInfo[0].cseq vs data.cseq[0]
493+
if (!data || !data[0] || !data[0].cseq) {
494+
this.log.error('could not get cseq data or data is malformed', {
495+
method: 'IngestionProducer._getBucketCseq',
496+
bucket,
497+
data,
498+
});
499+
IngestionPopulatorMetrics.onIngestionSourceOp('getBucketCseq', 'error');
500+
return done(errors.InternalError);
501+
}
502+
IngestionPopulatorMetrics.onIngestionSourceOp('getBucketCseq', 'success');
503+
// cseq returned by all nodes. Just return the first node response
504+
return done(null, data.CseqInfo[0].cseq);
505+
})
506+
.catch(err => {
465507
this.log.error('error getting bucket cseq', {
466508
method: 'IngestionProducer._getBucketCseq',
467509
error: err,
468510
bucket,
469511
});
470512
IngestionPopulatorMetrics.onIngestionSourceOp('getBucketCseq', 'error');
471513
return done(err);
472-
}
473-
if (!data || !data[0] || !data[0].cseq) {
474-
this.log.error('could not get cseq data or data is malformed', {
475-
method: 'IngestionProducer._getBucketCseq',
476-
bucket,
477-
data,
478-
});
479-
IngestionPopulatorMetrics.onIngestionSourceOp('getBucketCseq', 'error');
480-
return done(errors.InternalError);
481-
}
482-
IngestionPopulatorMetrics.onIngestionSourceOp('getBucketCseq', 'success');
483-
// cseq returned by all nodes. Just return the first node response
484-
return done(null, data[0].cseq);
485-
});
514+
});
486515
}
487516
}
488517

0 commit comments

Comments
 (0)