ProvSQL C/C++ API
Adding support for provenance and uncertainty management to PostgreSQL databases
Loading...
Searching...
No Matches
MMappedCircuit.cpp
Go to the documentation of this file.
/**
 * @file MMappedCircuit.cpp
 * @brief Persistent mmap-backed circuit: implementation and background-worker entry points.
 *
 * Implements the @c MMappedCircuit methods declared in @c MMappedCircuit.h,
 * including the @c createGenericCircuit() member functions, and the
 * background-worker entry points declared in @c provsql_mmap.h:
 *
 * - @c initialize_provsql_mmap(): called by the background worker at
 *   startup; it opens nothing eagerly, because each per-database
 *   @c MMappedCircuit instance (and its backing mmap files) is created
 *   lazily on the first IPC message for that database.
 * - @c destroy_provsql_mmap(): called on shutdown; deletes every
 *   per-database circuit that was opened.
 * - @c provsql_mmap_main_loop(): the worker's main loop; receives
 *   messages from backends over the IPC pipe, writes gate and per-table
 *   metadata updates to the mmap store, and answers lookup requests.
 *
 * The @c createGenericCircuit() functions traverse the stored circuit from
 * one or more root UUIDs using an ordered work list, reading gates from the
 * mmap store and building an in-memory @c GenericCircuit.
 */
#include <cerrno>
#include <cmath>
#include <cstring>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include "MMappedCircuit.h"
#include "GenericCircuit.h"
#include "Circuit.hpp"
#include "provsql_utils_cpp.h"

extern "C" {
#include "miscadmin.h"
#include "provsql_mmap.h"
#include "provsql_shmem.h"
}
37
38/** @brief Per-database mmap-backed provenance circuits, keyed by database OID. */
39static std::map<Oid, MMappedCircuit*> circuits;
40
41std::string MMappedCircuit::makePath(Oid db_oid, const char *filename)
42{
43 return std::string(DataDir) + "/base/" + std::to_string(db_oid) + "/" + filename;
44}
45
46MMappedCircuit::MMappedCircuit(Oid db_oid, bool read_only) :
49 makePath(db_oid, GATES_FILENAME),
50 makePath(db_oid, WIRES_FILENAME),
51 makePath(db_oid, EXTRA_FILENAME),
53 read_only) {}
54
/**
 * @brief Start-up hook of the mmap background worker.
 *
 * Intentionally empty: no store is opened here. Each per-database circuit
 * is created by getCircuit() when the first IPC message for that database
 * is handled.
 */
void initialize_provsql_mmap()
{
  /* circuits are opened lazily on first IPC message */
}
59
61{
62 for(auto &kv: circuits)
63 delete kv.second;
64 circuits.clear();
65}
66
68 pg_uuid_t token, gate_type type, const std::vector<pg_uuid_t> &children)
69{
70 auto [idx, created] = mapping.add(token);
71 if(!created) {
72 // The gate may have been lazy-added as a default gate_input below
73 // (when an earlier-arriving parent createGate referenced it as a
74 // child whose own createGate had not yet been received). Under
75 // concurrent backends, parent/child IPCs from different sessions
76 // can be interleaved such that the parent's lazy-add wins and the
77 // real create for the child is then silently dropped. Detect that
78 // case and upgrade the placeholder in place; otherwise leave the
79 // existing gate alone (real duplicate creation, idempotent).
80 bool placeholder = gates[idx].type == gate_input
81 && gates[idx].nb_children == 0;
82 bool real_create = type != gate_input || !children.empty();
83 if(placeholder && real_create) {
84 gates[idx].type = type;
85 gates[idx].nb_children = static_cast<unsigned>(children.size());
86 gates[idx].children_idx = wires.nbElements();
87 for(const auto &c: children)
88 wires.add(c);
89 for(const auto &c: children) {
90 auto [child_idx, child_created] = mapping.add(c);
91 if(child_created)
92 gates.add({gate_input, 0, wires.nbElements()});
93 }
94 }
95 return;
96 }
97
98 gates.add({type, static_cast<unsigned>(children.size()), wires.nbElements()});
99 for(const auto &c: children)
100 wires.add(c);
101
102 for(const auto &c: children) {
103 auto [child_idx, child_created] = mapping.add(c);
104 if(child_created)
105 gates.add({gate_input, 0, wires.nbElements()});
106 }
107}
108
110{
111 auto idx = mapping[token];
113 return gate_input;
114 else
115 return gates[idx].type;
116}
117
118std::vector<pg_uuid_t> MMappedCircuit::getChildren(pg_uuid_t token) const
119{
120 std::vector<pg_uuid_t> result;
121 auto idx = mapping[token];
123 const GateInformation &gi = gates[idx];
124 for(unsigned long k=gi.children_idx; k<gi.children_idx+gi.nb_children; ++k)
125 result.push_back(wires[k]);
126 }
127 return result;
128}
129
130bool MMappedCircuit::setProb(pg_uuid_t token, double prob)
131{
132 auto [idx, created] = mapping.add(token);
133 if(created)
134 gates.add({gate_input, 0, wires.nbElements()});
135 if(gates[idx].type == gate_input || gates[idx].type == gate_update || gates[idx].type == gate_mulinput) {
136 gates[idx].prob = prob;
137 return true;
138 }
139 return false;
140}
141
143{
144 auto idx = mapping[token];
146 (gates[idx].type != gate_input && gates[idx].type != gate_update && gates[idx].type != gate_mulinput))
147 return NAN;
148 else
149 return gates[idx].prob;
150}
151
152void MMappedCircuit::setInfos(pg_uuid_t token, unsigned info1, unsigned info2)
153{
154 auto idx = mapping[token];
156 gates[idx].info1=info1;
157 gates[idx].info2=info2;
158 }
159}
160
161void MMappedCircuit::setExtra(pg_uuid_t token, const std::string &s)
162{
163 auto idx = mapping[token];
165 gates[idx].extra_idx=extra.nbElements();
166 for(auto c: s)
167 extra.add(c);
168 gates[idx].extra_len=s.size();
169 }
170}
171
172std::pair<unsigned, unsigned> MMappedCircuit::getInfos(pg_uuid_t token) const
173{
174 auto idx = mapping[token];
176 return std::make_pair(0, 0);
177 } else {
178 const GateInformation &gi = gates[idx];
179 return std::make_pair(gi.info1, gi.info2);
180 }
181}
182
183std::string MMappedCircuit::getExtra(pg_uuid_t token) const
184{
185 std::string result;
186
187 auto idx = mapping[token];
189 for(unsigned long start=gates[idx].extra_idx, k=start, end=start+gates[idx].extra_len; k<end; ++k)
190 result+=extra[k];
191 }
192
193 return result;
194}
195
196/** @brief Return (creating lazily if needed) the circuit for @p db_oid. */
197static MMappedCircuit *getCircuit(Oid db_oid)
198{
199 auto it = circuits.find(db_oid);
200 if(it == circuits.end()) {
201 circuits[db_oid] = new MMappedCircuit(db_oid);
202 return circuits[db_oid];
203 }
204 return it->second;
205}
206
208{
209 char c;
210
211 while(READM(c, char)) {
212 Oid db_oid;
213 if(!READM(db_oid, Oid))
214 provsql_error("Cannot read database OID from pipe");
215
216 MMappedCircuit *circuit = getCircuit(db_oid);
217
218 switch(c) {
219 case 'C':
220 {
221 pg_uuid_t token;
222 gate_type type;
223 unsigned nb_children;
224
225 if(!READM(token, pg_uuid_t) || !READM(type, gate_type) || !READM(nb_children, unsigned))
226 provsql_error("Cannot read from pipe (message type C)"); ;
227
228 std::vector<pg_uuid_t> children(nb_children);
229 for(unsigned i=0; i<nb_children; ++i)
230 if(!READM(children[i], pg_uuid_t))
231 provsql_error("Cannot read from pipe (message type C)");
232
233 circuit->createGate(token, type, children);
234 break;
235 }
236
237 case 'P':
238 {
239 pg_uuid_t token;
240 double prob;
241
242 if(!READM(token, pg_uuid_t) || !READM(prob, double))
243 provsql_error("Cannot read from pipe (message type P)");
244
245 bool ok = circuit->setProb(token, prob);
246 char return_value = ok?static_cast<char>(1):0;
247
248 if(!WRITEB(&return_value, char))
249 provsql_error("Cannot write response to pipe (message type P)");
250 break;
251 }
252
253 case 'I':
254 {
255 pg_uuid_t token;
256 unsigned info1, info2;
257
258 if(!READM(token, pg_uuid_t) || !READM(info1, unsigned) || !READM(info2, unsigned))
259 provsql_error("Cannot read from pipe (message type I)");
260
261 circuit->setInfos(token, info1, info2);
262 break;
263 }
264
265 case 'E':
266 {
267 pg_uuid_t token;
268 unsigned len;
269
270 if(!READM(token, pg_uuid_t) || !READM(len, unsigned))
271 provsql_error("Cannot read from pipe (message type E)");
272
273 if(len>0) {
274 char *data = new char[len];
275 if(read(provsql_shared_state->pipebmr, data, len)<len)
276 provsql_error("Cannot read from pipe (message type E)");
277
278 circuit->setExtra(token, std::string(data, len));
279 }
280
281 break;
282 }
283
284 case 't':
285 {
286 pg_uuid_t token;
287
288 if(!READM(token, pg_uuid_t))
289 provsql_error("Cannot read from pipe (message type t)");
290
291 gate_type type = circuit->getGateType(token);
292
293 if(!WRITEB(&type, gate_type))
294 provsql_error("Cannot write response to pipe (message type t)");
295 break;
296 }
297
298 case 'n':
299 {
300 unsigned long nb = circuit->getNbGates();
301
302 if(!WRITEB(&nb, unsigned long))
303 provsql_error("Cannot write response to pipe (message type n)");
304 break;
305 }
306
307 case 'c':
308 {
309 pg_uuid_t token;
310
311 if(!READM(token, pg_uuid_t))
312 provsql_error("Cannot read from pipe (message type c)");
313
314 auto children = circuit->getChildren(token);
315 unsigned nb_children = children.size();
316 if(!WRITEB(&nb_children, unsigned))
317 provsql_error("Cannot write response to pipe (message type c)");
318
319 if(write(provsql_shared_state->pipembw, &children[0], nb_children*sizeof(pg_uuid_t))==-1)
320 provsql_error("Cannot write response to pipe (message type c)");
321 break;
322 }
323
324 case 'p':
325 {
326 pg_uuid_t token;
327
328 if(!READM(token, pg_uuid_t))
329 provsql_error("Cannot read from pipe (message type p)");
330
331 double prob = circuit->getProb(token);
332
333 if(!WRITEB(&prob, double))
334 provsql_error("Cannot write response to pipe (message type p)");
335 break;
336 }
337
338 case 'i':
339 {
340 pg_uuid_t token;
341
342 if(!READM(token, pg_uuid_t))
343 provsql_error("Cannot read from pipe (message type i)");
344
345 auto infos = circuit->getInfos(token);
346
347 if(!WRITEB(&infos.first, unsigned) || !WRITEB(&infos.second, unsigned))
348 provsql_error("Cannot write response to pipe (message type i)");
349 break;
350 }
351
352 case 'e':
353 {
354 pg_uuid_t token;
355
356 if(!READM(token, pg_uuid_t))
357 provsql_error("Cannot read from pipe (message type e)");
358
359 auto str = circuit->getExtra(token);
360 unsigned len = str.size();
361
362 if(!WRITEB(&len, unsigned) || write(provsql_shared_state->pipembw, str.data(), len)==-1)
363 provsql_error("Cannot write response to pipe (message type e)");
364 break;
365 }
366
367 case 'g':
368 {
369 pg_uuid_t token;
370
371 if(!READM(token, pg_uuid_t))
372 provsql_error("Cannot read from pipe (message type g)");
373
374 std::stringstream ss;
375 boost::archive::binary_oarchive oa(ss);
376 oa << circuit->createGenericCircuit(token);
377
378 ss.seekg(0, std::ios::end);
379 unsigned long size = ss.tellg();
380 ss.seekg(0, std::ios::beg);
381
382 if(!WRITEB(&size, unsigned long) || write(provsql_shared_state->pipembw, ss.str().data(), size)==-1)
383 provsql_error("Cannot write to pipe (message type g)");
384 break;
385 }
386
387 case 'T':
388 {
389 /* Insert / upsert per-table provenance metadata. */
390 ProvenanceTableInfo info{};
391 if(!READM(info.relid, Oid) || !READM(info.kind, uint8_t)
392 || !READM(info.block_key_n, uint16_t))
393 provsql_error("Cannot read from pipe (message type T)");
395 provsql_error("ProvSQL: block key wider than %d columns "
396 "(message type T)", PROVSQL_TABLE_INFO_MAX_BLOCK_KEY);
397 for(uint16_t i=0; i<info.block_key_n; ++i)
398 if(!READM(info.block_key[i], AttrNumber))
399 provsql_error("Cannot read from pipe (message type T)");
400 circuit->setTableInfo(info);
401 break;
402 }
403
404 case 'D':
405 {
406 /* Delete per-table provenance metadata. */
407 Oid relid;
408 if(!READM(relid, Oid))
409 provsql_error("Cannot read from pipe (message type D)");
410 circuit->removeTableInfo(relid);
411 break;
412 }
413
414 case 's':
415 {
416 /* Look up per-table provenance metadata. */
417 Oid relid;
418 if(!READM(relid, Oid))
419 provsql_error("Cannot read from pipe (message type s)");
420 ProvenanceTableInfo info{};
421 char found = circuit->getTableInfo(relid, info) ? 1 : 0;
422 if(!WRITEB(&found, char))
423 provsql_error("Cannot write response to pipe (message type s)");
424 if(found) {
425 if(!WRITEB(&info.kind, uint8_t) || !WRITEB(&info.block_key_n, uint16_t))
426 provsql_error("Cannot write response to pipe (message type s)");
427 for(uint16_t i=0; i<info.block_key_n; ++i)
428 if(!WRITEB(&info.block_key[i], AttrNumber))
429 provsql_error("Cannot write response to pipe (message type s)");
430 }
431 break;
432 }
433
434 case 'A':
435 {
436 /* Insert / upsert the ancestor half of a per-table metadata
437 * record (the kind / block_key half is preserved). */
438 Oid relid;
439 uint16_t ancestor_n;
440 if(!READM(relid, Oid) || !READM(ancestor_n, uint16_t))
441 provsql_error("Cannot read from pipe (message type A)");
442 if(ancestor_n > PROVSQL_TABLE_INFO_MAX_ANCESTORS)
443 provsql_error("ProvSQL: ancestor set wider than %d entries "
444 "(message type A)",
447 for(uint16_t i=0; i<ancestor_n; ++i)
448 if(!READM(ancestors[i], Oid))
449 provsql_error("Cannot read from pipe (message type A)");
450 circuit->setTableAncestry(relid, ancestor_n, ancestors);
451 break;
452 }
453
454 case 'R':
455 {
456 /* Clear just the ancestor half of a per-table metadata record. */
457 Oid relid;
458 if(!READM(relid, Oid))
459 provsql_error("Cannot read from pipe (message type R)");
460 circuit->removeTableAncestry(relid);
461 break;
462 }
463
464 case 'a':
465 {
466 /* Look up just the ancestor half of a per-table metadata record. */
467 Oid relid;
468 if(!READM(relid, Oid))
469 provsql_error("Cannot read from pipe (message type a)");
470 ProvenanceTableInfo info{};
471 char found = circuit->getTableInfo(relid, info) ? 1 : 0;
472 if(!WRITEB(&found, char))
473 provsql_error("Cannot write response to pipe (message type a)");
474 if(found) {
475 if(!WRITEB(&info.ancestor_n, uint16_t))
476 provsql_error("Cannot write response to pipe (message type a)");
477 for(uint16_t i=0; i<info.ancestor_n; ++i)
478 if(!WRITEB(&info.ancestors[i], Oid))
479 provsql_error("Cannot write response to pipe (message type a)");
480 }
481 break;
482 }
483
484 case 'j':
485 {
486 /* Joint-circuit load: BFS from a vector of roots so a shared
487 * subgraph reachable from multiple roots collapses to a single
488 * gate_t. Used by getJointCircuit() to load an RV's sub-DAG
489 * together with a conditioning gate that sits above it in the
490 * persisted DAG. */
491 unsigned nb_roots;
492 if(!READM(nb_roots, unsigned))
493 provsql_error("Cannot read from pipe (message type j)");
494
495 std::vector<pg_uuid_t> roots(nb_roots);
496 for(unsigned i=0; i<nb_roots; ++i)
497 if(!READM(roots[i], pg_uuid_t))
498 provsql_error("Cannot read from pipe (message type j)");
499
500 std::stringstream ss;
501 boost::archive::binary_oarchive oa(ss);
502 oa << circuit->createGenericCircuit(roots);
503
504 ss.seekg(0, std::ios::end);
505 unsigned long size = ss.tellg();
506 ss.seekg(0, std::ios::beg);
507
508 if(!WRITEB(&size, unsigned long) || write(provsql_shared_state->pipembw, ss.str().data(), size)==-1)
509 provsql_error("Cannot write to pipe (message type j)");
510 break;
511 }
512
513 default:
514 provsql_error("Wrong message type: %c", c);
515 }
516 }
517
518 int e = errno;
519 provsql_error("Reading from pipe: %s", strerror(e));
520}
521
523{
524 gates.sync();
525 wires.sync();
526 mapping.sync();
527 extra.sync();
528 tableInfo.sync();
529}
530
/* The tableInfo vector uses a tombstone scheme: removed entries have
 * their relid set to InvalidOid and remain in place.  setTableInfo()
 * reuses tombstone slots before appending.  All readers skip
 * InvalidOid entries.  This avoids reaching into MMappedVector's
 * append-only public API, and keeps the file format trivial: a
 * crash-recovered file is internally consistent without any extra
 * recovery step.  In practice, churn on this vector is low (one
 * entry per add_provenance / repair_key / remove_provenance call).
 *
 * Each record carries two logically independent halves: the kind /
 * block_key fields (TID / BID classification, set by add_provenance /
 * repair_key / set_table_info) and the ancestor_n / ancestors fields
 * (base-relation provenance lineage, set by add_provenance for base
 * tables and by the CTAS hook for derived tables).  setTableInfo()
 * updates the kind half and preserves the ancestor half on update;
 * setTableAncestry() does the converse.  This lets the two halves
 * evolve independently without the SQL layer having to fetch-and-
 * round-trip every time. */
549
551{
552 long tombstone = -1;
553 unsigned long n = tableInfo.nbElements();
554 for(unsigned long i=0; i<n; ++i) {
555 if(tableInfo[i].relid == info.relid) {
556 /* Preserve the existing ancestor half on update. */
557 ProvenanceTableInfo merged = info;
558 merged.ancestor_n = tableInfo[i].ancestor_n;
559 memcpy(merged.ancestors, tableInfo[i].ancestors,
560 merged.ancestor_n * sizeof(Oid));
561 tableInfo[i] = merged;
562 return;
563 }
564 if(tombstone < 0 && tableInfo[i].relid == InvalidOid)
565 tombstone = static_cast<long>(i);
566 }
567 /* Fresh record: kind half from caller, ancestor half empty. */
568 ProvenanceTableInfo fresh = info;
569 fresh.ancestor_n = 0;
570 if(tombstone >= 0)
571 tableInfo[tombstone] = fresh;
572 else
573 tableInfo.add(fresh);
574}
575
576void MMappedCircuit::setTableAncestry(Oid relid, uint16_t ancestor_n,
577 const Oid *ancestors)
578{
579 if(relid == InvalidOid)
580 return;
581 if(ancestor_n > PROVSQL_TABLE_INFO_MAX_ANCESTORS)
582 return; /* defensive: caller-side check already rejects this */
583 unsigned long n = tableInfo.nbElements();
584 for(unsigned long i=0; i<n; ++i) {
585 if(tableInfo[i].relid == relid) {
586 ProvenanceTableInfo updated = tableInfo[i];
587 updated.ancestor_n = ancestor_n;
588 memcpy(updated.ancestors, ancestors, ancestor_n * sizeof(Oid));
589 tableInfo[i] = updated;
590 return;
591 }
592 }
593 /* No-op when relid has no kind record: callers should set kind
594 * first (add_provenance / repair_key do this). Silently
595 * dropping the ancestry payload here matches the existing
596 * removeTableInfo / setTableInfo "missing relid is harmless"
597 * pattern and avoids creating an OPAQUE-by-default record. */
598}
599
601{
602 if(relid == InvalidOid)
603 return;
604 unsigned long n = tableInfo.nbElements();
605 for(unsigned long i=0; i<n; ++i) {
606 if(tableInfo[i].relid == relid) {
607 tableInfo[i].relid = InvalidOid;
608 return;
609 }
610 }
611}
612
614{
615 if(relid == InvalidOid)
616 return;
617 unsigned long n = tableInfo.nbElements();
618 for(unsigned long i=0; i<n; ++i) {
619 if(tableInfo[i].relid == relid) {
620 tableInfo[i].ancestor_n = 0;
621 return;
622 }
623 }
624}
625
627{
628 if(relid == InvalidOid)
629 return false;
630 for(unsigned long i=0; i<tableInfo.nbElements(); ++i) {
631 if(tableInfo[i].relid == relid) {
632 out = tableInfo[i];
633 return true;
634 }
635 }
636 return false;
637}
638
639/**
640 * @brief Lexicographic less-than comparison for @c pg_uuid_t.
641 * @param a Left UUID.
642 * @param b Right UUID.
643 * @return @c true if @p a is lexicographically less than @p b.
644 */
645bool operator<(const pg_uuid_t a, const pg_uuid_t b)
646{
647 return memcmp(&a, &b, sizeof(pg_uuid_t))<0;
648}
649
651{
652 return createGenericCircuit(std::vector<pg_uuid_t>{token});
653}
654
656 const std::vector<pg_uuid_t> &roots) const
657{
658 /* Seed the work list with every root. std::set deduplicates so a
659 * UUID listed twice (or reached as a child of one root and the
660 * other's root itself) is processed only once. Shared subgraphs
661 * therefore land on a single gate_t in `result` -- the property
662 * that lets the conditional MC sampler couple the indicator and
663 * value paths through @c Sampler::scalar_cache_ / @c bool_cache_. */
664 std::set<pg_uuid_t> to_process, processed;
665 for(const auto &r : roots)
666 to_process.insert(r);
667
668 GenericCircuit result;
669
670 while(!to_process.empty()) {
671 pg_uuid_t uuid = *to_process.begin();
672 to_process.erase(to_process.begin());
673 processed.insert(uuid);
674 std::string f{uuid2string(uuid)};
675
676 gate_type type = getGateType(uuid);
677 gate_t id = result.setGate(f, type);
678 double prob = getProb(uuid);
679 if(!std::isnan(prob))
680 result.setProb(id, prob);
681
682 std::vector<pg_uuid_t> children = getChildren(uuid);
683 for(unsigned i=0; i<children.size(); ++i) {
684 result.addWire(
685 id,
686 result.getGate(uuid2string(children[i])));
687 if(processed.find(children[i])==processed.end())
688 to_process.insert(children[i]);
689 }
690
691 if(type==gate_mulinput || type==gate_eq || type==gate_agg
692 || type==gate_cmp || type==gate_arith) {
693 auto [info1, info2] = getInfos(uuid);
694 result.setInfos(id, info1, info2);
695 }
696
697 if(type==gate_project || type==gate_value || type==gate_agg
698 || type==gate_rv || type==gate_mulinput) {
699 auto extra = getExtra(uuid);
700 result.setExtra(id, extra);
701 }
702 }
703
704 return result;
705}
gate_t
Strongly-typed gate identifier.
Definition Circuit.h:49
Out-of-line template method implementations for Circuit<gateType>.
Semiring-agnostic in-memory provenance circuit.
static std::map< Oid, MMappedCircuit * > circuits
Per-database mmap-backed provenance circuits, keyed by database OID.
void destroy_provsql_mmap()
Unmap and close the mmap files.
bool operator<(const pg_uuid_t a, const pg_uuid_t b)
Lexicographic less-than comparison for pg_uuid_t.
void provsql_mmap_main_loop()
Main processing loop of the mmap background worker.
void initialize_provsql_mmap()
Open (or create) the mmap files and initialise the circuit store.
static MMappedCircuit * getCircuit(Oid db_oid)
Return (creating lazily if needed) the circuit for db_oid.
Persistent, mmap-backed storage for the full provenance circuit.
#define PROVSQL_TABLE_INFO_MAX_BLOCK_KEY
Cap on the number of block-key columns recorded per relation.
#define PROVSQL_TABLE_INFO_MAX_ANCESTORS
Cap on the number of base ancestors recorded per relation.
void addWire(gate_t f, gate_t t)
Add a directed wire from gate f (parent) to gate t (child).
Definition Circuit.hpp:81
gate_t getGate(const uuid &u)
Return (or create) the gate associated with UUID u.
Definition Circuit.hpp:33
In-memory provenance circuit with semiring-generic evaluation.
void setInfos(gate_t g, unsigned info1, unsigned info2)
Set the integer annotation pair for gate g.
gate_t setGate(gate_type type) override
Allocate a new gate with type type and no UUID.
void setExtra(gate_t g, const std::string &ex)
Attach a string extra to gate g.
void setProb(gate_t g, double p)
Set the probability for gate g.
Persistent mmap-backed representation of the provenance circuit.
void setTableAncestry(Oid relid, uint16_t ancestor_n, const Oid *ancestors)
Insert or update the ancestor set of a per-table metadata record, preserving any existing kind / bloc...
void setExtra(pg_uuid_t token, const std::string &s)
Attach a variable-length string annotation to a gate.
void setTableInfo(const ProvenanceTableInfo &info)
Insert or update the kind / block_key half of a per-table metadata record, preserving any existing an...
void removeTableInfo(Oid relid)
Remove a per-table metadata entry (both halves).
MMappedUUIDHashTable mapping
UUID → gate-index hash table.
void createGate(pg_uuid_t token, gate_type type, const std::vector< pg_uuid_t > &children)
Persist a new gate to the mmap store.
std::string getExtra(pg_uuid_t token) const
Return the variable-length string annotation for gate token.
unsigned long getNbGates() const
Return the total number of gates stored in the circuit.
static constexpr const char * GATES_FILENAME
Backing file for gates.
gate_type getGateType(pg_uuid_t token) const
Return the type of the gate identified by token.
static constexpr const char * TABLE_INFO_FILENAME
Backing file for tableInfo.
void removeTableAncestry(Oid relid)
Clear just the ancestor set of a per-table metadata record, preserving kind / block_key.
void sync()
Flush all backing files to disk with msync().
MMappedVector< ProvenanceTableInfo > tableInfo
Per-relation TID/BID metadata (safe-query optimisation).
static constexpr const char * WIRES_FILENAME
Backing file for wires.
GenericCircuit createGenericCircuit(pg_uuid_t token) const
Build an in-memory GenericCircuit rooted at token.
static std::string makePath(Oid db_oid, const char *filename)
Build the full path for a mmap file under $PGDATA/base/<db_oid>/.
bool setProb(pg_uuid_t token, double prob)
Set the probability associated with a gate.
static constexpr const char * EXTRA_FILENAME
Backing file for extra.
static constexpr const char * MAPPING_FILENAME
Backing file for mapping.
bool getTableInfo(Oid relid, ProvenanceTableInfo &out) const
Look up the full per-table metadata record (both halves).
MMappedVector< char > extra
Variable-length string data.
double getProb(pg_uuid_t token) const
Return the probability stored for the gate identified by token.
std::vector< pg_uuid_t > getChildren(pg_uuid_t token) const
Return the child UUIDs of the gate identified by token.
MMappedVector< GateInformation > gates
Gate metadata array.
MMappedVector< pg_uuid_t > wires
Flattened child UUID array.
void setInfos(pg_uuid_t token, unsigned info1, unsigned info2)
Update the info1 / info2 annotations of a gate.
MMappedCircuit(const std::string &mp, const std::string &gp, const std::string &wp, const std::string &ep, const std::string &tp, bool read_only)
Delegating constructor that accepts pre-built paths.
std::pair< unsigned, unsigned > getInfos(pg_uuid_t token) const
Return the info1 / info2 pair for the gate token.
static constexpr unsigned long NOTHING
Sentinel returned by operator[]() when the UUID is not present.
#define provsql_error(fmt,...)
Report a fatal ProvSQL error and abort the current transaction.
Background worker and IPC primitives for mmap-backed circuit storage.
#define READM(var, type)
Read one value of type from the background-to-main pipe.
#define WRITEB(pvar, type)
Write one value of type to the main-to-background pipe.
provsqlSharedState * provsql_shared_state
Pointer to the ProvSQL shared-memory segment (set in provsql_shmem_startup).
Shared-memory segment and inter-process pipe management.
@ gate_rv
Continuous random-variable leaf (extra encodes distribution).
@ gate_arith
n-ary arithmetic gate over scalar-valued children (info1 holds operator tag)
string uuid2string(pg_uuid_t uuid)
Format a pg_uuid_t as a std::string.
C++ utility functions for UUID manipulation.
Per-gate metadata stored in the gates MMappedVector.
unsigned info2
General-purpose integer annotation 2.
unsigned long children_idx
Start index of this gate's children in wires.
unsigned info1
General-purpose integer annotation 1.
unsigned nb_children
Number of children.
Per-relation metadata for the safe-query optimisation.
Oid relid
pg_class OID of the relation (primary key)
AttrNumber block_key[PROVSQL_TABLE_INFO_MAX_BLOCK_KEY]
Block-key column numbers.
uint16_t block_key_n
Number of valid entries in block_key.
Oid ancestors[PROVSQL_TABLE_INFO_MAX_ANCESTORS]
Sorted, deduplicated base-relation OIDs.
uint8_t kind
One of provsql_table_kind.
uint16_t ancestor_n
Number of valid entries in ancestors (0 = no registry info).
UUID structure.