Skip to content

Commit

Permalink
[#22257] YSQL: Build and run pg_cron on YB
Browse files Browse the repository at this point in the history
Summary:
Enable the use of pg_cron in YugabyteDB for testing purposes.
This change allows us to schedule and test the cron APIs. There is no cron leader or worker node yet, so every tserver node will behave as its own leader. This means that each cron job will run N times (where N is the number of tserver nodes) instead of running only once.

 - Feature is guarded under NON_RUNTIME gFlag `TEST_enable_pg_cron`.
 - Fixed `YBCDeleteSysCatalogTuple` to use `relfileNodeId` to create the `delete_stmt`.
 - Use `YBCDeleteHeapTuple` in pg_cron instead of `simple_heap_delete`.
 - Set the default cron database to `yugabyte`.
 - Changes the default for `use_background_workers` to `true` since yb will only use remote execution in the future.
 - The cron runner will always call `RefreshTaskHash` once every `cron.yb_job_list_refresh_interval` seconds. Since jobs are scheduled on remote nodes which cannot send invalidation messages, we need to reload the table regularly. This also means that tasks will continue to get scheduled for up to 1 minute after they are removed.
- Added `YBCPgResetCatalogReadTime()` to the launcher loop. Without this, we cache the catalog read time and never update it, resulting in stale job table reads.

```
CREATE EXTENSION pg_cron;
CREATE TABLE tbl1(a INT, i_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP);
SELECT cron.schedule('tbl1', '* * * * *', 'INSERT INTO tbl1 VALUES(1);');

SELECT * from cron.job_run_details ORDER BY runid;
\watch 60
```

Fixes #22257
Jira: DB-11175

Test Plan: TestPgRegressThirdPartyExtensionsPgCron

Reviewers: fizaa, tnayak

Reviewed By: fizaa, tnayak

Subscribers: yql, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D34750
  • Loading branch information
hari90 committed May 10, 2024
1 parent 894de8a commit 5550818
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright (c) YugabyteDB, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
// in compliance with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software distributed under the License
// is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
// or implied. See the License for the specific language governing permissions and limitations
// under the License.
//
package org.yb.pgsql;

import java.io.File;
import java.util.Map;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.yb.YBTestRunner;
import org.yb.client.TestUtils;

/**
 * Runs the pg_cron third-party extension regress schedule with {@code TEST_enable_pg_cron}
 * added to the tserver flags.
 */
@RunWith(YBTestRunner.class)
public class TestPgRegressThirdPartyExtensionsPgCron extends BasePgSQLTest {
  /** Timeout reported for each test method (value is in seconds, per the method name). */
  @Override
  public int getTestMethodTimeoutSec() {
    return 1800;
  }

  /** Returns the superclass tserver flags with the pg_cron test flag switched on. */
  @Override
  protected Map<String, String> getTServerFlags() {
    final Map<String, String> tserverFlags = super.getTServerFlags();
    tserverFlags.put("TEST_enable_pg_cron", "true");
    return tserverFlags;
  }

  /** Runs the {@code yb_schedule} schedule from the pg_cron directory under the build root. */
  @Test
  public void schedule() throws Exception {
    final File pgCronDir =
        new File(TestUtils.getBuildRootDir(), "postgres_build/third-party-extensions/pg_cron");
    runPgRegressTest(pgCronDir, "yb_schedule");
  }
}
6 changes: 3 additions & 3 deletions src/postgres/src/backend/executor/ybcModifyTable.c
Original file line number Diff line number Diff line change
Expand Up @@ -1182,8 +1182,8 @@ Oid YBCExecuteUpdateReplace(Relation rel,
void YBCDeleteSysCatalogTuple(Relation rel, HeapTuple tuple)
{
Oid dboid = YBCGetDatabaseOid(rel);
Oid relid = RelationGetRelid(rel);
YBCPgStatement delete_stmt = NULL;
Oid relfileNodeId = YbGetRelfileNodeId(rel);

if (tuple->t_ybctid == 0)
ereport(ERROR,
Expand All @@ -1192,7 +1192,7 @@ void YBCDeleteSysCatalogTuple(Relation rel, HeapTuple tuple)

/* Prepare DELETE statement. */
HandleYBStatus(YBCPgNewDelete(dboid,
relid,
relfileNodeId,
YBCIsRegionLocal(rel),
&delete_stmt,
YB_TRANSACTIONAL));
Expand All @@ -1202,7 +1202,7 @@ void YBCDeleteSysCatalogTuple(Relation rel, HeapTuple tuple)
false /* is_null */);

/* Delete row from foreign key cache */
YBCPgDeleteFromForeignKeyReferenceCache(YbGetRelfileNodeId(rel),
YBCPgDeleteFromForeignKeyReferenceCache(relfileNodeId,
tuple->t_ybctid);

HandleYBStatus(YBCPgDmlBindColumn(delete_stmt, YBTupleIdAttributeNumber, ybctid_expr));
Expand Down
2 changes: 1 addition & 1 deletion src/postgres/third-party-extensions/pg_cron/src/entry.c
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ parse_cron_entry(char *schedule)
*/
ch = get_string(cmd, MAX_COMMAND, file, " \t\n");
if (!strcmp("reboot", cmd) || !strcmp("restart", cmd)) {
e->flags |= WHEN_REBOOT;
e->flags |= WHEN_REBOOT; /* TODO(hari): Disable in YB? */
} else if (!strcmp("yearly", cmd) || !strcmp("annually", cmd)){
set_element(e->minute, FIRST_MINUTE, LAST_MINUTE,
FIRST_MINUTE);
Expand Down
14 changes: 12 additions & 2 deletions src/postgres/third-party-extensions/pg_cron/src/job_metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@
#include "commands/dbcommands.h"
#include "catalog/pg_authid.h"

/* YB includes */
#include "executor/ybcModifyTable.h"
#include "pg_yb_utils.h"

#if (PG_VERSION_NUM < 120000)
#define table_open(r, l) heap_open(r, l)
#define table_close(r, l) heap_close(r, l)
Expand Down Expand Up @@ -660,7 +664,10 @@ cron_unschedule(PG_FUNCTION_ARGS)

EnsureDeletePermission(cronJobsTable, heapTuple);

simple_heap_delete(cronJobsTable, &heapTuple->t_self);
if (IsYugaByteEnabled())
YBCDeleteSysCatalogTuple(cronJobsTable, heapTuple);
else
simple_heap_delete(cronJobsTable, &heapTuple->t_self);

systable_endscan(scanDescriptor);
table_close(cronJobsTable, NoLock);
Expand Down Expand Up @@ -731,7 +738,10 @@ cron_unschedule_named(PG_FUNCTION_ARGS)

EnsureDeletePermission(cronJobsTable, heapTuple);

simple_heap_delete(cronJobsTable, &heapTuple->t_self);
if (IsYugaByteEnabled())
YBCDeleteSysCatalogTuple(cronJobsTable, heapTuple);
else
simple_heap_delete(cronJobsTable, &heapTuple->t_self);

systable_endscan(scanDescriptor);
table_close(cronJobsTable, NoLock);
Expand Down
62 changes: 55 additions & 7 deletions src/postgres/third-party-extensions/pg_cron/src/pg_cron.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@
#include "libpq/pqformat.h"
#include "utils/builtins.h"

/* YB includes */
#include "pg_yb_utils.h"

PG_MODULE_MAGIC;

Expand Down Expand Up @@ -148,9 +150,10 @@ static bool jobCanceled(CronTask *task);
static bool jobStartupTimeout(CronTask *task, TimestampTz currentTime);
static char* pg_cron_cmdTuples(char *msg);
static void bgw_generate_returned_message(StringInfoData *display_msg, ErrorData edata);
static long YbSecondsPassed(TimestampTz startTime, TimestampTz stopTime);

/* global settings */
char *CronTableDatabaseName = "postgres";
char *CronTableDatabaseName = "yugabyte";
static bool CronLogStatement = true;
static bool CronLogRun = true;
static bool CronReloadConfig = false;
Expand All @@ -165,7 +168,8 @@ static bool RebootJobsScheduled = false;
static int RunningTaskCount = 0;
static int MaxRunningTasks = 0;
static int CronLogMinMessages = WARNING;
static bool UseBackgroundWorkers = false;
static bool UseBackgroundWorkers = true;
static int YbJobListRefreshSeconds = 60;

char *cron_timezone = NULL;

Expand Down Expand Up @@ -213,7 +217,7 @@ _PG_init(void)
gettext_noop("Database in which pg_cron metadata is kept."),
NULL,
&CronTableDatabaseName,
"postgres",
"yugabyte",
PGC_POSTMASTER,
GUC_SUPERUSER_ONLY,
NULL, NULL, NULL);
Expand All @@ -223,7 +227,7 @@ _PG_init(void)
gettext_noop("Log all cron statements prior to execution."),
NULL,
&CronLogStatement,
true,
true, /* TODO(hari): false? */
PGC_POSTMASTER,
GUC_SUPERUSER_ONLY,
NULL, NULL, NULL);
Expand Down Expand Up @@ -263,7 +267,7 @@ _PG_init(void)
gettext_noop("Use background workers instead of client sessions."),
NULL,
&UseBackgroundWorkers,
false,
true,
PGC_POSTMASTER,
GUC_SUPERUSER_ONLY,
NULL, NULL, NULL);
Expand Down Expand Up @@ -295,7 +299,7 @@ _PG_init(void)
"cron.max_running_jobs",
gettext_noop("Maximum number of jobs that can run concurrently."),
NULL,
&MaxRunningTasks,
&MaxRunningTasks, /* TODO(Hari): We need local and global limits */
(max_worker_processes - 1 < 5) ? max_worker_processes - 1 : 5,
0,
max_worker_processes - 1,
Expand Down Expand Up @@ -324,6 +328,18 @@ _PG_init(void)
GUC_SUPERUSER_ONLY,
check_timezone, NULL, NULL);

DefineCustomIntVariable(
"cron.yb_job_list_refresh_interval",
gettext_noop("The frequency at which the pb_cron leader reloads the job list and picks up new jobs."),
NULL,
&YbJobListRefreshSeconds,
60,
1,
INT_MAX,
PGC_SUSET,
GUC_UNIT_S,
NULL, NULL, NULL);

/* set up common data for all our workers */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
Expand Down Expand Up @@ -626,8 +642,14 @@ PgCronLauncherMain(Datum arg)

MemoryContextSwitchTo(CronLoopContext);

TimestampTz ybLastRefreshTime = 0;

while (!got_sigterm)
{
/* YB Note: The latest entries in the catalog must be read during every run */
if (IsYugaByteEnabled())
YBCPgResetCatalogReadTime();

List *taskList = NIL;
TimestampTz currentTime = 0;

Expand All @@ -646,9 +668,19 @@ PgCronLauncherMain(Datum arg)
* Both CronReloadConfig and CronJobCacheValid are triggered by SIGHUP.
* ProcessConfigFile should come first, because RefreshTaskHash depends
* on settings that might have changed.
*
* In Yugabyte mode, jobs scheduled from different nodes cannot invalidate
* the cache on the cron leader. So, in addition to the regular invalidations,
* we call RefreshTaskHash every YbJobListRefreshSeconds.
* NOTE: It can take up to YbJobListRefreshSeconds for changes to the jobs
* to take effect.
*/
if (!CronJobCacheValid)
currentTime = GetCurrentTimestamp();
if (!CronJobCacheValid ||
(IsYugaByteEnabled() &&
YbSecondsPassed(ybLastRefreshTime, currentTime) >= YbJobListRefreshSeconds))
{
ybLastRefreshTime = currentTime;
RefreshTaskHash();
}

Expand Down Expand Up @@ -2349,3 +2381,19 @@ jobStartupTimeout(CronTask *task, TimestampTz currentTime)
else
return false;
}

/*
 * YbSecondsPassed
 *
 * Returns the whole-seconds component of the interval from startTime to
 * stopTime, as reported by TimestampDifference. The microseconds component
 * is fetched only because TimestampDifference requires an output slot for
 * it; it is then discarded, so the result is rounded down.
 */
static long
YbSecondsPassed(TimestampTz startTime, TimestampTz stopTime)
{
	long		wholeSeconds = 0;
	int			leftoverMicros = 0;

	TimestampDifference(startTime, stopTime, &wholeSeconds, &leftoverMicros);
	return wholeSeconds;
}
3 changes: 3 additions & 0 deletions src/postgres/third-party-extensions/pg_cron/yb_schedule
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# third-party-extensions/pg_cron/yb_schedule

test: pg_cron-test
6 changes: 6 additions & 0 deletions src/yb/yql/pgwrapper/pg_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ DEPRECATE_FLAG(int32, ysql_yb_ash_sampling_interval, "2024_03");
DEFINE_RUNTIME_PG_FLAG(int32, yb_ash_sample_size, 500,
"Number of samples captured from each component per sampling event");

DEFINE_test_flag(bool, enable_pg_cron, false, "Enables the pg_cron extension");

using gflags::CommandLineFlagInfo;
using std::string;
using std::vector;
Expand Down Expand Up @@ -448,6 +450,10 @@ Result<string> WritePostgresConfig(const PgProcessConf& conf) {
metricsLibs.push_back("pgaudit");
metricsLibs.push_back("pg_hint_plan");

if (FLAGS_TEST_enable_pg_cron) {
metricsLibs.push_back("pg_cron");
}

vector<string> lines;
string line;
while (std::getline(conf_file, line)) {
Expand Down

0 comments on commit 5550818

Please sign in to comment.