Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce cron_shutdown function for stopping the main background worker #381

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pg_cron--1.5--1.6.sql
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
/* no SQL changes in 1.6 */
DROP FUNCTION IF EXISTS cron.shutdown();
CREATE FUNCTION cron.shutdown()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this would require a new SQL script (1.7), otherwise existing 1.6 users will never get the function.

RETURNS bool
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$cron_shutdown$$;
COMMENT ON FUNCTION cron.shutdown()
IS 'shutdown pg_cron';
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should also revoke execute from public, to ensure only superuser can call this (or grant privileges)

81 changes: 79 additions & 2 deletions src/pg_cron.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ static bool jobStartupTimeout(CronTask *task, TimestampTz currentTime);
static char* pg_cron_cmdTuples(char *msg);
static void bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata);

/* SQL-callable functions */
PG_FUNCTION_INFO_V1(cron_shutdown);

/* global settings */
char *CronTableDatabaseName = "postgres";
static bool CronLogStatement = true;
Expand All @@ -169,6 +172,9 @@ static bool UseBackgroundWorkers = false;

char *cron_timezone = NULL;

static shmem_request_hook_type prev_shmem_request_hook = NULL;
static shmem_startup_hook_type prev_shmem_startup_hook = NULL;

static const struct config_enum_entry cron_message_level_options[] = {
{"debug5", DEBUG5, false},
{"debug4", DEBUG4, false},
Expand All @@ -188,6 +194,63 @@ static const struct config_enum_entry cron_message_level_options[] = {

static const char *cron_error_severity(int elevel);

struct scheduler_shared_data_t {
pid_t scheduler_pid;
bool restart_scheduler;
LWLockId lock;
};

static struct scheduler_shared_data_t* scheduler_shared_data = NULL;

Datum
cron_shutdown(PG_FUNCTION_ARGS)
{
bool result = false;
Oid funcid;
pid_t pid;

LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE);
pid = scheduler_shared_data->scheduler_pid;
scheduler_shared_data->scheduler_pid = 0;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the code style should be adapted to the rest of the project

(except the Vixie cron logic in entry.c/misc.c)

scheduler_shared_data->restart_scheduler = false;
LWLockRelease(scheduler_shared_data->lock);

funcid = fmgr_internal_function("pg_terminate_backend");
if (funcid == InvalidOid) {
ereport(ERROR, (errmsg("Function pg_terminate_backend not found")));
}
result = DatumGetBool(OidFunctionCall1(funcid, Int32GetDatum(pid)));
PG_RETURN_BOOL(result);
}

static void cron_shmem_request(void)
{
if (prev_shmem_request_hook) {
prev_shmem_request_hook();
}

RequestNamedLWLockTranche("cron", 1);
}

static void cron_shmem_startup(void)
{
bool found;
if (prev_shmem_startup_hook) {
prev_shmem_startup_hook();
}

LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
InitShmemIndex();
scheduler_shared_data = (struct scheduler_shared_data_t*)
ShmemInitStruct("cron_scheduler_shared_data", sizeof(struct scheduler_shared_data_t), &found);
if (!found) {
scheduler_shared_data->scheduler_pid = 0;
scheduler_shared_data->restart_scheduler = true;
scheduler_shared_data->lock = &(GetNamedLWLockTranche("cron"))->lock;
}
LWLockRelease(AddinShmemInitLock);
}

/*
* _PG_init gets called when the extension is loaded.
*/
Expand All @@ -211,6 +274,12 @@ _PG_init(void)
/* watch for invalidation events */
CacheRegisterRelcacheCallback(InvalidateJobCacheCallback, (Datum) 0);

prev_shmem_request_hook = shmem_request_hook;
shmem_request_hook = cron_shmem_request;

prev_shmem_startup_hook = shmem_startup_hook;
shmem_startup_hook = cron_shmem_startup;

DefineCustomStringVariable(
"cron.database_name",
gettext_noop("Database in which pg_cron metadata is kept."),
Expand Down Expand Up @@ -560,6 +629,11 @@ PgCronLauncherMain(Datum arg)
{
MemoryContext CronLoopContext = NULL;
struct rlimit limit;
int retcode;

LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE);
scheduler_shared_data->scheduler_pid = MyProcPid;
LWLockRelease(scheduler_shared_data->lock);

/* Establish signal handlers before unblocking signals. */
pqsignal(SIGHUP, pg_cron_sighup);
Expand Down Expand Up @@ -670,8 +744,11 @@ PgCronLauncherMain(Datum arg)

ereport(LOG, (errmsg("pg_cron scheduler shutting down")));

/* return error code to trigger restart */
proc_exit(1);
/* return error code to trigger restart unless shutting down cron scheduler */
LWLockAcquire(scheduler_shared_data->lock, LW_EXCLUSIVE);
retcode = (int)scheduler_shared_data->restart_scheduler;
LWLockRelease(scheduler_shared_data->lock);
proc_exit((int)retcode);
}


Expand Down