ProvSQL C/C++ API
Adding support for provenance and uncertainty management to PostgreSQL databases
Loading...
Searching...
No Matches
kcmcp_supervisor.c
Go to the documentation of this file.
1/**
2 * @file kcmcp_supervisor.c
3 * @brief Background worker that launches and supervises the managed KCMCP
4 * knowledge-compiler server (the "managed mode" of the KCMCP client).
5 *
6 * When the @c provsql.kcmcp_server GUC is non-empty it is a shell command to
7 * start a KCMCP server (see @c doc/source/dev/kc-server-protocol.rst), with
8 * the literal @c {endpoint} replaced by a Unix-socket path this worker picks.
9 * The worker forks/execs that command in its own process group, publishes the
10 * endpoint in shared memory (read by the in-extension client,
11 * @c kcmcp_client.cpp, for a registry record whose @c endpoint is
12 * @c 'managed'), and supervises it: on the server's exit it relaunches, on a
13 * config reload it restarts a changed command, and on shutdown it kills the
14 * whole group. When the GUC is empty the worker simply idles on its latch.
15 *
16 * Modeled on @c RegisterProvSQLMMapWorker (src/provsql_mmap.c); like it, the
17 * worker needs only @c BGWORKER_SHMEM_ACCESS (no database connection): the GUC
18 * is a process-global and the endpoint lives in the shared segment.
19 */
20#include "postgres.h"
21#include "miscadmin.h"
22#include "pgstat.h" /* PG_WAIT_EXTENSION */
23#include "lib/stringinfo.h"
24#include "postmaster/bgworker.h"
25#include "postmaster/postmaster.h" /* PostPortNumber */
26#include "storage/latch.h"
27#include "storage/ipc.h"
28#include "utils/guc.h"
29
30#include <string.h>
31#include <signal.h>
32#include <unistd.h>
33#include <sys/wait.h>
34#include <errno.h>
35
36#include "provsql_utils.h"
37#include "provsql_shmem.h"
38#include "provsql_error.h"
39
40#if PG_VERSION_NUM < 120000
41/* WL_EXIT_ON_PM_DEATH (have WaitLatch exit on postmaster death) was introduced
42 * in PostgreSQL 12; on PG 10/11 fall back to the older WL_POSTMASTER_DEATH and
43 * leave the supervise loop ourselves when WaitLatch reports it (see kcmcp_wait). */
44#define WL_EXIT_ON_PM_DEATH WL_POSTMASTER_DEATH
45#endif
46
/* Flags raised asynchronously by the signal handlers below and polled by the
 * supervise loop in provsql_kcmcp_worker: got_sigterm makes the loop exit,
 * got_sighup makes it reload the configuration file. */
47static volatile sig_atomic_t got_sigterm = false;
48static volatile sig_atomic_t got_sighup = false;
49
50static void kcmcp_sigterm(SIGNAL_ARGS)
51{
52 int save_errno = errno;
53 got_sigterm = true;
54 SetLatch(MyLatch);
55 errno = save_errno;
56}
57
58static void kcmcp_sighup(SIGNAL_ARGS)
59{
60 int save_errno = errno;
61 got_sighup = true;
62 SetLatch(MyLatch);
63 errno = save_errno;
64}
65
67{
68 static char buf[256];
69 buf[0] = '\0';
70 if (provsql_shared_state == NULL)
71 return buf;
73 strlcpy(buf, provsql_shared_state->kcmcp_endpoint, sizeof(buf));
75 return buf;
76}
77
78static void publish_endpoint(const char *endpoint)
79{
80 if (provsql_shared_state == NULL)
81 return;
83 strlcpy(provsql_shared_state->kcmcp_endpoint, endpoint,
84 sizeof(provsql_shared_state->kcmcp_endpoint));
86}
87
88/* Build the server command by replacing the first "{endpoint}" in the GUC
89 * template with @p endpoint. Returns a palloc'd string, or NULL if the
90 * template lacks the placeholder. */
91static char *build_server_command(const char *tmpl, const char *endpoint)
92{
93 const char *p = strstr(tmpl, "{endpoint}");
94 StringInfoData s;
95 if (p == NULL)
96 return NULL;
97 initStringInfo(&s);
98 appendBinaryStringInfo(&s, tmpl, p - tmpl);
99 appendStringInfoString(&s, endpoint);
100 appendStringInfoString(&s, p + strlen("{endpoint}"));
101 return s.data;
102}
103
/**
 * @brief Start the server command through @c /bin/sh in a new process group.
 *
 * The child becomes the leader of a process group whose id equals its pid;
 * setpgid() is called on both sides of the fork so the group is set up no
 * matter which process runs first.
 *
 * @param cmd  Shell command line, run as <tt>sh -c cmd</tt>.
 * @return The child's pid, or -1 if fork() failed.
 */
static pid_t launch_server(const char *cmd)
{
  pid_t pid;

  /* Flush all open stdio output streams before forking. */
  fflush(NULL);

  pid = fork();

  if (pid == 0) {
    /* Child: lead a fresh process group, then become the shell. */
    setpgid(0, 0);
    execl("/bin/sh", "sh", "-c", cmd, (char *) NULL);
    /* Reached only when execl() itself failed. */
    _exit(127);
  }

  if (pid > 0)
    setpgid(pid, pid);

  return (pid < 0) ? -1 : pid;
}
121
/**
 * @brief Forcibly stop the managed server and reap it.
 *
 * Sends SIGKILL to the whole process group led by @p child, then blocks in
 * waitpid() until the child has been collected, retrying when the call is
 * interrupted by a signal.
 *
 * @param child  Pid (and process-group id) of the server; values <= 0 are
 *               ignored.
 */
static void kill_server(pid_t child)
{
  int wstatus;

  if (child <= 0)
    return;

  killpg(child, SIGKILL);

  for (;;) {
    if (waitpid(child, &wstatus, 0) >= 0)
      break;                  /* reaped */
    if (errno != EINTR)
      break;                  /* real failure: give up */
  }
}
131
132/* Wait on the latch (and an optional timeout), resetting it. Returns true if
133 * the postmaster died: on PG >= 12 WL_EXIT_ON_PM_DEATH makes WaitLatch exit the
134 * process itself (so this never returns true); on PG 10/11 it degrades to
135 * WL_POSTMASTER_DEATH and WaitLatch returns with that bit set, which the caller
136 * turns into a clean exit from the supervise loop. */
137static bool kcmcp_wait(long timeout_ms)
138{
139 int events = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH;
140 int rc;
141 if (timeout_ms >= 0)
142 events |= WL_TIMEOUT;
143 rc = WaitLatch(MyLatch, events, timeout_ms, PG_WAIT_EXTENSION);
144 ResetLatch(MyLatch);
145 return (rc & WL_POSTMASTER_DEATH) != 0;
146}
147
148PGDLLEXPORT void provsql_kcmcp_worker(Datum ignored);
149
150void provsql_kcmcp_worker(Datum ignored)
151{
152 pid_t child = -1;
153 char endpoint[256];
154
155 (void) ignored;
156 pqsignal(SIGTERM, kcmcp_sigterm);
157 pqsignal(SIGHUP, kcmcp_sighup);
158 BackgroundWorkerUnblockSignals();
159
160 snprintf(endpoint, sizeof(endpoint), "unix:/tmp/.provsql-kcmcp-%d.sock",
161 PostPortNumber);
162
163 for (;;) {
164 bool configured = (provsql_kcmcp_server != NULL
165 && provsql_kcmcp_server[0] != '\0');
166
167 if (got_sigterm)
168 break;
169
170 if (got_sighup) {
171 got_sighup = false;
172 ProcessConfigFile(PGC_SIGHUP);
173 /* A changed command (or one just cleared) takes effect by recycling the
174 * child; the relaunch below picks up the new template. */
175 if (child > 0) {
176 kill_server(child);
177 child = -1;
179 }
180 continue;
181 }
182
183 if (!configured) {
184 if (child > 0) {
185 kill_server(child);
186 child = -1;
188 }
189 if (kcmcp_wait(-1))
190 break;
191 continue;
192 }
193
194 if (child <= 0) {
195 char *cmd = build_server_command(provsql_kcmcp_server, endpoint);
196 if (cmd == NULL) {
197 provsql_warning("provsql.kcmcp_server has no {endpoint} placeholder; "
198 "not launching the managed KCMCP server");
199 if (kcmcp_wait(-1))
200 break;
201 continue;
202 }
203 child = launch_server(cmd);
204 pfree(cmd);
205 if (child < 0) {
206 provsql_warning("could not fork the managed KCMCP server");
207 } else {
208 publish_endpoint(endpoint);
209 provsql_log("managed KCMCP server started (pid %d) on %s",
210 (int) child, endpoint);
211 }
212 }
213
214 /* Supervise: wake on the child's exit (SIGCHLD interrupts the wait), a
215 * signal, or the 1 s timeout, then reap non-blockingly. */
216 if (kcmcp_wait(1000))
217 break;
218
219 if (child > 0) {
220 int status;
221 pid_t w = waitpid(child, &status, WNOHANG);
222 if (w == child) {
224 provsql_log("managed KCMCP server (pid %d) exited; relaunching",
225 (int) child);
226 child = -1; /* the next iteration relaunches */
227 }
228 }
229 }
230
231 if (child > 0)
232 kill_server(child);
234}
235
237{
238 BackgroundWorker worker;
239 memset(&worker, 0, sizeof(worker));
240 snprintf(worker.bgw_name, BGW_MAXLEN, "ProvSQL KCMCP Supervisor");
241#if PG_VERSION_NUM >= 110000
242 snprintf(worker.bgw_type, BGW_MAXLEN, "ProvSQL KCMCP");
243#endif
244 worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
245 worker.bgw_start_time = BgWorkerStart_PostmasterStart;
246 worker.bgw_restart_time = 1;
247 snprintf(worker.bgw_library_name, BGW_MAXLEN, "provsql");
248 snprintf(worker.bgw_function_name, BGW_MAXLEN, "provsql_kcmcp_worker");
249 worker.bgw_main_arg = (Datum) 0;
250 worker.bgw_notify_pid = 0;
251 RegisterBackgroundWorker(&worker);
252}
static volatile sig_atomic_t got_sigterm
static void publish_endpoint(const char *endpoint)
static char * build_server_command(const char *tmpl, const char *endpoint)
void RegisterProvSQLKCMCPWorker(void)
Register the supervisor background worker that launches and supervises the managed KCMCP server; call...
const char * provsql_kcmcp_managed_endpoint(void)
Read the live endpoint of the managed KCMCP server from shared memory (e.g.
static void kcmcp_sigterm(SIGNAL_ARGS)
static volatile sig_atomic_t got_sighup
static pid_t launch_server(const char *cmd)
#define WL_EXIT_ON_PM_DEATH
void provsql_kcmcp_worker(Datum ignored)
static void kcmcp_sighup(SIGNAL_ARGS)
static void kill_server(pid_t child)
static bool kcmcp_wait(long timeout_ms)
char * provsql_kcmcp_server
Launch command for the managed KCMCP server (with a {endpoint} placeholder); controlled by the provsq...
Definition provsql.c:83
Uniform error-reporting macros for ProvSQL.
#define provsql_log(fmt,...)
Write a ProvSQL message to the server log only.
#define provsql_warning(fmt,...)
Emit a ProvSQL warning message (execution continues).
void provsql_shmem_unlock(void)
Release the ProvSQL LWLock.
void provsql_shmem_lock_exclusive(void)
Acquire the ProvSQL LWLock in exclusive mode.
provsqlSharedState * provsql_shared_state
Pointer to the ProvSQL shared-memory segment (set in provsql_shmem_startup).
void provsql_shmem_lock_shared(void)
Acquire the ProvSQL LWLock in shared mode.
Shared-memory segment and inter-process pipe management.
Core types, constants, and utilities shared across ProvSQL.