Skip to content

Commit 5d5ee14

Browse files
committed
bucket notifications - kafka topic notation
Signed-off-by: Amit Prinz Setter <[email protected]>
1 parent 2dfcbfb commit 5d5ee14

File tree

2 files changed

+16
-42
lines changed

2 files changed

+16
-42
lines changed

src/test/unit_tests/test_notifications.js

-31
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ let expected_event_name;
5555
let expected_key;
5656
let expected_eTag;
5757
let expect_test;
58-
let expected_url;
5958

6059
// eslint-disable-next-line max-lines-per-function
6160
mocha.describe('notifications', function() {
@@ -96,7 +95,6 @@ mocha.describe('notifications', function() {
9695
assert.strictEqual(notif.Records[0].Event, "s3:TestEvent", 'wrong event name in notification');
9796
expect_test = false;
9897
} else {
99-
assert.strictEqual(req.url, expected_url);
10098
assert.strictEqual(notif.Records[0].s3.bucket.name, expected_bucket, 'wrong bucket name in notification');
10199
assert.strictEqual(notif.Records[0].eventName, expected_event_name, 'wrong event name in notification');
102100
assert.strictEqual(notif.Records[0].s3.object.key, expected_key, 'wrong key in notification');
@@ -237,34 +235,6 @@ mocha.describe('notifications', function() {
237235
});
238236
});
239237

240-
mocha.it('override connection', async () => {
241-
await s3.putBucketNotificationConfiguration({
242-
Bucket: bucket,
243-
NotificationConfiguration: {
244-
TopicConfigurations: [{
245-
"Id": "system_test_http_no_event_override",
246-
"TopicArn": http_connect_filename + "?" + JSON.stringify({
247-
request_options_object: {path: "/override"}
248-
}),
249-
}],
250-
},
251-
});
252-
253-
const res = await s3.putObject({
254-
Bucket: bucket,
255-
Key: 'f1',
256-
Body: 'this is the body',
257-
});
258-
259-
await notify_await_result({
260-
bucket_name: bucket,
261-
event_name: 'ObjectCreated:Put',
262-
key: "f1",
263-
etag: res.ETag,
264-
url: "/override"
265-
});
266-
});
267-
268238
});
269239

270240
});
@@ -277,7 +247,6 @@ async function notify_await_result({bucket_name, event_name, etag, key, url = "/
277247
expected_event_name = event_name;
278248
expected_eTag = etag;
279249
expected_key = key;
280-
expected_url = url;
281250
server_done = false;
282251

283252
//busy-sync wait for server

src/util/notifications_util.js

+16-11
Original file line numberDiff line numberDiff line change
@@ -186,24 +186,23 @@ class Notificator {
186186
}
187187
}
188188

189-
async parse_connect_file(connect_filename_with_overrides, decrypt = false) {
189+
async parse_connect_file(connection_name, decrypt = false) {
190190
let connect;
191-
const filename_parts = connect_filename_with_overrides.split('?');
192-
const connect_filename_no_overrides = filename_parts[0];
193-
const overrides_str = filename_parts[1];
191+
let connect_filename = connection_name;
192+
let kafka_topic_from_connection_name;
193+
if (connection_name.startsWith("kafka:::topic/")) {
194+
const connection_parts = connection_name.split('/');
195+
connect_filename = connection_parts[1];
196+
kafka_topic_from_connection_name = connection_parts.length > 2 && connection_parts[3];
197+
}
194198

195199
if (this.nc_config_fs) {
196-
connect = await this.nc_config_fs.get_connection_by_name(connect_filename_no_overrides);
200+
connect = await this.nc_config_fs.get_connection_by_name(connect_filename);
197201
} else {
198-
const filepath = path.join(this.connect_files_dir, connect_filename_no_overrides);
202+
const filepath = path.join(this.connect_files_dir, connect_filename);
199203
const connect_str = fs.readFileSync(filepath, 'utf-8');
200204
connect = JSON.parse(connect_str);
201205
}
202-
if (overrides_str) {
203-
const overrides_obj = JSON.parse(overrides_str);
204-
_.merge(connect, overrides_obj);
205-
dbg.log2("effective connect =", connect);
206-
}
207206

208207
//if connect file is encrypted (and decryption is requested),
209208
//decrypt the auth field
@@ -212,6 +211,11 @@ class Notificator {
212211
connect.request_options_object.auth, connect.master_key_id);
213212
}
214213
load_files(connect);
214+
215+
//use the kafka topic, if it was present in connection_name
216+
if (kafka_topic_from_connection_name) {
217+
connect.topic = kafka_topic_from_connection_name;
218+
}
215219
return connect;
216220
}
217221
}
@@ -371,6 +375,7 @@ async function test_notifications(notifs, nc_config_dir, req) {
371375
let notif_failure;
372376
try {
373377
connect = await notificator.parse_connect_file(notif.topic[0]);
378+
dbg.log0(`effective connect for notif ${notif.id[0]} is`, connect);
374379
connection = get_connection(connect);
375380
await connection.connect();
376381
await connection.promise_notify(compose_notification_test(req), async (notif_cb, err_cb, err) => {

0 commit comments

Comments
 (0)