ProvSQL C/C++ API
Adding support for provenance and uncertainty management to PostgreSQL databases
Loading...
Searching...
No Matches
provsql_mmap.c
Go to the documentation of this file.
1/**
2 * @file provsql_mmap.c
3 * @brief Background worker registration and IPC primitives for mmap-backed storage.
4 *
5 * Implements the PostgreSQL background worker lifecycle functions declared
6 * in @c provsql_mmap.h:
7 * - @c RegisterProvSQLMMapWorker(): registers the worker with the postmaster
8 * during @c _PG_init().
9 * - @c provsql_mmap_worker(): worker entry point; sets up signal handlers
10 * and enters @c provsql_mmap_main_loop().
11 *
12 * The IPC between normal backends and the background worker is handled in
13 * @c MMappedCircuit.cpp. This file provides the PostgreSQL-specific glue
14 * (background worker API, signal handling).
15 *
16 * Also defines the shared write buffer @c buffer[] and position counter
17 * @c bufferpos used by the @c STARTWRITEM / @c ADDWRITEM / @c SENDWRITEM
18 * macros in @c provsql_mmap.h.
19 *
20 * The gate-creation SQL functions (e.g. @c create_gate()) that backends
21 * call are also implemented here; they acquire the IPC lock, write a
22 * message to the background worker, and wait for an acknowledgment.
23 */
24#include "provsql_mmap.h"
25#include "provsql_shmem.h"
26#include "provsql_utils.h"
27#include "MMappedTableInfo.h"
28
29#include <errno.h>
30#include <unistd.h>
31#include <poll.h>
32#include <math.h>
33#include <assert.h>
34
35#include "postgres.h"
36#include "postmaster/bgworker.h"
37#include "catalog/pg_type.h" /* INT2OID etc. -- the _d.h variant
38 only exists from PG 11 onwards */
39#include "fmgr.h"
40#include "funcapi.h"
41#include "utils/array.h"
42#include "access/htup_details.h"
43#include "utils/builtins.h"
44#include "utils/inval.h"
45#include "utils/syscache.h"
46
47#include "circuit_cache.h"
48
/* Staging area for one outgoing pipe message, sized PIPE_BUF bytes, and the
 * current write offset into it (set_extra() below appends at
 * buffer+bufferpos). Both are file-scope objects with external linkage. */
49char buffer[PIPE_BUF]={}; // flawfinder: ignore
50unsigned bufferpos=0;
51
/**
 * Entry point executed in the background-worker process (registered by name
 * via bgw_function_name further down in this file).
 *
 * Visible steps: unblock signals, close two pipe descriptors stored in the
 * shared state (pipebmw and pipembr), then log that the worker is
 * initialized. The Datum argument is not used by any visible statement, and
 * the return values of close() are not inspected.
 *
 * NOTE(review): listing lines 55, 60 and 62 are absent from this extract;
 * presumably they hold the mmap initialise / main-loop / teardown calls --
 * confirm against the original file.
 */
52PGDLLEXPORT void provsql_mmap_worker(Datum ignored)
53{
54 BackgroundWorkerUnblockSignals();
56 close(provsql_shared_state->pipebmw);
57 close(provsql_shared_state->pipembr);
58 provsql_log("%s initialized", MyBgworkerEntry->bgw_name);
59
61
63}
64
66{
 /* Descriptor filled in below and handed to RegisterBackgroundWorker().
  * NOTE(review): the function header (listing line 65) is absent from this
  * extract. The struct is not zero-initialised in the visible lines, so any
  * field not assigned below is left indeterminate -- confirm that is
  * acceptable for the targeted PostgreSQL versions. */
67 BackgroundWorker worker;
68
69 snprintf(worker.bgw_name, BGW_MAXLEN, "ProvSQL MMap Worker");
 /* bgw_type is only assigned when building against PostgreSQL 11 or later. */
70#if PG_VERSION_NUM >= 110000
71 snprintf(worker.bgw_type, BGW_MAXLEN, "ProvSQL MMap");
72#endif
73
 /* Shared-memory access only; started at postmaster start; restart value 1
  * (PostgreSQL documents bgw_restart_time as an interval in seconds). */
74 worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
75 worker.bgw_start_time = BgWorkerStart_PostmasterStart;
76 worker.bgw_restart_time = 1;
77
 /* The entry point is located by library name + function name. */
78 snprintf(worker.bgw_library_name, BGW_MAXLEN, "provsql");
79 snprintf(worker.bgw_function_name, BGW_MAXLEN, "provsql_mmap_worker");
 /* bgw_main is only assigned (to NULL) when building against versions
  * before PostgreSQL 10. */
80#if PG_VERSION_NUM < 100000
81 worker.bgw_main = NULL;
82#endif
83
84 worker.bgw_main_arg = (Datum) 0;
85 worker.bgw_notify_pid = 0;
86
87 RegisterBackgroundWorker(&worker);
88}
89
90PG_FUNCTION_INFO_V1(get_gate_type);
91/** @brief PostgreSQL-callable wrapper for get_gate_type().
92 *
93 * On cache miss this fetches BOTH the gate type and its children from
94 * the worker, in one critical section, then caches them together. If
95 * we cached only the type (with an empty children list), a subsequent
96 * get_children() call for the same token would consult the cache, find
97 * the entry, and return 0 children, never querying the worker for the
98 * real children. provsql.provenance_evaluate hits exactly that pattern
99 * (it calls get_gate_type first, then unnest(get_children(...))) and
100 * silently folds plus/times gates over an empty set.
 *
 * Returns SQL NULL when argument 0 is NULL; otherwise returns
 * constants.GATE_TYPE_TO_OID[type] as an int32 datum.
 *
 * NOTE(review): listing lines 123, 126, 140, 153 and 163 are absent from
 * this extract; presumably they are the lock/unlock calls that delimit the
 * critical section mentioned above -- confirm against the original file.
101 */
102Datum get_gate_type(PG_FUNCTION_ARGS)
103{
104 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
105 gate_type type;
106 constants_t constants=get_constants(true);
107 unsigned nb_children = 0;
108 pg_uuid_t *children = NULL;
109
110 if(PG_ARGISNULL(0))
111 PG_RETURN_NULL();
112
 /* Fast path: a type already known to the per-session cache is returned
  * without any pipe traffic. */
113 type = circuit_cache_get_type(*token);
114 if(type!=gate_invalid)
115 PG_RETURN_INT32(constants.GATE_TYPE_TO_OID[type]); ;
116
117 /* Type fetch (message 't'). */
118 STARTWRITEM();
119 ADDWRITEM("t", char);
120 ADDWRITEM(&MyDatabaseId, Oid);
121 ADDWRITEM(token, pg_uuid_t);
122
124
125 if(!SENDWRITEM() || !READB(type, gate_type)) {
127 provsql_error("Cannot communicate on pipe (message type t)");
128 }
129
130 /* Children fetch (message 'c'), batched in the same critical
131 * section so the cache entry below is complete. Skipped when the
132 * token is unknown (worker reports gate_invalid). */
133 if(type != gate_invalid) {
134 STARTWRITEM();
135 ADDWRITEM("c", char);
136 ADDWRITEM(&MyDatabaseId, Oid);
137 ADDWRITEM(token, pg_uuid_t);
138
139 if(!SENDWRITEM() || !READB(nb_children, unsigned)) {
141 provsql_error("Cannot communicate on pipe (message type c during get_gate_type)");
142 }
143
144 if(nb_children > 0) {
145 char *p;
146 ssize_t actual_read, remaining_size;
147
 /* NOTE(review): the calloc() result is not checked before read()
  * writes through it. */
148 children = calloc(nb_children, sizeof(pg_uuid_t));
149 p = (char*)children;
150 remaining_size = nb_children * sizeof(pg_uuid_t);
 /* Keep reading until the whole children array has arrived; a short
  * read advances p and shrinks remaining_size, a return value <= 0
  * is reported as an error. */
151 while((actual_read = read(provsql_shared_state->pipembr, p, remaining_size)) < remaining_size) {
152 if(actual_read <= 0) {
 /* NOTE(review): no visible line frees the calloc'ed children
  * before this error is raised -- check whether this leaks. */
154 provsql_error("Cannot read children from pipe (during get_gate_type)");
155 } else {
156 remaining_size -= actual_read;
157 p += actual_read;
158 }
159 }
160 }
161 }
162
164
165 /* Skip caching the gate_input lazy default: MMappedCircuit::getGateType
166 * returns gate_input both for real input gates and for tokens that are
167 * not yet in the mapping. Caching the latter would poison subsequent
168 * create_gate() calls in this session (the cache hit would short-circuit
169 * the worker IPC, dropping the gate). The cost is one extra IPC per
170 * lookup of a real input gate -- acceptable. */
171 if(!(type == gate_input && nb_children == 0))
172 circuit_cache_create_gate(*token, type, nb_children, children);
173 if(children) free(children);
 /* NOTE(review): type may still be gate_invalid here (the children fetch
  * above is skipped in that case); this relies on gate_invalid being a
  * valid index into GATE_TYPE_TO_OID -- confirm. */
174 PG_RETURN_INT32(constants.GATE_TYPE_TO_OID[type]);
175}
176
177PG_FUNCTION_INFO_V1(create_gate);
178/** @brief PostgreSQL-callable wrapper for create_gate().
 *
 * Arguments as read below: 0 = gate token (uuid), 1 = gate type, matched
 * against the entries of constants.GATE_TYPE_TO_OID, 2 = array of child
 * tokens (NULL or empty means no children). Raises an error on NULL
 * token/type, on a multi-dimensional children array, on an unrecognised
 * type, or on pipe write failure. No reply is read from the worker in the
 * visible lines.
 *
 * NOTE(review): listing lines 239, 241, 244, 250, 253, 265 and 270 are
 * absent from this extract; presumably they are lock/unlock calls --
 * confirm against the original file.
 */
179Datum create_gate(PG_FUNCTION_ARGS)
180{
 /* NOTE(review): arguments 0 and 1 are fetched here, before the NULL check
  * a few lines below, and argument 1 is read with PG_GETARG_INT32 into an
  * Oid. */
181 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
182 Oid oid_type = PG_GETARG_INT32(1);
183 ArrayType *children = PG_ARGISNULL(2)?NULL:PG_GETARG_ARRAYTYPE_P(2);
184 unsigned nb_children = 0;
185 gate_type type = gate_invalid;
186 constants_t constants;
187 pg_uuid_t *children_data;
188
189 if(PG_ARGISNULL(0) || PG_ARGISNULL(1))
190 provsql_error("Invalid NULL value passed to create_gate");
191
 /* A zero-dimensional array leaves nb_children at 0. */
192 if(children) {
193 if(ARR_NDIM(children) > 1)
194 provsql_error("Invalid multi-dimensional array passed to create_gate");
195 else if(ARR_NDIM(children) == 1)
196 nb_children = *ARR_DIMS(children);
197 }
198
199 constants=get_constants(true);
200
 /* Reverse lookup: find the gate_type whose OID equals oid_type. type
  * stays gate_invalid when nothing matches. */
201 for(int i=0; i<nb_gate_types; ++i) {
202 if(constants.GATE_TYPE_TO_OID[i]==oid_type) {
203 type = i;
204 break;
205 }
206 }
207 if(type == gate_invalid) {
208 provsql_error("Invalid gate type");
209 }
210
 /* The array payload is used in place as a pg_uuid_t array.
  * NOTE(review): no element-type or NULL-element check is visible here. */
211 if(nb_children>0)
212 children_data = (pg_uuid_t*) ARR_DATA_PTR(children);
213 else
214 children_data = NULL;
215
216 /* Populate the per-session cache, but unconditionally fall through to
217 * the worker IPC: a cache hit only proves "this token has been seen
218 * in this session before" (e.g. by get_gate_type returning the
219 * gate_input lazy default for an unknown token) -- not "the worker
220 * already has a gate for it". Skipping the IPC on a cache hit caused
221 * silently-dropped create_gate calls under concurrent backends.
222 * MMappedCircuit::createGate is idempotent on already-mapped tokens. */
223 circuit_cache_create_gate(*token, type, nb_children, children_data);
224
 /* Message header: 'C', database OID, token, type, child count. */
225 STARTWRITEM();
226 ADDWRITEM("C", char);
227 ADDWRITEM(&MyDatabaseId, Oid);
228 ADDWRITEM(token, pg_uuid_t);
229 ADDWRITEM(&type, gate_type);
230 ADDWRITEM(&nb_children, unsigned);
231
 /* Decide whether header plus all children fit in what is left of the
  * PIPE_BUF-sized buffer. */
232 if(PIPE_BUF-bufferpos>nb_children*sizeof(pg_uuid_t)) {
233 // Enough space in the buffer for an atomic write, no need of
234 // exclusive locks
235
236 for(int i=0; i<nb_children; ++i)
237 ADDWRITEM(&children_data[i], pg_uuid_t);
238
240 if(!SENDWRITEM()) {
242 provsql_error("Cannot write to pipe (message type C)");
243 }
245 } else {
246 // Not enough space in buffer, pipe write won't be atomic, we need to
247 // make several writes and use locks
 /* Largest number of whole tokens that fits in one PIPE_BUF-sized write. */
248 unsigned children_per_batch = PIPE_BUF/sizeof(pg_uuid_t);
249
251
 /* The header goes out on its own first ... */
252 if(!SENDWRITEM()) {
254 provsql_error("Cannot write to pipe (message type C)");
255 }
256
 /* ... followed by ceil(nb_children / children_per_batch) writes, each
  * carrying at most children_per_batch tokens. */
257 for(unsigned j=0; j<1+(nb_children-1)/children_per_batch; ++j) {
258 STARTWRITEM();
259
260 for(unsigned i=j*children_per_batch; i<(j+1)*children_per_batch && i<nb_children; ++i) {
261 ADDWRITEM(&children_data[i], pg_uuid_t);
262 }
263
264 if(!SENDWRITEM()) {
266 provsql_error("Cannot write to pipe (message type C)");
267 }
268 }
269
271 }
272
273 PG_RETURN_VOID();
274}
275
276PG_FUNCTION_INFO_V1(set_prob);
277/** @brief PostgreSQL-callable wrapper for set_prob().
 *
 * Sends a 'P' message carrying the database OID, the token and the
 * probability, then reads back a one-byte status. Raises an error on NULL
 * arguments, on pipe failure, or when the status byte is zero (reported as
 * "set_prob called on non-input gate").
 *
 * NOTE(review): listing lines 293, 295 and 298 are absent from this
 * extract; presumably lock/unlock calls -- confirm against the original.
 */
278Datum set_prob(PG_FUNCTION_ARGS)
279{
 /* Both arguments are fetched before the NULL check below. */
280 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
281 double prob = PG_GETARG_FLOAT8(1);
282 char result;
283
284 if(PG_ARGISNULL(0) || PG_ARGISNULL(1))
285 provsql_error("Invalid NULL value passed to set_prob");
286
287 STARTWRITEM();
288 ADDWRITEM("P", char);
289 ADDWRITEM(&MyDatabaseId, Oid);
290 ADDWRITEM(token, pg_uuid_t);
291 ADDWRITEM(&prob, double);
292
 /* NOTE(review): the message below says "write" but this branch is also
  * taken when reading the status byte fails. */
294 if(!SENDWRITEM() || !READB(result, char)) {
296 provsql_error("Cannot write to pipe");
297 }
299
300 if(!result)
301 provsql_error("set_prob called on non-input gate");
302
303 PG_RETURN_VOID();
304}
305
306PG_FUNCTION_INFO_V1(set_infos);
307/** @brief PostgreSQL-callable wrapper for set_infos().
 *
 * Sends an 'I' message carrying the database OID, the token and two
 * unsigned info values; a NULL info argument is sent as 0. No reply is
 * read in the visible lines.
 *
 * NOTE(review): argument 0 is never tested with PG_ARGISNULL here --
 * presumably the SQL declaration guards against a NULL token; confirm.
 * Listing lines 327, 329 and 332 are absent from this extract (presumably
 * lock/unlock calls).
 */
308Datum set_infos(PG_FUNCTION_ARGS)
309{
310 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
311 unsigned info1 = PG_GETARG_INT32(1);
312 unsigned info2 = PG_GETARG_INT32(2);
313
314
 /* Values fetched above for NULL arguments are replaced by 0. */
315 if(PG_ARGISNULL(1))
316 info1=0;
317 if(PG_ARGISNULL(2))
318 info2=0;
319
320 STARTWRITEM();
321 ADDWRITEM("I", char);
322 ADDWRITEM(&MyDatabaseId, Oid);
323 ADDWRITEM(token, pg_uuid_t);
324 ADDWRITEM(&info1, unsigned);
325 ADDWRITEM(&info2, unsigned);
326
328 if(!SENDWRITEM()) {
330 provsql_error("Cannot write to pipe (message type I)");
331 }
333
334 PG_RETURN_VOID();
335}
336
337PG_FUNCTION_INFO_V1(set_extra);
338/** @brief PostgreSQL-callable wrapper for set_extra().
 *
 * Sends an 'E' message: database OID, token, byte length of the text, then
 * the text bytes themselves (without a terminating NUL). No reply is read
 * in the visible lines.
 *
 * NOTE(review): neither argument is tested with PG_ARGISNULL here --
 * presumably the SQL declaration is STRICT; confirm. Listing lines 356,
 * 358 and 361 are absent from this extract (presumably lock/unlock calls).
 */
339Datum set_extra(PG_FUNCTION_ARGS)
340{
341 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
342 text *data = PG_GETARG_TEXT_P(1);
343 char *str=text_to_cstring(data);
344 unsigned len=strlen(str);
345
346 STARTWRITEM();
347 ADDWRITEM("E", char);
348 ADDWRITEM(&MyDatabaseId, Oid);
349 ADDWRITEM(token, pg_uuid_t);
350 ADDWRITEM(&len, unsigned);
351
 /* NOTE(review): assert() is removed when NDEBUG is defined; in such a
  * build nothing visible here bounds len before the memcpy into the
  * PIPE_BUF-sized buffer. */
352 assert(PIPE_BUF-bufferpos>len);
 /* Raw append of the payload bytes, advancing the write offset. */
353 memcpy(buffer+bufferpos, str, len), bufferpos+=len;
354 pfree(str);
355
357 if(!SENDWRITEM()) {
359 provsql_error("Cannot write to pipe (message type E)");
360 }
362
363 PG_RETURN_VOID();
364}
365
366PG_FUNCTION_INFO_V1(get_extra);
367/** @brief PostgreSQL-callable wrapper for get_extra().
 *
 * Sends an 'e' message (database OID + token), reads back a length, then
 * reads that many bytes directly into a freshly palloc'ed text value.
 * Returns SQL NULL when the token argument is NULL.
 *
 * NOTE(review): listing lines 382, 385, 393 and 397 are absent from this
 * extract; presumably lock/unlock calls -- confirm against the original.
 */
368Datum get_extra(PG_FUNCTION_ARGS)
369{
370 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
371 text *result;
372 unsigned len;
373
374 if(PG_ARGISNULL(0))
375 PG_RETURN_NULL();
376
377 STARTWRITEM();
378 ADDWRITEM("e", char);
379 ADDWRITEM(&MyDatabaseId, Oid);
380 ADDWRITEM(token, pg_uuid_t);
381
383
384 if(!SENDWRITEM() || !READB(len, unsigned)) {
386 provsql_error("Cannot communicate with pipe (message type e)");
387 }
388
 /* Varlena layout: header of VARHDRSZ bytes followed by len payload bytes. */
389 result = palloc(len + VARHDRSZ);
390 SET_VARSIZE(result, VARHDRSZ + len);
391
 /* NOTE(review): a single read() is issued; a short read is treated as an
  * error rather than retried (get_children() in this file loops instead). */
392 if(read(provsql_shared_state->pipembr, VARDATA(result), len)<len) {
394 provsql_error("Cannot communicate with pipe (message type e)");
395 }
396
398
399 PG_RETURN_TEXT_P(result);
400}
401
402PG_FUNCTION_INFO_V1(get_nb_gates);
403/** @brief PostgreSQL-callable wrapper for get_nb_gates().
 *
 * Sends an 'n' message carrying only the database OID, reads back an
 * unsigned long and returns it as an int64 datum.
 *
 * NOTE(review): listing lines 412, 415 and 419 are absent from this
 * extract; presumably lock/unlock calls -- confirm against the original.
 */
404Datum get_nb_gates(PG_FUNCTION_ARGS)
405{
406 unsigned long nb;
407
408 STARTWRITEM();
409 ADDWRITEM("n", char);
410 ADDWRITEM(&MyDatabaseId, Oid);
411
413
414 if(!SENDWRITEM() || !READB(nb, unsigned long)) {
416 provsql_error("Cannot communicate with pipe (message type n)");
417 }
418
420
 /* The unsigned count is converted to a signed long before being returned. */
421 PG_RETURN_INT64((long) nb);
422}
423
424PG_FUNCTION_INFO_V1(get_children);
425/** @brief PostgreSQL-callable wrapper for get_children().
 *
 * Returns the children of a gate as a one-dimensional uuid array, or SQL
 * NULL when the token argument is NULL. The per-session cache is consulted
 * first; when it leaves the children pointer NULL, a 'c' message is sent
 * to the worker and the reply (count, then that many tokens) is read back.
 *
 * NOTE(review): listing lines 446, 449, 454, 465 and 473 are absent from
 * this extract; presumably lock/unlock calls -- confirm against the
 * original file.
 */
426Datum get_children(PG_FUNCTION_ARGS)
427{
428 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
429 ArrayType *result = NULL;
430 unsigned nb_children;
431 pg_uuid_t *children;
432 Datum *children_ptr;
433 constants_t constants;
434
435 if(PG_ARGISNULL(0))
436 PG_RETURN_NULL();
437
438 nb_children = circuit_cache_get_children(*token, &children);
439
 /* A NULL children pointer is treated as "not available from the cache". */
440 if(!children) {
441 STARTWRITEM();
442 ADDWRITEM("c", char);
443 ADDWRITEM(&MyDatabaseId, Oid);
444 ADDWRITEM(token, pg_uuid_t);
445
447
448 if(!SENDWRITEM()) {
450 provsql_error("Cannot write to pipe (message type c)");
451 }
452
453 if(!READB(nb_children, unsigned)) {
455 provsql_error("Cannot read response from pipe (message type c)");
456 }
457
 /* NOTE(review): the calloc() result is not checked before use. */
458 children=calloc(nb_children, sizeof(pg_uuid_t));
459
460 {
461 char *p = (char*)children;
462 ssize_t actual_read, remaining_size=nb_children*sizeof(pg_uuid_t);
 /* Loop until the full array has been received; short reads advance p,
  * a return value <= 0 is reported as an error. */
463 while((actual_read=read(provsql_shared_state->pipembr, p, remaining_size))<remaining_size) {
464 if(actual_read<=0) {
466 provsql_error("Cannot read from pipe (message type c)");
467 } else {
468 remaining_size-=actual_read;
469 p+=actual_read;
470 }
471 }
472 }
474
475 /* Skip caching when the worker reports zero children: we cannot
476 * distinguish a real zero-child gate (input/zero/one/...) from a
477 * token unknown to the worker, and caching the latter poisons
478 * subsequent create_gate() calls in this session. */
479 if(nb_children > 0)
480 circuit_cache_create_gate(*token, gate_invalid, nb_children, children);
481 }
482
 /* Build an array of Datums that point into the children buffer. */
483 children_ptr = palloc(nb_children * sizeof(Datum));
484 for(unsigned i=0; i<nb_children; ++i)
485 children_ptr[i] = UUIDPGetDatum(&children[i]);
486
487 constants=get_constants(true);
 /* Element description passed to construct_array: 16-byte, pass-by-
  * reference, char-aligned elements of type constants.OID_TYPE_UUID. */
488 result = construct_array(
489 children_ptr,
490 nb_children,
491 constants.OID_TYPE_UUID,
492 16,
493 false,
494 'c');
495 pfree(children_ptr);
 /* NOTE(review): this free() also runs when children came from
  * circuit_cache_get_children(); presumably the cache hands out a
  * malloc'ed copy owned by the caller -- confirm. */
496 free(children);
497
498 PG_RETURN_ARRAYTYPE_P(result);
499}
500
501PG_FUNCTION_INFO_V1(get_prob);
502/** @brief PostgreSQL-callable wrapper for get_prob().
 *
 * Sends a 'p' message (database OID + token) and reads back a double.
 * Returns SQL NULL when the token argument is NULL or when the value read
 * is NaN; otherwise returns the value as a float8 datum.
 *
 * NOTE(review): listing lines 516, 519 and 523 are absent from this
 * extract; presumably lock/unlock calls -- confirm against the original.
 */
503Datum get_prob(PG_FUNCTION_ARGS)
504{
505 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
506 double result;
507
508 if(PG_ARGISNULL(0))
509 PG_RETURN_NULL();
510
511 STARTWRITEM();
512 ADDWRITEM("p", char);
513 ADDWRITEM(&MyDatabaseId, Oid);
514 ADDWRITEM(token, pg_uuid_t);
515
517
518 if(!SENDWRITEM() || !READB(result, double)) {
520 provsql_error("Cannot communicate with pipe (message type p)");
521 }
522
524
 /* NaN is mapped to SQL NULL. */
525 if(isnan(result))
526 PG_RETURN_NULL();
527 else
528 PG_RETURN_FLOAT8(result);
529}
530
531/** @brief Translate a SQL-side kind label into the persisted enum value.
 *
 * Accepts exactly "tid", "bid" or "opaque" (case-sensitive strcmp) and
 * returns the matching PROVSQL_TABLE_* constant. Any other label is
 * reported through provsql_error(); the trailing return is marked
 * unreachable by the original author.
 */
532static uint8_t parse_table_kind(const char *label)
533{
534 if(strcmp(label, "tid") == 0) return PROVSQL_TABLE_TID;
535 if(strcmp(label, "bid") == 0) return PROVSQL_TABLE_BID;
536 if(strcmp(label, "opaque") == 0) return PROVSQL_TABLE_OPAQUE;
537 provsql_error("set_table_info: unknown table kind '%s' (expected "
538 "'tid', 'bid', or 'opaque')", label);
539 return PROVSQL_TABLE_TID; /* unreachable */
540}
541
542/** @brief Inverse of @c parse_table_kind for use by @c get_table_info.
 *
 * Maps PROVSQL_TABLE_TID / _BID / _OPAQUE to the string literals "tid" /
 * "bid" / "opaque". Any other value falls out of the switch and is
 * reported through provsql_error(); the trailing return is marked
 * unreachable by the original author.
 */
543static const char *table_kind_label(uint8_t kind)
544{
545 switch(kind) {
546 case PROVSQL_TABLE_TID: return "tid";
547 case PROVSQL_TABLE_BID: return "bid";
548 case PROVSQL_TABLE_OPAQUE: return "opaque";
549 }
550 provsql_error("get_table_info: unknown table kind value %u", kind);
551 return NULL; /* unreachable */
552}
553
554PG_FUNCTION_INFO_V1(set_table_info);
555/**
556 * @brief PostgreSQL-callable wrapper for setTableInfo() over the IPC pipe.
557 *
558 * Stores per-relation provenance metadata used by the safe-query
559 * optimisation. @p relid is the @c pg_class OID of the relation;
560 * @p kind is one of the textual labels @c 'tid' / @c 'bid' /
561 * @c 'opaque' (see @c provsql_table_kind in @c MMappedTableInfo.h);
562 * @p block_key is an @c int2 array (possibly empty) listing the
563 * block-key column numbers when @p kind is @c 'bid'.
 *
 * A NULL @p block_key is handled like an empty one. Raises an error on a
 * NULL relid or kind, an unknown kind label, a multi-dimensional array, a
 * block key wider than PROVSQL_TABLE_INFO_MAX_BLOCK_KEY, or a payload
 * larger than PIPE_BUF. No reply is read from the worker in the visible
 * lines.
 *
 * NOTE(review): listing line 598 (the remaining arguments of the
 * "block key wider than" error call) and lines 614, 616 and 619
 * (presumably lock/unlock calls) are absent from this extract -- confirm
 * against the original file.
564 */
565Datum set_table_info(PG_FUNCTION_ARGS)
566{
567 Oid relid;
568 text *kind_text;
569 char *kind_str;
570 uint8_t kind;
571 ArrayType *block_key;
572 uint16 block_key_n = 0;
573 int16 *block_key_data = NULL;
574 Size payload_size;
575
576 if(PG_ARGISNULL(0) || PG_ARGISNULL(1))
577 provsql_error("Invalid NULL value passed to set_table_info");
578
579 relid = PG_GETARG_OID(0);
580 kind_text = PG_GETARG_TEXT_PP(1);
581 kind_str = text_to_cstring(kind_text);
582 kind = parse_table_kind(kind_str);
583 pfree(kind_str);
584 block_key = PG_ARGISNULL(2) ? NULL : PG_GETARG_ARRAYTYPE_P(2);
585
 /* NOTE(review): the array payload is used in place as int16 values; no
  * element-type or NULL-element check is visible here. The dimension is
  * stored into a uint16 before the range check below. */
586 if(block_key) {
587 if(ARR_NDIM(block_key) > 1)
588 provsql_error("Invalid multi-dimensional array passed to set_table_info");
589 else if(ARR_NDIM(block_key) == 1)
590 block_key_n = *ARR_DIMS(block_key);
591 if(block_key_n > 0)
592 block_key_data = (int16 *) ARR_DATA_PTR(block_key);
593 }
594
595 if(block_key_n > PROVSQL_TABLE_INFO_MAX_BLOCK_KEY)
596 provsql_error("set_table_info: block key wider than %d columns "
597 "(%u given) is not supported",
599
 /* Sum of the sizes of the fields appended below, in the same order:
  * tag, database OID, relid, kind, count, then the key columns. */
600 payload_size = sizeof(char) + sizeof(Oid) + sizeof(Oid) + sizeof(uint8)
601 + sizeof(uint16) + block_key_n * sizeof(int16);
602 if(payload_size > PIPE_BUF)
603 provsql_error("set_table_info: IPC payload exceeds PIPE_BUF");
604
605 STARTWRITEM();
606 ADDWRITEM("T", char);
607 ADDWRITEM(&MyDatabaseId, Oid);
608 ADDWRITEM(&relid, Oid);
609 ADDWRITEM(&kind, uint8);
610 ADDWRITEM(&block_key_n, uint16);
611 for(uint16 i = 0; i < block_key_n; ++i)
612 ADDWRITEM(&block_key_data[i], int16);
613
615 if(!SENDWRITEM()) {
617 provsql_error("Cannot write to pipe (message type T)");
618 }
620
621 /* Broadcast a relcache invalidation so every backend re-fetches on
622 * next access. Standard DDL (the ALTER TABLE in add_provenance and
623 * repair_key) already does this, but set_table_info is also called
624 * from DML paths that do not (INSERT INTO T SELECT, UPDATE under
625 * provsql.update_provenance, ...) and the upgrade-script backfill
626 * runs outside any DDL on the target relation. Guarded by a
627 * pg_class existence check so the sql_drop event-trigger path,
628 * which calls remove_table_info on a relid that has just been
629 * deleted from pg_class, does not raise "cache lookup failed". */
630 if(SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
631 CacheInvalidateRelcacheByRelid(relid);
632
633 PG_RETURN_VOID();
634}
635
636PG_FUNCTION_INFO_V1(remove_table_info);
637/** @brief PostgreSQL-callable wrapper for removeTableInfo() over the IPC pipe.
 *
 * Sends a 'D' message carrying the database OID and @p relid; no reply is
 * read in the visible lines. Raises an error when @p relid is NULL or the
 * pipe write fails.
 *
 * NOTE(review): listing lines 652, 654 and 657 are absent from this
 * extract; presumably lock/unlock calls -- confirm against the original.
 */
638Datum remove_table_info(PG_FUNCTION_ARGS)
639{
640 Oid relid;
641
642 if(PG_ARGISNULL(0))
643 provsql_error("Invalid NULL value passed to remove_table_info");
644
645 relid = PG_GETARG_OID(0);
646
647 STARTWRITEM();
648 ADDWRITEM("D", char);
649 ADDWRITEM(&MyDatabaseId, Oid);
650 ADDWRITEM(&relid, Oid);
651
653 if(!SENDWRITEM()) {
655 provsql_error("Cannot write to pipe (message type D)");
656 }
658
659 /* Same guard as set_table_info: skip the broadcast when the relation
660 * is already gone (typical for the sql_drop event-trigger path,
661 * where pg_class no longer has a row for this OID). */
662 if(SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
663 CacheInvalidateRelcacheByRelid(relid);
664
665 PG_RETURN_VOID();
666}
667
668/**
669 * @brief C-callable IPC fetch for per-table provenance metadata.
670 *
671 * Sends an @c 's' message to the background worker and reads back the
672 * response. No caching: every call hits the worker. Use
673 * @c provsql_lookup_table_info() for the cached, planner-hot-path
674 * variant.
 *
 * Reply layout as consumed below: a one-byte found flag; when non-zero,
 * the kind, the block-key count, then that many block-key column numbers.
 * @p out is only written when the flag is non-zero.
675 *
676 * @param relid pg_class OID of the relation to look up.
677 * @param out On success, filled with the stored record.
678 * @return @c true if the worker returned a record, @c false otherwise.
679 */
681{
 /* NOTE(review): the function header (listing line 680) and lines 689,
  * 692, 697, 700-701, 706 and 712 are absent from this extract. Lines
  * 700-701 presumably hold the guard for the "unexpectedly wide block key"
  * error below, which therefore looks unconditional here; the others are
  * presumably lock/unlock calls -- confirm against the original file. */
682 char found;
683
684 STARTWRITEM();
685 ADDWRITEM("s", char);
686 ADDWRITEM(&MyDatabaseId, Oid);
687 ADDWRITEM(&relid, Oid);
688
690
691 if(!SENDWRITEM() || !READB(found, char)) {
693 provsql_error("Cannot communicate with pipe (message type s)");
694 }
695 if(found) {
696 if(!READB(out->kind, uint8_t) || !READB(out->block_key_n, uint16)) {
698 provsql_error("Cannot communicate with pipe (message type s)");
699 }
702 provsql_error("provsql_fetch_table_info: server returned an unexpectedly wide block key");
703 }
 /* One READB per block-key column, in order. */
704 for(uint16 i = 0; i < out->block_key_n; ++i)
705 if(!READB(out->block_key[i], AttrNumber)) {
707 provsql_error("Cannot communicate with pipe (message type s)");
708 }
 /* relid is copied from the request; it is not read from the reply. */
709 out->relid = relid;
710 }
711
713 return found != 0;
714}
715
716PG_FUNCTION_INFO_V1(get_table_info);
717/**
718 * @brief PostgreSQL-callable wrapper around the cached table-info lookup.
719 *
720 * Returns @c NULL when no record exists for @p relid; otherwise a
721 * record @c (kind text, block_key int2[]) where @c kind is one of
722 * @c 'tid' / @c 'bid' / @c 'opaque'. Goes through
723 * @c provsql_lookup_table_info so repeated calls in the same session
724 * do not pay for IPC.
 *
 * Also returns @c NULL when @p relid itself is NULL, and raises an error
 * when the declared result type is not composite.
 *
 * NOTE(review): listing line 729 is absent from this extract; presumably
 * it declares the @c info record used below -- confirm against the
 * original file.
725 */
726Datum get_table_info(PG_FUNCTION_ARGS)
727{
728 Oid relid;
730 TupleDesc tupdesc;
731 Datum values[2];
732 bool nulls[2] = {false, false};
733 Datum *elems;
734 ArrayType *arr;
735
736 if(PG_ARGISNULL(0))
737 PG_RETURN_NULL();
738
739 relid = PG_GETARG_OID(0);
740
741 if(!provsql_lookup_table_info(relid, &info))
742 PG_RETURN_NULL();
743
744 if(get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
745 provsql_error("get_table_info: expected composite return type");
746 tupdesc = BlessTupleDesc(tupdesc);
747
 /* Column 0: the kind rendered as its text label. */
748 values[0] = CStringGetTextDatum(table_kind_label(info.kind));
749
 /* Column 1: the block key as an int2 array (2-byte, pass-by-value,
  * 's'-aligned elements, per the construct_array arguments). */
750 elems = palloc(info.block_key_n * sizeof(Datum));
751 for(uint16 i = 0; i < info.block_key_n; ++i)
752 elems[i] = Int16GetDatum(info.block_key[i]);
753 arr = construct_array(elems, info.block_key_n, INT2OID, 2, true, 's');
754 pfree(elems);
755 values[1] = PointerGetDatum(arr);
756
757 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
758}
759
760PG_FUNCTION_INFO_V1(set_ancestors);
761/**
762 * @brief PostgreSQL-callable wrapper for setTableAncestry() over the
763 * IPC pipe.
764 *
765 * Records the base-relation ancestor set of a tracked relation.
766 * @p relid is the @c pg_class OID of the relation; @p ancestors is
767 * an @c oid[] (possibly empty) listing the base @c add_provenance /
768 * @c repair_key relations this one's atoms ultimately come from.
769 * The worker preserves the relation's existing @c kind / @c
770 * block_key half on update.
771 *
772 * Silently no-op on the worker side when @p relid has no kind
773 * record yet -- the safe-query rewriter only consults ancestry
774 * for tracked relations, so callers should run
775 * @c add_provenance / @c repair_key / @c set_table_info first.
 *
 * A NULL @p ancestors is handled like an empty array. Raises an error on
 * a NULL relid, a multi-dimensional array, more than
 * PROVSQL_TABLE_INFO_MAX_ANCESTORS entries, or a payload larger than
 * PIPE_BUF. No reply is read from the worker in the visible lines.
 *
 * NOTE(review): listing line 803 (the remaining arguments of the
 * "ancestor set wider than" error call) and lines 818, 820 and 823
 * (presumably lock/unlock calls) are absent from this extract -- confirm
 * against the original file.
776 */
777Datum set_ancestors(PG_FUNCTION_ARGS)
778{
779 Oid relid;
780 ArrayType *ancestors;
781 uint16 ancestor_n = 0;
782 Oid *ancestor_data = NULL;
783 Size payload_size;
784
785 if(PG_ARGISNULL(0))
786 provsql_error("Invalid NULL value passed to set_ancestors");
787
788 relid = PG_GETARG_OID(0);
789 ancestors = PG_ARGISNULL(1) ? NULL : PG_GETARG_ARRAYTYPE_P(1);
790
 /* NOTE(review): the array payload is used in place as Oid values; no
  * element-type or NULL-element check is visible here. The dimension is
  * stored into a uint16 before the range check below. */
791 if(ancestors) {
792 if(ARR_NDIM(ancestors) > 1)
793 provsql_error("Invalid multi-dimensional array passed to set_ancestors");
794 else if(ARR_NDIM(ancestors) == 1)
795 ancestor_n = *ARR_DIMS(ancestors);
796 if(ancestor_n > 0)
797 ancestor_data = (Oid *) ARR_DATA_PTR(ancestors);
798 }
799
800 if(ancestor_n > PROVSQL_TABLE_INFO_MAX_ANCESTORS)
801 provsql_error("set_ancestors: ancestor set wider than %d entries "
802 "(%u given) is not supported",
804
 /* Sum of the sizes of the fields appended below, in the same order:
  * tag, database OID, relid, count, then the ancestor OIDs. */
805 payload_size = sizeof(char) + sizeof(Oid) + sizeof(Oid)
806 + sizeof(uint16) + ancestor_n * sizeof(Oid);
807 if(payload_size > PIPE_BUF)
808 provsql_error("set_ancestors: IPC payload exceeds PIPE_BUF");
809
810 STARTWRITEM();
811 ADDWRITEM("A", char);
812 ADDWRITEM(&MyDatabaseId, Oid);
813 ADDWRITEM(&relid, Oid);
814 ADDWRITEM(&ancestor_n, uint16);
815 for(uint16 i = 0; i < ancestor_n; ++i)
816 ADDWRITEM(&ancestor_data[i], Oid);
817
819 if(!SENDWRITEM()) {
821 provsql_error("Cannot write to pipe (message type A)");
822 }
824
 /* Relcache invalidation is only broadcast while pg_class still has a
  * row for relid (same guard as in set_table_info). */
825 if(SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
826 CacheInvalidateRelcacheByRelid(relid);
827
828 PG_RETURN_VOID();
829}
830
831PG_FUNCTION_INFO_V1(remove_ancestors);
832/**
833 * @brief PostgreSQL-callable wrapper for removeTableAncestry() over
834 * the IPC pipe.
835 *
836 * Clears just the ancestor half of a per-table metadata record,
837 * leaving @c kind / @c block_key intact. Use @c remove_table_info
838 * to delete the whole record instead.
 *
 * Sends an 'R' message carrying the database OID and @p relid; no reply
 * is read in the visible lines. Raises an error when @p relid is NULL or
 * the pipe write fails.
 *
 * NOTE(review): listing lines 854, 856 and 859 are absent from this
 * extract; presumably lock/unlock calls -- confirm against the original.
839 */
840Datum remove_ancestors(PG_FUNCTION_ARGS)
841{
842 Oid relid;
843
844 if(PG_ARGISNULL(0))
845 provsql_error("Invalid NULL value passed to remove_ancestors");
846
847 relid = PG_GETARG_OID(0);
848
849 STARTWRITEM();
850 ADDWRITEM("R", char);
851 ADDWRITEM(&MyDatabaseId, Oid);
852 ADDWRITEM(&relid, Oid);
853
855 if(!SENDWRITEM()) {
857 provsql_error("Cannot write to pipe (message type R)");
858 }
860
 /* Relcache invalidation is only broadcast while pg_class still has a
  * row for relid. */
861 if(SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
862 CacheInvalidateRelcacheByRelid(relid);
863
864 PG_RETURN_VOID();
865}
866
867/**
868 * @brief C-callable IPC fetch for the ancestor half of a per-table
869 * metadata record.
870 *
871 * Sends an @c 'a' message to the background worker and reads back
872 * the response. No caching: every call hits the worker. Use
873 * @c provsql_lookup_ancestry for the cached, planner-hot-path
874 * variant.
 *
 * Reply layout as consumed below: a one-byte found flag; when non-zero,
 * a uint16 count followed by that many OIDs. @p ancestor_n_out is always
 * written (0 when nothing was found).
875 *
876 * @param relid pg_class OID of the relation to look up.
877 * @param ancestor_n_out On @c true return, count of valid entries.
878 * @param ancestors_out On @c true return, the ancestor OIDs
879 * (caller buffer of
880 * @c PROVSQL_TABLE_INFO_MAX_ANCESTORS).
881 * @return @c true if the worker returned a non-zero ancestor count;
882 * @c false otherwise (no record, or empty ancestor set).
883 */
884bool provsql_fetch_ancestry(Oid relid, uint16 *ancestor_n_out,
885 Oid *ancestors_out)
886{
 /* NOTE(review): listing lines 895, 898, 903, 906-907, 913 and 918 are
  * absent from this extract. Lines 906-907 presumably hold the guard for
  * the "unexpectedly wide ancestor set" error below, which therefore looks
  * unconditional here; the others are presumably lock/unlock calls --
  * confirm against the original file. */
887 char found;
888 uint16 n = 0;
889
890 STARTWRITEM();
891 ADDWRITEM("a", char);
892 ADDWRITEM(&MyDatabaseId, Oid);
893 ADDWRITEM(&relid, Oid);
894
896
897 if(!SENDWRITEM() || !READB(found, char)) {
899 provsql_error("Cannot communicate with pipe (message type a)");
900 }
901 if(found) {
902 if(!READB(n, uint16)) {
904 provsql_error("Cannot communicate with pipe (message type a)");
905 }
908 provsql_error("provsql_fetch_ancestry: server returned an "
909 "unexpectedly wide ancestor set");
910 }
 /* One READB per ancestor OID, written straight into the caller buffer. */
911 for(uint16 i = 0; i < n; ++i)
912 if(!READB(ancestors_out[i], Oid)) {
914 provsql_error("Cannot communicate with pipe (message type a)");
915 }
916 }
917
919 *ancestor_n_out = n;
920 /* "Found but empty" collapses to the same return as "not found":
921 * both make the safe-query rewriter take the conservative path. */
922 return found != 0 && n > 0;
923}
924
925PG_FUNCTION_INFO_V1(get_ancestors);
926/**
927 * @brief PostgreSQL-callable wrapper around the cached ancestry lookup.
928 *
929 * Returns @c NULL when no ancestor record exists (or the record is
930 * empty); otherwise an @c oid[] listing the base-relation OIDs.
931 * Goes through @c provsql_lookup_ancestry so repeated calls in the
932 * same session do not pay for IPC.
 *
 * Also returns @c NULL when @p relid itself is NULL.
 *
 * NOTE(review): listing line 938 is absent from this extract; presumably
 * it declares the @c ancestors buffer used below -- confirm against the
 * original file.
933 */
934Datum get_ancestors(PG_FUNCTION_ARGS)
935{
936 Oid relid;
937 uint16 ancestor_n;
939 Datum *elems;
940 ArrayType *arr;
941
942 if(PG_ARGISNULL(0))
943 PG_RETURN_NULL();
944
945 relid = PG_GETARG_OID(0);
946
947 if(!provsql_lookup_ancestry(relid, &ancestor_n, ancestors))
948 PG_RETURN_NULL();
949
 /* Build the oid[] result (sizeof(Oid)-byte, pass-by-value, 'i'-aligned
  * elements, per the construct_array arguments). */
950 elems = palloc(ancestor_n * sizeof(Datum));
951 for(uint16 i = 0; i < ancestor_n; ++i)
952 elems[i] = ObjectIdGetDatum(ancestors[i]);
953 arr = construct_array(elems, ancestor_n, OIDOID,
954 sizeof(Oid), true, 'i');
955 pfree(elems);
956
957 PG_RETURN_ARRAYTYPE_P(arr);
958}
959
960PG_FUNCTION_INFO_V1(get_infos);
961/** @brief PostgreSQL-callable wrapper for get_infos().
 *
 * Sends an 'i' message (database OID + token), reads back two values and
 * returns them as a two-column composite of int32 datums. Returns SQL
 * NULL when the token argument is NULL.
 *
 * NOTE(review): listing lines 975, 978 and 982 are absent from this
 * extract; presumably lock/unlock calls -- confirm against the original.
 */
962Datum get_infos(PG_FUNCTION_ARGS)
963{
964 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
965 unsigned info1 =0, info2 = 0;
966
967 if(PG_ARGISNULL(0))
968 PG_RETURN_NULL();
969
970 STARTWRITEM();
971 ADDWRITEM("i", char);
972 ADDWRITEM(&MyDatabaseId, Oid);
973 ADDWRITEM(token, pg_uuid_t);
974
976
 /* NOTE(review): the variables are unsigned but READB is given type int,
  * whereas set_infos() writes them with type unsigned. */
977 if(!SENDWRITEM() || !READB(info1, int) || !READB(info2, int)) {
979 provsql_error("Cannot communicate with pipe (message type i)");
980 }
981
983
984 {
985 TupleDesc tupdesc;
986 Datum values[2];
987 bool nulls[2] = {false, false};
988
 /* NOTE(review): the result of get_call_result_type() is not checked
  * here, unlike in get_table_info(). */
989 get_call_result_type(fcinfo,NULL,&tupdesc);
990 tupdesc = BlessTupleDesc(tupdesc);
991
992 values[0] = Int32GetDatum(info1);
993 values[1] = Int32GetDatum(info2);
994
995 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
996 }
997}
void destroy_provsql_mmap()
Unmap and close the mmap files.
void provsql_mmap_main_loop()
Main processing loop of the mmap background worker.
void initialize_provsql_mmap()
Open (or create) the mmap files and initialise the circuit store.
Per-table provenance metadata persisted alongside the circuit store.
#define PROVSQL_TABLE_INFO_MAX_BLOCK_KEY
Cap on the number of block-key columns recorded per relation.
@ PROVSQL_TABLE_TID
@ PROVSQL_TABLE_BID
@ PROVSQL_TABLE_OPAQUE
#define PROVSQL_TABLE_INFO_MAX_ANCESTORS
Cap on the number of base ancestors recorded per relation.
C-linkage interface to the in-process provenance circuit cache.
gate_type circuit_cache_get_type(pg_uuid_t token)
Retrieve the type of a cached gate.
unsigned circuit_cache_get_children(pg_uuid_t token, pg_uuid_t **children)
Retrieve the children of a cached gate.
bool circuit_cache_create_gate(pg_uuid_t token, gate_type type, unsigned nb_children, pg_uuid_t *children)
Insert a new gate into the circuit cache.
Datum set_ancestors(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for setTableAncestry() over the IPC pipe.
Datum set_table_info(PG_FUNCTION_ARGS)
Forward declaration of the C SQL entry points.
#define provsql_error(fmt,...)
Report a fatal ProvSQL error and abort the current transaction.
#define provsql_log(fmt,...)
Write a ProvSQL message to the server log only.
Datum get_infos(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_infos().
void provsql_mmap_worker(Datum ignored)
Entry point for the ProvSQL mmap background worker.
Datum set_ancestors(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for setTableAncestry() over the IPC pipe.
static const char * table_kind_label(uint8_t kind)
Inverse of parse_table_kind for use by get_table_info.
Datum set_table_info(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for setTableInfo() over the IPC pipe.
Datum get_table_info(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper around the cached table-info lookup.
Datum remove_ancestors(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for removeTableAncestry() over the IPC pipe.
Datum set_extra(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_extra().
Datum get_nb_gates(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_nb_gates().
Datum create_gate(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for create_gate().
Datum get_ancestors(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper around the cached ancestry lookup.
static uint8_t parse_table_kind(const char *label)
Translate a SQL-side kind label into the persisted enum value.
Datum remove_table_info(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for removeTableInfo() over the IPC pipe.
Datum set_infos(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_infos().
bool provsql_fetch_table_info(Oid relid, ProvenanceTableInfo *out)
C-callable IPC fetch for per-table provenance metadata.
Datum get_gate_type(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_gate_type().
bool provsql_fetch_ancestry(Oid relid, uint16 *ancestor_n_out, Oid *ancestors_out)
C-callable IPC fetch for the ancestor half of a per-table metadata record.
char buffer[PIPE_BUF]
Shared write buffer used with STARTWRITEM / ADDWRITEM / SENDWRITEM.
Datum get_children(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_children().
Datum get_extra(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_extra().
Datum get_prob(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_prob().
unsigned bufferpos
Current write position within buffer.
void RegisterProvSQLMMapWorker(void)
Register the ProvSQL mmap background worker with PostgreSQL.
Datum set_prob(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_prob().
Background worker and IPC primitives for mmap-backed circuit storage.
#define READB(var, type)
Read one value of type from the main-to-background pipe.
#define STARTWRITEM()
Reset the shared write buffer for a new batched write.
#define ADDWRITEM(pvar, type)
Append one value of type to the shared write buffer.
#define SENDWRITEM()
Flush the shared write buffer to the background-to-main pipe atomically.
void provsql_shmem_unlock(void)
Release the ProvSQL LWLock.
void provsql_shmem_lock_exclusive(void)
Acquire the ProvSQL LWLock in exclusive mode.
provsqlSharedState * provsql_shared_state
Pointer to the ProvSQL shared-memory segment (set in provsql_shmem_startup).
void provsql_shmem_lock_shared(void)
Acquire the ProvSQL LWLock in shared mode.
Shared-memory segment and inter-process pipe management.
bool provsql_lookup_ancestry(Oid relid, uint16 *ancestor_n_out, Oid *ancestors_out)
Look up the base-ancestor set of a tracked relation.
bool provsql_lookup_table_info(Oid relid, ProvenanceTableInfo *out)
Look up per-table provenance metadata with a backend-local cache.
constants_t get_constants(bool failure_if_not_possible)
Retrieve the cached OID constants for the current database.
Core types, constants, and utilities shared across ProvSQL.
Per-relation metadata for the safe-query optimisation.
Oid relid
pg_class OID of the relation (primary key)
AttrNumber block_key[PROVSQL_TABLE_INFO_MAX_BLOCK_KEY]
Block-key column numbers.
uint16_t block_key_n
Number of valid entries in block_key.
uint8_t kind
One of provsql_table_kind.
Structure to store the value of various constants.
Oid GATE_TYPE_TO_OID[nb_gate_types]
Array of the OID of each provenance_gate ENUM value.
Oid OID_TYPE_UUID
OID of the uuid TYPE.
UUID structure.