35#include "postmaster/bgworker.h"
38#include "utils/array.h"
39#include "access/htup_details.h"
40#include "utils/builtins.h"
51 BackgroundWorkerUnblockSignals();
55 provsql_log(
"%s initialized", MyBgworkerEntry->bgw_name);
64 BackgroundWorker worker;
66 snprintf(worker.bgw_name, BGW_MAXLEN,
"ProvSQL MMap Worker");
67#if PG_VERSION_NUM >= 110000
68 snprintf(worker.bgw_type, BGW_MAXLEN,
"ProvSQL MMap");
71 worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
72 worker.bgw_start_time = BgWorkerStart_PostmasterStart;
73 worker.bgw_restart_time = 1;
75 snprintf(worker.bgw_library_name, BGW_MAXLEN,
"provsql");
76 snprintf(worker.bgw_function_name, BGW_MAXLEN,
"provsql_mmap_worker");
77#if PG_VERSION_NUM < 100000
78 worker.bgw_main = NULL;
81 worker.bgw_main_arg = (Datum) 0;
82 worker.bgw_notify_pid = 0;
84 RegisterBackgroundWorker(&worker);
91 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
110 provsql_error(
"Cannot communicate on pipe (message type t)");
127 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
128 Oid oid_type = PG_GETARG_INT32(1);
129 ArrayType *children = PG_ARGISNULL(2)?NULL:PG_GETARG_ARRAYTYPE_P(2);
130 unsigned nb_children = 0;
135 if(PG_ARGISNULL(0) || PG_ARGISNULL(1))
139 if(ARR_NDIM(children) > 1)
140 provsql_error(
"Invalid multi-dimensional array passed to create_gate");
141 else if(ARR_NDIM(children) == 1)
142 nb_children = *ARR_DIMS(children);
158 children_data = (
pg_uuid_t*) ARR_DATA_PTR(children);
160 children_data = NULL;
175 for(
int i=0; i<nb_children; ++i)
187 unsigned children_per_batch = PIPE_BUF/
sizeof(
pg_uuid_t);
196 for(
unsigned j=0; j<1+(nb_children-1)/children_per_batch; ++j) {
199 for(
unsigned i=j*children_per_batch; i<(j+1)*children_per_batch && i<nb_children; ++i) {
219 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
220 double prob = PG_GETARG_FLOAT8(1);
223 if(PG_ARGISNULL(0) || PG_ARGISNULL(1))
248 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
249 unsigned info1 = PG_GETARG_INT32(1);
250 unsigned info2 = PG_GETARG_INT32(2);
278 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
279 text *data = PG_GETARG_TEXT_P(1);
280 char *str=text_to_cstring(data);
281 unsigned len=strlen(str);
306 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
321 provsql_error(
"Cannot communicate with pipe (message type e)");
324 result = palloc(len + VARHDRSZ);
325 SET_VARSIZE(result, VARHDRSZ + len);
329 provsql_error(
"Cannot communicate with pipe (message type e)");
334 PG_RETURN_TEXT_P(result);
345 if(!
WRITEM(
"n",
char) || !
READB(nb,
unsigned long)) {
347 provsql_error(
"Cannot communicate with pipe (message type n)");
352 PG_RETURN_INT64((
long) nb);
359 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
360 ArrayType *result = NULL;
361 unsigned nb_children;
383 if(!
READB(nb_children,
unsigned)) {
385 provsql_error(
"Cannot read response from pipe (message type c)");
388 children=calloc(nb_children,
sizeof(
pg_uuid_t));
391 char *p = (
char*)children;
392 ssize_t actual_read, remaining_size=nb_children*
sizeof(
pg_uuid_t);
398 remaining_size-=actual_read;
408 children_ptr = palloc(nb_children *
sizeof(Datum));
409 for(
unsigned i=0; i<nb_children; ++i)
410 children_ptr[i] = UUIDPGetDatum(&children[i]);
413 result = construct_array(
423 PG_RETURN_ARRAYTYPE_P(result);
430 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
444 provsql_error(
"Cannot communicate with pipe (message type p)");
452 PG_RETURN_FLOAT8(result);
459 pg_uuid_t *token = DatumGetUUIDP(PG_GETARG_DATUM(0));
460 unsigned info1 =0, info2 = 0;
473 provsql_error(
"Cannot communicate with pipe (message type i)");
478 if(info1 == 0 && info2 == 0)
483 bool nulls[2] = {
false,
false};
485 get_call_result_type(fcinfo,NULL,&tupdesc);
486 tupdesc = BlessTupleDesc(tupdesc);
488 values[0] = Int32GetDatum(info1);
489 values[1] = Int32GetDatum(info2);
491 PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
void destroy_provsql_mmap()
Unmap and close the mmap files.
void provsql_mmap_main_loop()
Main processing loop of the mmap background worker.
void initialize_provsql_mmap()
Open (or create) the mmap files and initialise the circuit store.
C-linkage interface to the in-process provenance circuit cache.
gate_type circuit_cache_get_type(pg_uuid_t token)
Retrieve the type of a cached gate.
unsigned circuit_cache_get_children(pg_uuid_t token, pg_uuid_t **children)
Retrieve the children of a cached gate.
bool circuit_cache_create_gate(pg_uuid_t token, gate_type type, unsigned nb_children, pg_uuid_t *children)
Insert a new gate into the circuit cache.
#define provsql_error(fmt,...)
Report a fatal ProvSQL error and abort the current transaction.
#define provsql_log(fmt,...)
Write a ProvSQL message to the server log only.
Datum get_infos(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_infos().
Datum set_extra(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_extra().
Datum get_nb_gates(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_nb_gates().
Datum create_gate(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for create_gate().
__attribute__((visibility("default")))
Background worker entry point: initialises the mmap circuit store and runs the main loop.
Datum set_infos(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_infos().
Datum get_gate_type(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_gate_type().
char buffer[PIPE_BUF]
Shared write buffer used with STARTWRITEM / ADDWRITEM / SENDWRITEM.
Datum get_children(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_children().
Datum get_extra(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_extra().
Datum get_prob(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for get_prob().
unsigned bufferpos
Current write position within buffer.
void RegisterProvSQLMMapWorker(void)
Register the ProvSQL mmap background worker with PostgreSQL.
Datum set_prob(PG_FUNCTION_ARGS)
PostgreSQL-callable wrapper for set_prob().
Background worker and IPC primitives for mmap-backed circuit storage.
void provsql_mmap_worker(Datum)
Entry point for the ProvSQL mmap background worker.
#define WRITEM(pvar, type)
Write one value of type to the background-to-main pipe.
#define READB(var, type)
Read one value of type from the main-to-background pipe.
#define STARTWRITEM()
Reset the shared write buffer for a new batched write.
#define ADDWRITEM(pvar, type)
Append one value of type to the shared write buffer.
#define SENDWRITEM()
Flush the shared write buffer to the background-to-main pipe atomically.
void provsql_shmem_unlock(void)
Release the ProvSQL LWLock.
void provsql_shmem_lock_exclusive(void)
Acquire the ProvSQL LWLock in exclusive mode.
provsqlSharedState * provsql_shared_state
Pointer to the ProvSQL shared-memory segment (set in provsql_shmem_startup).
void provsql_shmem_lock_shared(void)
Acquire the ProvSQL LWLock in shared mode.
Shared-memory segment and inter-process pipe management.
constants_t get_constants(bool failure_if_not_possible)
Retrieve the cached OID constants for the current database.
Core types, constants, and utilities shared across ProvSQL.
gate_type
Possible gate type in the provenance circuit.
@ gate_invalid
Invalid gate type.
@ nb_gate_types
Total number of gate types.
Structure to store the value of various constants.
Oid GATE_TYPE_TO_OID[nb_gate_types]
Array of the OID of each provenance_gate ENUM value.
Oid OID_TYPE_UUID
OID of the uuid TYPE.
long pipembr
Main-to-background pipe: read end (worker reads)
long pipebmw
Background-to-main pipe: write end (worker writes)