ProvSQL C/C++ API
Adding support for provenance and uncertainty management to PostgreSQL databases
Loading...
Searching...
No Matches
provsql_mmap.c
Go to the documentation of this file.
1/**
2 * @file provsql_mmap.c
3 * @brief Background worker registration and IPC primitives for mmap-backed storage.
4 *
5 * Implements the PostgreSQL background worker lifecycle functions declared
6 * in @c provsql_mmap.h:
7 * - @c RegisterProvSQLMMapWorker(): registers the worker with the postmaster
8 * during @c _PG_init().
9 * - @c provsql_mmap_worker(): worker entry point; sets up signal handlers
10 * and enters @c provsql_mmap_main_loop().
11 *
12 * The IPC between normal backends and the background worker is handled in
13 * @c MMappedCircuit.cpp. This file provides the PostgreSQL-specific glue
14 * (background worker API, signal handling).
15 *
16 * Also declares the shared write buffer @c buffer[] and position counter
17 * @c bufferpos used by the @c STARTWRITEM / @c ADDWRITEM / @c SENDWRITEM
18 * macros in @c provsql_mmap.h.
19 *
20 * The gate-creation SQL functions (e.g. @c create_gate()) that backends
21 * call are also implemented here; they acquire the IPC lock, write a
22 * message to the background worker, and wait for an acknowledgment.
23 */
24#include "provsql_mmap.h"
25#include "provsql_shmem.h"
26#include "provsql_utils.h"
27
28#include <errno.h>
29#include <unistd.h>
30#include <poll.h>
31#include <math.h>
32#include <assert.h>
33
34#include "postgres.h"
35#include "postmaster/bgworker.h"
36#include "fmgr.h"
37#include "funcapi.h"
38#include "utils/array.h"
39#include "access/htup_details.h"
40#include "utils/builtins.h"
41
42#include "circuit_cache.h"
43
44char buffer[PIPE_BUF]={}; // flawfinder: ignore
45unsigned bufferpos=0;
46
47/** @brief Background worker entry point: initialises the mmap circuit store and runs the main loop. */
48__attribute__((visibility("default")))
49void provsql_mmap_worker(Datum ignored)
50{
51 BackgroundWorkerUnblockSignals();
55 provsql_log("%s initialized", MyBgworkerEntry->bgw_name);
56
58
60}
61
63{
64 BackgroundWorker worker;
65
66 snprintf(worker.bgw_name, BGW_MAXLEN, "ProvSQL MMap Worker");
67#if PG_VERSION_NUM >= 110000
68 snprintf(worker.bgw_type, BGW_MAXLEN, "ProvSQL MMap");
69#endif
70
71 worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
72 worker.bgw_start_time = BgWorkerStart_PostmasterStart;
73 worker.bgw_restart_time = 1;
74
75 snprintf(worker.bgw_library_name, BGW_MAXLEN, "provsql");
76 snprintf(worker.bgw_function_name, BGW_MAXLEN, "provsql_mmap_worker");
77#if PG_VERSION_NUM < 100000
78 worker.bgw_main = NULL;
79#endif
80
81 worker.bgw_main_arg = (Datum) 0;
82 worker.bgw_notify_pid = 0;
83
84 RegisterBackgroundWorker(&worker);
85}
86
87PG_FUNCTION_INFO_V1(get_gate_type);
88/** @brief PostgreSQL-callable wrapper for get_gate_type(). */
89Datum get_gate_type(PG_FUNCTION_ARGS)
90{
91 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
92 gate_type type;
93 constants_t constants=get_constants(true);
94
95 if(PG_ARGISNULL(0))
96 PG_RETURN_NULL();
97
98 type = circuit_cache_get_type(*token);
99 if(type!=gate_invalid)
100 PG_RETURN_INT32(constants.GATE_TYPE_TO_OID[type]); ;
101
102 STARTWRITEM();
103 ADDWRITEM("t", char);
104 ADDWRITEM(token, pg_uuid_t);
105
107
108 if(!SENDWRITEM() || !READB(type, gate_type)) {
110 provsql_error("Cannot communicate on pipe (message type t)");
111 }
112
114
115 if(type==gate_invalid)
116 PG_RETURN_NULL();
117 else {
118 circuit_cache_create_gate(*token, type, 0, NULL);
119 PG_RETURN_INT32(constants.GATE_TYPE_TO_OID[type]);
120 }
121}
122
123PG_FUNCTION_INFO_V1(create_gate);
124/** @brief PostgreSQL-callable wrapper for create_gate(). */
125Datum create_gate(PG_FUNCTION_ARGS)
126{
127 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
128 Oid oid_type = PG_GETARG_INT32(1);
129 ArrayType *children = PG_ARGISNULL(2)?NULL:PG_GETARG_ARRAYTYPE_P(2);
130 unsigned nb_children = 0;
131 gate_type type = gate_invalid;
132 constants_t constants;
133 pg_uuid_t *children_data;
134
135 if(PG_ARGISNULL(0) || PG_ARGISNULL(1))
136 provsql_error("Invalid NULL value passed to create_gate");
137
138 if(children) {
139 if(ARR_NDIM(children) > 1)
140 provsql_error("Invalid multi-dimensional array passed to create_gate");
141 else if(ARR_NDIM(children) == 1)
142 nb_children = *ARR_DIMS(children);
143 }
144
145 constants=get_constants(true);
146
147 for(int i=0; i<nb_gate_types; ++i) {
148 if(constants.GATE_TYPE_TO_OID[i]==oid_type) {
149 type = i;
150 break;
151 }
152 }
153 if(type == gate_invalid) {
154 provsql_error("Invalid gate type");
155 }
156
157 if(nb_children>0)
158 children_data = (pg_uuid_t*) ARR_DATA_PTR(children);
159 else
160 children_data = NULL;
161
162 if(circuit_cache_create_gate(*token, type, nb_children, children_data))
163 PG_RETURN_VOID();
164
165 STARTWRITEM();
166 ADDWRITEM("C", char);
167 ADDWRITEM(token, pg_uuid_t);
168 ADDWRITEM(&type, gate_type);
169 ADDWRITEM(&nb_children, unsigned);
170
171 if(PIPE_BUF-bufferpos>nb_children*sizeof(pg_uuid_t)) {
172 // Enough space in the buffer for an atomic write, no need of
173 // exclusive locks
174
175 for(int i=0; i<nb_children; ++i)
176 ADDWRITEM(&children_data[i], pg_uuid_t);
177
179 if(!SENDWRITEM()) {
181 provsql_error("Cannot write to pipe (message type C)");
182 }
184 } else {
185 // Not enough space in buffer, pipe write won't be atomic, we need to
186 // make several writes and use locks
187 unsigned children_per_batch = PIPE_BUF/sizeof(pg_uuid_t);
188
190
191 if(!SENDWRITEM()) {
193 provsql_error("Cannot write to pipe (message type C)");
194 }
195
196 for(unsigned j=0; j<1+(nb_children-1)/children_per_batch; ++j) {
197 STARTWRITEM();
198
199 for(unsigned i=j*children_per_batch; i<(j+1)*children_per_batch && i<nb_children; ++i) {
200 ADDWRITEM(&children_data[i], pg_uuid_t);
201 }
202
203 if(!SENDWRITEM()) {
205 provsql_error("Cannot write to pipe (message type C)");
206 }
207 }
208
210 }
211
212 PG_RETURN_VOID();
213}
214
215PG_FUNCTION_INFO_V1(set_prob);
216/** @brief PostgreSQL-callable wrapper for set_prob(). */
217Datum set_prob(PG_FUNCTION_ARGS)
218{
219 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
220 double prob = PG_GETARG_FLOAT8(1);
221 char result;
222
223 if(PG_ARGISNULL(0) || PG_ARGISNULL(1))
224 provsql_error("Invalid NULL value passed to set_prob");
225
226 STARTWRITEM();
227 ADDWRITEM("P", char);
228 ADDWRITEM(token, pg_uuid_t);
229 ADDWRITEM(&prob, double);
230
232 if(!SENDWRITEM() || !READB(result, char)) {
234 provsql_error("Cannot write to pipe");
235 }
237
238 if(!result)
239 provsql_error("set_prob called on non-input gate");
240
241 PG_RETURN_VOID();
242}
243
244PG_FUNCTION_INFO_V1(set_infos);
245/** @brief PostgreSQL-callable wrapper for set_infos(). */
246Datum set_infos(PG_FUNCTION_ARGS)
247{
248 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
249 unsigned info1 = PG_GETARG_INT32(1);
250 unsigned info2 = PG_GETARG_INT32(2);
251
252
253 if(PG_ARGISNULL(1))
254 info1=0;
255 if(PG_ARGISNULL(2))
256 info2=0;
257
258 STARTWRITEM();
259 ADDWRITEM("I", char);
260 ADDWRITEM(token, pg_uuid_t);
261 ADDWRITEM(&info1, unsigned);
262 ADDWRITEM(&info2, unsigned);
263
265 if(!SENDWRITEM()) {
267 provsql_error("Cannot write to pipe (message type I)");
268 }
270
271 PG_RETURN_VOID();
272}
273
274PG_FUNCTION_INFO_V1(set_extra);
275/** @brief PostgreSQL-callable wrapper for set_extra(). */
276Datum set_extra(PG_FUNCTION_ARGS)
277{
278 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
279 text *data = PG_GETARG_TEXT_P(1);
280 char *str=text_to_cstring(data);
281 unsigned len=strlen(str);
282
283 STARTWRITEM();
284 ADDWRITEM("E", char);
285 ADDWRITEM(token, pg_uuid_t);
286 ADDWRITEM(&len, unsigned);
287
288 assert(PIPE_BUF-bufferpos>len);
289 memcpy(buffer+bufferpos, str, len), bufferpos+=len;
290 pfree(str);
291
293 if(!SENDWRITEM()) {
295 provsql_error("Cannot write to pipe (message type E)");
296 }
298
299 PG_RETURN_VOID();
300}
301
302PG_FUNCTION_INFO_V1(get_extra);
303/** @brief PostgreSQL-callable wrapper for get_extra(). */
304Datum get_extra(PG_FUNCTION_ARGS)
305{
306 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
307 text *result;
308 unsigned len;
309
310 if(PG_ARGISNULL(0))
311 PG_RETURN_NULL();
312
313 STARTWRITEM();
314 ADDWRITEM("e", char);
315 ADDWRITEM(token, pg_uuid_t);
316
318
319 if(!SENDWRITEM() || !READB(len, unsigned)) {
321 provsql_error("Cannot communicate with pipe (message type e)");
322 }
323
324 result = palloc(len + VARHDRSZ);
325 SET_VARSIZE(result, VARHDRSZ + len);
326
327 if(read(provsql_shared_state->pipembr, VARDATA(result), len)<len) {
329 provsql_error("Cannot communicate with pipe (message type e)");
330 }
331
333
334 PG_RETURN_TEXT_P(result);
335}
336
337PG_FUNCTION_INFO_V1(get_nb_gates);
338/** @brief PostgreSQL-callable wrapper for get_nb_gates(). */
339Datum get_nb_gates(PG_FUNCTION_ARGS)
340{
341 unsigned long nb;
342
344
345 if(!WRITEM("n", char) || !READB(nb, unsigned long)) {
347 provsql_error("Cannot communicate with pipe (message type n)");
348 }
349
351
352 PG_RETURN_INT64((long) nb);
353}
354
355PG_FUNCTION_INFO_V1(get_children);
356/** @brief PostgreSQL-callable wrapper for get_children(). */
357Datum get_children(PG_FUNCTION_ARGS)
358{
359 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
360 ArrayType *result = NULL;
361 unsigned nb_children;
362 pg_uuid_t *children;
363 Datum *children_ptr;
364 constants_t constants;
365
366 if(PG_ARGISNULL(0))
367 PG_RETURN_NULL();
368
369 nb_children = circuit_cache_get_children(*token, &children);
370
371 if(!children) {
372 STARTWRITEM();
373 ADDWRITEM("c", char);
374 ADDWRITEM(token, pg_uuid_t);
375
377
378 if(!SENDWRITEM()) {
380 provsql_error("Cannot write to pipe (message type c)");
381 }
382
383 if(!READB(nb_children, unsigned)) {
385 provsql_error("Cannot read response from pipe (message type c)");
386 }
387
388 children=calloc(nb_children, sizeof(pg_uuid_t));
389
390 {
391 char *p = (char*)children;
392 ssize_t actual_read, remaining_size=nb_children*sizeof(pg_uuid_t);
393 while((actual_read=read(provsql_shared_state->pipembr, p, remaining_size))<remaining_size) {
394 if(actual_read<=0) {
396 provsql_error("Cannot read from pipe (message type c)");
397 } else {
398 remaining_size-=actual_read;
399 p+=actual_read;
400 }
401 }
402 }
404
405 circuit_cache_create_gate(*token, gate_invalid, nb_children, children);
406 }
407
408 children_ptr = palloc(nb_children * sizeof(Datum));
409 for(unsigned i=0; i<nb_children; ++i)
410 children_ptr[i] = UUIDPGetDatum(&children[i]);
411
412 constants=get_constants(true);
413 result = construct_array(
414 children_ptr,
415 nb_children,
416 constants.OID_TYPE_UUID,
417 16,
418 false,
419 'c');
420 pfree(children_ptr);
421 free(children);
422
423 PG_RETURN_ARRAYTYPE_P(result);
424}
425
426PG_FUNCTION_INFO_V1(get_prob);
427/** @brief PostgreSQL-callable wrapper for get_prob(). */
428Datum get_prob(PG_FUNCTION_ARGS)
429{
430 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
431 double result;
432
433 if(PG_ARGISNULL(0))
434 PG_RETURN_NULL();
435
436 STARTWRITEM();
437 ADDWRITEM("p", char);
438 ADDWRITEM(token, pg_uuid_t);
439
441
442 if(!SENDWRITEM() || !READB(result, double)) {
444 provsql_error("Cannot communicate with pipe (message type p)");
445 }
446
448
449 if(isnan(result))
450 PG_RETURN_NULL();
451 else
452 PG_RETURN_FLOAT8(result);
453}
454
455PG_FUNCTION_INFO_V1(get_infos);
456/** @brief PostgreSQL-callable wrapper for get_infos(). */
457Datum get_infos(PG_FUNCTION_ARGS)
458{
459 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
460 unsigned info1 =0, info2 = 0;
461
462 if(PG_ARGISNULL(0))
463 PG_RETURN_NULL();
464
465 STARTWRITEM();
466 ADDWRITEM("i", char);
467 ADDWRITEM(token, pg_uuid_t);
468
470
471 if(!SENDWRITEM() || !READB(info1, int) || !READB(info2, int)) {
473 provsql_error("Cannot communicate with pipe (message type i)");
474 }
475
477
478 if(info1 == 0 && info2 == 0)
479 PG_RETURN_NULL();
480 else {
481 TupleDesc tupdesc;
482 Datum values[2];
483 bool nulls[2] = {false, false};
484
485 get_call_result_type(fcinfo,NULL,&tupdesc);
486 tupdesc = BlessTupleDesc(tupdesc);
487
488 values[0] = Int32GetDatum(info1);
489 values[1] = Int32GetDatum(info2);
490
491 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
492 }
493}
void destroy_provsql_mmap()
Unmap and close the mmap files.
void provsql_mmap_main_loop()
Main processing loop of the mmap background worker.
void initialize_provsql_mmap()
Open (or create) the mmap files and initialise the circuit store.
C-linkage interface to the in-process provenance circuit cache.
gate_type circuit_cache_get_type(pg_uuid_t token)
Retrieve the type of a cached gate.
unsigned circuit_cache_get_children(pg_uuid_t token, pg_uuid_t **children)
Retrieve the children of a cached gate.
bool circuit_cache_create_gate(pg_uuid_t token, gate_type type, unsigned nb_children, pg_uuid_t *children)
Insert a new gate into the circuit cache.
#define provsql_error(fmt,...)
Report a fatal ProvSQL error and abort the current transaction.
#define provsql_log(fmt,...)
Write a ProvSQL message to the server log only.
Datum get_infos(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_infos().
Datum set_extra(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_extra().
Datum get_nb_gates(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_nb_gates().
Datum create_gate(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for create_gate().
__attribute__((visibility("default")))
Background worker entry point: initialises the mmap circuit store and runs the main loop.
Datum set_infos(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_infos().
Datum get_gate_type(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_gate_type().
char buffer[PIPE_BUF]
Shared write buffer used with STARTWRITEM / ADDWRITEM / SENDWRITEM.
Datum get_children(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_children().
Datum get_extra(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_extra().
Datum get_prob(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_prob().
unsigned bufferpos
Current write position within buffer.
void RegisterProvSQLMMapWorker(void)
Register the ProvSQL mmap background worker with PostgreSQL.
Datum set_prob(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_prob().
Background worker and IPC primitives for mmap-backed circuit storage.
void provsql_mmap_worker(Datum)
Entry point for the ProvSQL mmap background worker.
#define WRITEM(pvar, type)
Write one value of type to the background-to-main pipe.
#define READB(var, type)
Read one value of type from the main-to-background pipe.
#define STARTWRITEM()
Reset the shared write buffer for a new batched write.
#define ADDWRITEM(pvar, type)
Append one value of type to the shared write buffer.
#define SENDWRITEM()
Flush the shared write buffer to the background-to-main pipe atomically.
void provsql_shmem_unlock(void)
Release the ProvSQL LWLock.
void provsql_shmem_lock_exclusive(void)
Acquire the ProvSQL LWLock in exclusive mode.
provsqlSharedState * provsql_shared_state
Pointer to the ProvSQL shared-memory segment (set in provsql_shmem_startup).
void provsql_shmem_lock_shared(void)
Acquire the ProvSQL LWLock in shared mode.
Shared-memory segment and inter-process pipe management.
constants_t get_constants(bool failure_if_not_possible)
Retrieve the cached OID constants for the current database.
Core types, constants, and utilities shared across ProvSQL.
gate_type
Possible gate type in the provenance circuit.
@ gate_invalid
Invalid gate type.
@ nb_gate_types
Total number of gate types.
Structure to store the value of various constants.
Oid GATE_TYPE_TO_OID[nb_gate_types]
Array of the OID of each provenance_gate ENUM value.
Oid OID_TYPE_UUID
OID of the uuid TYPE.
UUID structure.
long pipembr
Main-to-background pipe: read end (worker reads)
long pipebmw
Background-to-main pipe: write end (worker writes)