Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions plugins/in_kubernetes_events/kubernetes_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,18 @@ static struct flb_config_map config_map[] = {
0, FLB_FALSE, 0,
"set a database sync method. values: extra, full, normal and off."
},
{
FLB_CONFIG_MAP_BOOL, "db.locking", "false",
0, FLB_TRUE, offsetof(struct k8s_events, db_locking),
"set exclusive locking mode, increase performance but don't allow "
"external connections to the database file."
},
{
FLB_CONFIG_MAP_STR, "db.journal_mode", "WAL",
0, FLB_TRUE, offsetof(struct k8s_events, db_journal_mode),
"set the journal mode for the database. values: DELETE, TRUNCATE, "
"PERSIST, MEMORY, WAL, OFF."
},
#endif

/* EOF */
Expand Down
3 changes: 0 additions & 3 deletions plugins/in_kubernetes_events/kubernetes_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,6 @@ struct k8s_events {
char *auth;
size_t auth_len;

int dns_retries;
int dns_wait_time;

struct flb_tls *tls;

struct flb_log_event_encoder *encoder;
Expand Down
39 changes: 39 additions & 0 deletions plugins/in_kubernetes_events/kubernetes_events_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,45 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
}

#ifdef FLB_HAVE_SQLDB
/* Database sync mode (needs to be set before opening the database) */
ctx->db_sync = 1; /* default: sqlite sync 'normal' */
tmp = flb_input_get_property("db.sync", ins);
if (tmp) {
if (strcasecmp(tmp, "extra") == 0) {
ctx->db_sync = 3;
}
else if (strcasecmp(tmp, "full") == 0) {
ctx->db_sync = 2;
}
else if (strcasecmp(tmp, "normal") == 0) {
ctx->db_sync = 1;
}
else if (strcasecmp(tmp, "off") == 0) {
ctx->db_sync = 0;
}
else {
flb_plg_error(ctx->ins, "invalid database 'db.sync' value: %s", tmp);
k8s_events_conf_destroy(ctx);
return NULL;
}
}

/* Journal mode validation */
tmp = flb_input_get_property("db.journal_mode", ins);
if (tmp) {
if (strcasecmp(tmp, "DELETE") != 0 &&
strcasecmp(tmp, "TRUNCATE") != 0 &&
strcasecmp(tmp, "PERSIST") != 0 &&
strcasecmp(tmp, "MEMORY") != 0 &&
strcasecmp(tmp, "WAL") != 0 &&
strcasecmp(tmp, "OFF") != 0) {

flb_plg_error(ctx->ins, "invalid db.journal_mode=%s", tmp);
k8s_events_conf_destroy(ctx);
return NULL;
}
}

/* Initialize database */
tmp = flb_input_get_property("db", ins);
if (tmp) {
Expand Down
180 changes: 180 additions & 0 deletions tests/runtime/in_kubernetes_events.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,77 @@ static struct test_ctx *test_ctx_create(struct flb_lib_out_cb *data)
return ctx;
}

#ifdef FLB_HAVE_SQLDB
/* Create test context with additional config options for config parameter testing */
static struct test_ctx *test_ctx_create_with_config(struct flb_lib_out_cb *data,
const char *db_sync,
const char *db_locking,
const char *db_journal_mode)
{
int i_ffd;
int o_ffd;
int ret;
struct test_ctx *ctx = NULL;
char kube_url[512] = {0};

ctx = flb_calloc(1, sizeof(struct test_ctx));
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("flb_calloc failed");
flb_errno();
return NULL;
}

/* Service config */
ctx->flb = flb_create();
flb_service_set(ctx->flb,
"Flush", "0.200000000",
"Grace", "3",
"Log_Level", "debug",
NULL);

/* Input */
i_ffd = flb_input(ctx->flb, (char *) "kubernetes_events", NULL);
TEST_CHECK(i_ffd >= 0);
ctx->i_ffd = i_ffd;

sprintf(kube_url, "http://%s:%d", KUBE_API_HOST, KUBE_API_PORT);
ret = flb_input_set(ctx->flb, i_ffd,
"kube_url", kube_url,
"kube_token_file", KUBE_TOKEN_FILE,
"kube_retention_time", "365000d",
"tls", "off",
"interval_sec", "1",
"interval_nsec", "0",
NULL);
TEST_CHECK(ret == 0);

/* Set optional config parameters if provided */
if (db_sync) {
ret = flb_input_set(ctx->flb, i_ffd, "db.sync", db_sync, NULL);
TEST_CHECK(ret == 0);
}
if (db_locking) {
ret = flb_input_set(ctx->flb, i_ffd, "db.locking", db_locking, NULL);
TEST_CHECK(ret == 0);
}
if (db_journal_mode) {
ret = flb_input_set(ctx->flb, i_ffd, "db.journal_mode", db_journal_mode, NULL);
TEST_CHECK(ret == 0);
}

/* Output */
o_ffd = flb_output(ctx->flb, (char *) "lib", (void *) data);
ctx->o_ffd = o_ffd;

flb_output_set(ctx->flb, ctx->o_ffd,
"match", "*",
"format", "json",
NULL);

return ctx;
}
#endif

static void test_ctx_destroy(struct test_ctx *ctx)
{
TEST_CHECK(ctx != NULL);
Expand Down Expand Up @@ -444,10 +515,119 @@ void flb_test_events_with_chunkedrecv()
test_ctx_destroy(ctx);
}

#ifdef FLB_HAVE_SQLDB
/* Test valid db.sync values */
void flb_test_config_db_sync_values()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
int ret;
const char *sync_values[] = {"extra", "full", "normal", "off", NULL};
int i;

cb_data.cb = NULL;
cb_data.data = NULL;

for (i = 0; sync_values[i] != NULL; i++) {
ctx = test_ctx_create_with_config(&cb_data,
sync_values[i], /* db.sync */
NULL, /* db.locking */
NULL); /* db.journal_mode */
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create_with_config failed for db.sync=%s", sync_values[i]);
continue;
}

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);
if (ret != 0) {
TEST_MSG("flb_start failed for db.sync=%s", sync_values[i]);
}

flb_stop(ctx->flb);
flb_destroy(ctx->flb);
flb_free(ctx);
}
}

/* Test valid db.journal_mode values */
void flb_test_config_db_journal_mode_values()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
int ret;
const char *journal_modes[] = {"DELETE", "TRUNCATE", "PERSIST", "MEMORY", "WAL", "OFF", NULL};
int i;

cb_data.cb = NULL;
cb_data.data = NULL;

for (i = 0; journal_modes[i] != NULL; i++) {
ctx = test_ctx_create_with_config(&cb_data,
NULL, /* db.sync */
NULL, /* db.locking */
journal_modes[i]); /* db.journal_mode */
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create_with_config failed for db.journal_mode=%s", journal_modes[i]);
continue;
}

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);
if (ret != 0) {
TEST_MSG("flb_start failed for db.journal_mode=%s", journal_modes[i]);
}

flb_stop(ctx->flb);
flb_destroy(ctx->flb);
flb_free(ctx);
}
}

/* Test valid db.locking values */
void flb_test_config_db_locking_values()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
int ret;
const char *locking_values[] = {"true", "false", NULL};
int i;

cb_data.cb = NULL;
cb_data.data = NULL;

for (i = 0; locking_values[i] != NULL; i++) {
ctx = test_ctx_create_with_config(&cb_data,
NULL, /* db.sync */
locking_values[i], /* db.locking */
NULL); /* db.journal_mode */
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create_with_config failed for db.locking=%s", locking_values[i]);
continue;
}

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);
if (ret != 0) {
TEST_MSG("flb_start failed for db.locking=%s", locking_values[i]);
}

flb_stop(ctx->flb);
flb_destroy(ctx->flb);
flb_free(ctx);
}
}
#endif

TEST_LIST = {
{"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp},
{"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp},
//{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
#ifdef FLB_HAVE_SQLDB
{"config_db_sync_values", flb_test_config_db_sync_values},
{"config_db_journal_mode_values", flb_test_config_db_journal_mode_values},
{"config_db_locking_values", flb_test_config_db_locking_values},
#endif
{NULL, NULL}
};

Loading