36#include "postmaster/bgworker.h"
37#include "catalog/pg_type.h"
41#include "utils/array.h"
42#include "access/htup_details.h"
43#include "utils/builtins.h"
44#include "utils/inval.h"
45#include "utils/syscache.h"
54 BackgroundWorkerUnblockSignals();
58 provsql_log(
"%s initialized", MyBgworkerEntry->bgw_name);
67 BackgroundWorker worker;
69 snprintf(worker.bgw_name, BGW_MAXLEN,
"ProvSQL MMap Worker");
70#if PG_VERSION_NUM >= 110000
71 snprintf(worker.bgw_type, BGW_MAXLEN,
"ProvSQL MMap");
74 worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
75 worker.bgw_start_time = BgWorkerStart_PostmasterStart;
76 worker.bgw_restart_time = 1;
78 snprintf(worker.bgw_library_name, BGW_MAXLEN,
"provsql");
79 snprintf(worker.bgw_function_name, BGW_MAXLEN,
"provsql_mmap_worker");
80#if PG_VERSION_NUM < 100000
81 worker.bgw_main = NULL;
84 worker.bgw_main_arg = (Datum) 0;
85 worker.bgw_notify_pid = 0;
87 RegisterBackgroundWorker(&worker);
104 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
107 unsigned nb_children = 0;
127 provsql_error(
"Cannot communicate on pipe (message type t)");
141 provsql_error(
"Cannot communicate on pipe (message type c during get_gate_type)");
144 if(nb_children > 0) {
146 ssize_t actual_read, remaining_size;
148 children = calloc(nb_children,
sizeof(
pg_uuid_t));
150 remaining_size = nb_children *
sizeof(
pg_uuid_t);
151 while((actual_read = read(
provsql_shared_state->pipembr, p, remaining_size)) < remaining_size) {
152 if(actual_read <= 0) {
154 provsql_error(
"Cannot read children from pipe (during get_gate_type)");
156 remaining_size -= actual_read;
173 if(children) free(children);
181 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
182 Oid oid_type = PG_GETARG_INT32(1);
183 ArrayType *children = PG_ARGISNULL(2)?NULL:PG_GETARG_ARRAYTYPE_P(2);
184 unsigned nb_children = 0;
189 if(PG_ARGISNULL(0) || PG_ARGISNULL(1))
193 if(ARR_NDIM(children) > 1)
194 provsql_error(
"Invalid multi-dimensional array passed to create_gate");
195 else if(ARR_NDIM(children) == 1)
196 nb_children = *ARR_DIMS(children);
212 children_data = (
pg_uuid_t*) ARR_DATA_PTR(children);
214 children_data = NULL;
236 for(
int i=0; i<nb_children; ++i)
248 unsigned children_per_batch = PIPE_BUF/
sizeof(
pg_uuid_t);
257 for(
unsigned j=0; j<1+(nb_children-1)/children_per_batch; ++j) {
260 for(
unsigned i=j*children_per_batch; i<(j+1)*children_per_batch && i<nb_children; ++i) {
280 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
281 double prob = PG_GETARG_FLOAT8(1);
284 if(PG_ARGISNULL(0) || PG_ARGISNULL(1))
310 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
311 unsigned info1 = PG_GETARG_INT32(1);
312 unsigned info2 = PG_GETARG_INT32(2);
341 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
342 text *data = PG_GETARG_TEXT_P(1);
343 char *str=text_to_cstring(data);
344 unsigned len=strlen(str);
370 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
386 provsql_error(
"Cannot communicate with pipe (message type e)");
389 result = palloc(len + VARHDRSZ);
390 SET_VARSIZE(result, VARHDRSZ + len);
394 provsql_error(
"Cannot communicate with pipe (message type e)");
399 PG_RETURN_TEXT_P(result);
416 provsql_error(
"Cannot communicate with pipe (message type n)");
421 PG_RETURN_INT64((
long) nb);
428 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
429 ArrayType *result = NULL;
430 unsigned nb_children;
453 if(!
READB(nb_children,
unsigned)) {
455 provsql_error(
"Cannot read response from pipe (message type c)");
458 children=calloc(nb_children,
sizeof(
pg_uuid_t));
461 char *p = (
char*)children;
462 ssize_t actual_read, remaining_size=nb_children*
sizeof(
pg_uuid_t);
468 remaining_size-=actual_read;
483 children_ptr = palloc(nb_children *
sizeof(Datum));
484 for(
unsigned i=0; i<nb_children; ++i)
485 children_ptr[i] = UUIDPGetDatum(&children[i]);
488 result = construct_array(
498 PG_RETURN_ARRAYTYPE_P(result);
505 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
520 provsql_error(
"Cannot communicate with pipe (message type p)");
528 PG_RETURN_FLOAT8(result);
537 provsql_error(
"set_table_info: unknown table kind '%s' (expected "
538 "'tid', 'bid', or 'opaque')", label);
550 provsql_error(
"get_table_info: unknown table kind value %u", kind);
571 ArrayType *block_key;
572 uint16 block_key_n = 0;
573 int16 *block_key_data = NULL;
576 if(PG_ARGISNULL(0) || PG_ARGISNULL(1))
577 provsql_error(
"Invalid NULL value passed to set_table_info");
579 relid = PG_GETARG_OID(0);
580 kind_text = PG_GETARG_TEXT_PP(1);
581 kind_str = text_to_cstring(kind_text);
584 block_key = PG_ARGISNULL(2) ? NULL : PG_GETARG_ARRAYTYPE_P(2);
587 if(ARR_NDIM(block_key) > 1)
588 provsql_error(
"Invalid multi-dimensional array passed to set_table_info");
589 else if(ARR_NDIM(block_key) == 1)
590 block_key_n = *ARR_DIMS(block_key);
592 block_key_data = (int16 *) ARR_DATA_PTR(block_key);
596 provsql_error(
"set_table_info: block key wider than %d columns "
597 "(%u given) is not supported",
600 payload_size =
sizeof(char) +
sizeof(Oid) +
sizeof(Oid) +
sizeof(uint8)
601 +
sizeof(uint16) + block_key_n *
sizeof(int16);
602 if(payload_size > PIPE_BUF)
603 provsql_error(
"set_table_info: IPC payload exceeds PIPE_BUF");
611 for(uint16 i = 0; i < block_key_n; ++i)
630 if(SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
631 CacheInvalidateRelcacheByRelid(relid);
643 provsql_error(
"Invalid NULL value passed to remove_table_info");
645 relid = PG_GETARG_OID(0);
662 if(SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
663 CacheInvalidateRelcacheByRelid(relid);
693 provsql_error(
"Cannot communicate with pipe (message type s)");
698 provsql_error(
"Cannot communicate with pipe (message type s)");
702 provsql_error(
"provsql_fetch_table_info: server returned an unexpectedly wide block key");
707 provsql_error(
"Cannot communicate with pipe (message type s)");
732 bool nulls[2] = {
false,
false};
739 relid = PG_GETARG_OID(0);
744 if(get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
745 provsql_error(
"get_table_info: expected composite return type");
746 tupdesc = BlessTupleDesc(tupdesc);
752 elems[i] = Int16GetDatum(info.
block_key[i]);
753 arr = construct_array(elems, info.
block_key_n, INT2OID, 2,
true,
's');
755 values[1] = PointerGetDatum(arr);
757 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
780 ArrayType *ancestors;
781 uint16 ancestor_n = 0;
782 Oid *ancestor_data = NULL;
788 relid = PG_GETARG_OID(0);
789 ancestors = PG_ARGISNULL(1) ? NULL : PG_GETARG_ARRAYTYPE_P(1);
792 if(ARR_NDIM(ancestors) > 1)
793 provsql_error(
"Invalid multi-dimensional array passed to set_ancestors");
794 else if(ARR_NDIM(ancestors) == 1)
795 ancestor_n = *ARR_DIMS(ancestors);
797 ancestor_data = (Oid *) ARR_DATA_PTR(ancestors);
801 provsql_error(
"set_ancestors: ancestor set wider than %d entries "
802 "(%u given) is not supported",
805 payload_size =
sizeof(char) +
sizeof(Oid) +
sizeof(Oid)
806 +
sizeof(uint16) + ancestor_n *
sizeof(Oid);
807 if(payload_size > PIPE_BUF)
808 provsql_error(
"set_ancestors: IPC payload exceeds PIPE_BUF");
815 for(uint16 i = 0; i < ancestor_n; ++i)
825 if(SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
826 CacheInvalidateRelcacheByRelid(relid);
845 provsql_error(
"Invalid NULL value passed to remove_ancestors");
847 relid = PG_GETARG_OID(0);
861 if(SearchSysCacheExists1(RELOID, ObjectIdGetDatum(relid)))
862 CacheInvalidateRelcacheByRelid(relid);
899 provsql_error(
"Cannot communicate with pipe (message type a)");
902 if(!
READB(n, uint16)) {
904 provsql_error(
"Cannot communicate with pipe (message type a)");
909 "unexpectedly wide ancestor set");
911 for(uint16 i = 0; i < n; ++i)
912 if(!
READB(ancestors_out[i], Oid)) {
914 provsql_error(
"Cannot communicate with pipe (message type a)");
922 return found != 0 && n > 0;
945 relid = PG_GETARG_OID(0);
950 elems = palloc(ancestor_n *
sizeof(Datum));
951 for(uint16 i = 0; i < ancestor_n; ++i)
952 elems[i] = ObjectIdGetDatum(ancestors[i]);
953 arr = construct_array(elems, ancestor_n, OIDOID,
954 sizeof(Oid),
true,
'i');
957 PG_RETURN_ARRAYTYPE_P(arr);
964 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
965 unsigned info1 =0, info2 = 0;
979 provsql_error(
"Cannot communicate with pipe (message type i)");
987 bool nulls[2] = {
false,
false};
989 get_call_result_type(fcinfo,NULL,&tupdesc);
990 tupdesc = BlessTupleDesc(tupdesc);
992 values[0] = Int32GetDatum(info1);
993 values[1] = Int32GetDatum(info2);
995 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
void destroy_provsql_mmap()
Unmap and close the mmap files.
void provsql_mmap_main_loop()
Main processing loop of the mmap background worker.
void initialize_provsql_mmap()
Open (or create) the mmap files and initialise the circuit store.
Per-table provenance metadata persisted alongside the circuit store.
#define PROVSQL_TABLE_INFO_MAX_BLOCK_KEY
Cap on the number of block-key columns recorded per relation.
#define PROVSQL_TABLE_INFO_MAX_ANCESTORS
Cap on the number of base ancestors recorded per relation.
C-linkage interface to the in-process provenance circuit cache.
gate_type circuit_cache_get_type(pg_uuid_t token)
Retrieve the type of a cached gate.
unsigned circuit_cache_get_children(pg_uuid_t token, pg_uuid_t **children)
Retrieve the children of a cached gate.
bool circuit_cache_create_gate(pg_uuid_t token, gate_type type, unsigned nb_children, pg_uuid_t *children)
Insert a new gate into the circuit cache.
Datum set_ancestors(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for setTableAncestry() over the IPC pipe.
Datum set_table_info(PG_FUNCTION_ARGS)
Forward declaration of the C SQL entry points.
#define provsql_error(fmt,...)
Report a fatal ProvSQL error and abort the current transaction.
#define provsql_log(fmt,...)
Write a ProvSQL message to the server log only.
Datum get_infos(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_infos().
void provsql_mmap_worker(Datum ignored)
Entry point for the ProvSQL mmap background worker.
Datum set_ancestors(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for setTableAncestry() over the IPC pipe.
static const char * table_kind_label(uint8_t kind)
Inverse of parse_table_kind for use by get_table_info.
Datum set_table_info(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for setTableInfo() over the IPC pipe.
Datum get_table_info(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper around the cached table-info lookup.
Datum remove_ancestors(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for removeTableAncestry() over the IPC pipe.
Datum set_extra(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_extra().
Datum get_nb_gates(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_nb_gates().
Datum create_gate(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for create_gate().
Datum get_ancestors(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper around the cached ancestry lookup.
static uint8_t parse_table_kind(const char *label)
Translate a SQL-side kind label into the persisted enum value.
Datum remove_table_info(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for removeTableInfo() over the IPC pipe.
Datum set_infos(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_infos().
bool provsql_fetch_table_info(Oid relid, ProvenanceTableInfo *out)
C-callable IPC fetch for per-table provenance metadata.
Datum get_gate_type(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_gate_type().
bool provsql_fetch_ancestry(Oid relid, uint16 *ancestor_n_out, Oid *ancestors_out)
C-callable IPC fetch for the ancestor half of a per-table metadata record.
char buffer[PIPE_BUF]
Shared write buffer used with STARTWRITEM / ADDWRITEM / SENDWRITEM.
Datum get_children(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_children().
Datum get_extra(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_extra().
Datum get_prob(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_prob().
unsigned bufferpos
Current write position within buffer.
void RegisterProvSQLMMapWorker(void)
Register the ProvSQL mmap background worker with PostgreSQL.
Datum set_prob(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_prob().
Background worker and IPC primitives for mmap-backed circuit storage.
#define READB(var, type)
Read one value of type from the main-to-background pipe.
#define STARTWRITEM()
Reset the shared write buffer for a new batched write.
#define ADDWRITEM(pvar, type)
Append one value of type to the shared write buffer.
#define SENDWRITEM()
Flush the shared write buffer to the background-to-main pipe atomically.
void provsql_shmem_unlock(void)
Release the ProvSQL LWLock.
void provsql_shmem_lock_exclusive(void)
Acquire the ProvSQL LWLock in exclusive mode.
provsqlSharedState * provsql_shared_state
Pointer to the ProvSQL shared-memory segment (set in provsql_shmem_startup).
void provsql_shmem_lock_shared(void)
Acquire the ProvSQL LWLock in shared mode.
Shared-memory segment and inter-process pipe management.
bool provsql_lookup_ancestry(Oid relid, uint16 *ancestor_n_out, Oid *ancestors_out)
Look up the base-ancestor set of a tracked relation.
bool provsql_lookup_table_info(Oid relid, ProvenanceTableInfo *out)
Look up per-table provenance metadata with a backend-local cache.
constants_t get_constants(bool failure_if_not_possible)
Retrieve the cached OID constants for the current database.
Core types, constants, and utilities shared across ProvSQL.
Per-relation metadata for the safe-query optimisation.
Oid relid
pg_class OID of the relation (primary key)
AttrNumber block_key[PROVSQL_TABLE_INFO_MAX_BLOCK_KEY]
Block-key column numbers.
uint16_t block_key_n
Number of valid entries in block_key.
uint8_t kind
One of provsql_table_kind.
Structure to store the value of various constants.
Oid GATE_TYPE_TO_OID[nb_gate_types]
Array of the OID of each provenance_gate ENUM value.
Oid OID_TYPE_UUID
OID of the uuid TYPE.