Skip to content

Commit 56759b8

Browse files
committed
bucket notifications - allow kafka topic in TopicArn, default connection
Signed-off-by: Amit Prinz Setter <[email protected]>
1 parent afbd337 commit 56759b8

File tree

6 files changed

+43
-6
lines changed

6 files changed

+43
-6
lines changed

docs/NooBaaNonContainerized/NooBaaCLI.md

+10-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ The `account add` command is used to create a new account with customizable opti
7575
```sh
7676
noobaa-cli account add --name <account_name> --uid <uid> --gid <gid> [--user]
7777
[--new_buckets_path][--access_key][--secret_key][--fs_backend]
78-
[--allow_bucket_creation][--force_md5_etag][--anonymous][--from_file][--iam_operate_on_root_account]
78+
[--allow_bucket_creation][--force_md5_etag][--anonymous][--from_file][--iam_operate_on_root_account][--default_connection]
7979
```
8080
#### Flags -
8181
- `name` (Required)
@@ -135,6 +135,10 @@ noobaa-cli account add --name <account_name> --uid <uid> --gid <gid> [--user]
135135
- Type: Boolean
136136
- Description: Specifies if the account allowed to create root accounts using the IAM API (the default behavior is to create of IAM accounts). See - [IAM - Root Accounts Manager](./../design/iam.md#root-accounts-manager).
137137

138+
- `default_connection`
139+
- Type: String
140+
- Description: A default account for Kafka external servers. See bucket-notifications.md.
141+
138142
### Update Account
139143

140144
The `account update` command is used to update an existing account with customizable options.
@@ -143,7 +147,7 @@ The `account update` command is used to update an existing account with customiz
143147
```sh
144148
noobaa-cli account update --name <account_name> [--new_name][--uid][--gid][--user]
145149
[--new_buckets_path][--access_key][--secret_key][--regenerate][--fs_backend]
146-
[--allow_bucket_creation][--force_md5_etag][--anonymous][--iam_operate_on_root_account]
150+
[--allow_bucket_creation][--force_md5_etag][--anonymous][--iam_operate_on_root_account][--default_connection]
147151
```
148152
#### Flags -
149153
- `name` (Required)
@@ -207,6 +211,10 @@ noobaa-cli account update --name <account_name> [--new_name][--uid][--gid][--use
207211
- Type: Boolean
208212
- Description: Specifies if the account allowed to create root accounts using the IAM API (the default behavior is to create of IAM accounts). See - [IAM - Root Accounts Manager](./../design/iam.md#root-accounts-manager).
209213

214+
- `default_connection`
215+
- Type: String
216+
- Description: A default account for Kafka external servers. See bucket-notifications.md.
217+
210218
### Account Status
211219

212220
The `account status` command is used to print the status of the account.

src/cmd/manage_nsfs.js

+2-1
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,8 @@ async function fetch_account_data(action, user_input) {
422422
gid: user_input.user ? undefined : user_input.gid,
423423
new_buckets_path: user_input.new_buckets_path,
424424
fs_backend: user_input.fs_backend ? String(user_input.fs_backend) : config.NSFS_NC_STORAGE_BACKEND
425-
}
425+
},
426+
default_connection: user_input.default_connection === undefined ? undefined : String(user_input.default_connection)
426427
};
427428
if (action === ACTIONS.UPDATE || action === ACTIONS.DELETE) {
428429
// @ts-ignore

src/endpoint/s3/ops/s3_put_bucket_notification.js

+24
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,37 @@ const notif_util = require('../../../util/notifications_util');
1010
async function put_bucket_notification(req) {
1111

1212
let topic_configuration = req.body.NotificationConfiguration?.TopicConfiguration;
13+
const default_connection = req.object_sdk.requesting_account.default_connection;
1314

1415
//adapt to db shcema
1516
if (topic_configuration) {
1617
for (const conf of topic_configuration) {
1718
conf.id = conf.Id;
1819
conf.event = conf.Event;
1920
conf.topic = conf.Topic;
21+
//handle Kafka's topic synax, if present
22+
if (conf.Topic && conf.Topic.length > 0 && conf.Topic[0].startsWith('kafka:::topic/')) {
23+
//kafka_topic_parts[0] = 'kafka:::topic'
24+
//kafka_topic_parts[1] = connection, optional
25+
//kafka_topic_parts[2] = Kafka topic, mandatory
26+
const kafka_topic_parts = conf.Topic[0].split('/');
27+
if (kafka_topic_parts.length !== 3) {
28+
throw new S3Error({
29+
code: 'InvalidArgument',
30+
message: "kafka:::topic is invalid. Must be of syntax: kafka:::topic:/connection/topic",
31+
http_code: 400,
32+
detail: conf.Topic[0]
33+
});
34+
}
35+
//default to account's default_connection
36+
let connection = default_connection;
37+
if (typeof kafka_topic_parts[1] === 'string' && kafka_topic_parts[1].length > 0) {
38+
connection = kafka_topic_parts[1];
39+
}
40+
const topic = kafka_topic_parts[2];
41+
//write the full Topic string with the connection
42+
conf.topic = ['kafka:::topic/' + connection + "/" + topic];
43+
}
2044
delete conf.Id;
2145
delete conf.Event;
2246
delete conf.Topic;

src/manage_nsfs/manage_nsfs_constants.js

+3-2
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ const FROM_FILE = 'from_file';
4646
const ANONYMOUS = 'anonymous';
4747

4848
const VALID_OPTIONS_ACCOUNT = {
49-
'add': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', FROM_FILE, ...CLI_MUTUAL_OPTIONS]),
50-
'update': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', 'new_name', 'regenerate', ...CLI_MUTUAL_OPTIONS]),
49+
'add': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', 'default_connection', FROM_FILE, ...CLI_MUTUAL_OPTIONS]),
50+
'update': new Set(['name', 'uid', 'gid', 'supplemental_groups', 'new_buckets_path', 'user', 'access_key', 'secret_key', 'fs_backend', 'allow_bucket_creation', 'force_md5_etag', 'iam_operate_on_root_account', 'new_name', 'regenerate', 'default_connection', ...CLI_MUTUAL_OPTIONS]),
5151
'delete': new Set(['name', ...CLI_MUTUAL_OPTIONS]),
5252
'list': new Set(['wide', 'show_secrets', 'gid', 'uid', 'user', 'name', 'access_key', ...CLI_MUTUAL_OPTIONS]),
5353
'status': new Set(['name', 'access_key', 'show_secrets', ...CLI_MUTUAL_OPTIONS]),
@@ -142,6 +142,7 @@ const OPTION_TYPE = {
142142
ips: 'string',
143143
force: 'boolean',
144144
anonymous: 'boolean',
145+
default_connection: 'string',
145146
// health options
146147
deployment_type: 'string',
147148
all_account_details: 'boolean',

src/server/system_services/schemas/nsfs_account_schema.js

+3
Original file line numberDiff line numberDiff line change
@@ -101,5 +101,8 @@ module.exports = {
101101
}
102102
}]
103103
},
104+
default_connection: {
105+
type: 'string'
106+
}
104107
}
105108
};

src/util/notifications_util.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ class Notificator {
193193
if (connection_name.startsWith("kafka:::topic/")) {
194194
const connection_parts = connection_name.split('/');
195195
connect_filename = connection_parts[1];
196-
kafka_topic_from_connection_name = connection_parts.length > 2 && connection_parts[3];
196+
kafka_topic_from_connection_name = connection_parts.length > 1 && connection_parts[2];
197197
}
198198

199199
if (this.nc_config_fs) {

0 commit comments

Comments
 (0)