ProvSQL C/C++ API
Adding support for provenance and uncertainty management to PostgreSQL databases
Loading...
Searching...
No Matches
MMappedCircuit.cpp
Go to the documentation of this file.
1/**
2 * @file MMappedCircuit.cpp
3 * @brief Persistent mmap-backed circuit: implementation and background-worker entry points.
4 *
5 * Implements the @c MMappedCircuit methods declared in @c MMappedCircuit.h,
6 * the @c createGenericCircuit() free function, and the background-worker
7 * entry points declared in @c provsql_mmap.h:
8 *
9 * - @c initialize_provsql_mmap(): called by the background worker at
10 * startup; opens all four mmap files and creates the singleton
11 * @c MMappedCircuit instance.
12 * - @c destroy_provsql_mmap(): called on shutdown; syncs and deletes the
13 * singleton.
14 * - @c provsql_mmap_main_loop(): the worker's main loop; receives gate-
15 * creation messages from backends over the IPC pipe and writes them
16 * to the mmap store.
17 *
18 * The @c createGenericCircuit() function performs a BFS from a root UUID,
19 * reading gates from the mmap store and building an in-memory @c GenericCircuit.
20 */
21#include <cerrno>
22#include <cmath>
23#include <sstream>
24
25#include "MMappedCircuit.h"
26#include "GenericCircuit.h"
27#include "Circuit.hpp"
28#include "provsql_utils_cpp.h"
29
30extern "C" {
31#include "provsql_mmap.h"
32#include "provsql_shmem.h"
33}
34
35/** @brief Singleton pointer to the process-wide mmap-backed provenance circuit. */
36static MMappedCircuit *circuit = NULL;
37
42
44{
45 delete circuit;
46}
47
49 pg_uuid_t token, gate_type type, const std::vector<pg_uuid_t> &children)
50{
51 auto [idx, created] = mapping.add(token);
52 if(!created) // Was already existing, no need to do anything
53 return;
54
55 gates.add({type, static_cast<unsigned>(children.size()), wires.nbElements()});
56 for(const auto &c: children)
57 {
58 wires.add(c);
59 }
60}
61
63{
64 auto idx = mapping[token];
66 return gate_invalid;
67 else
68 return gates[idx].type;
69}
70
71std::vector<pg_uuid_t> MMappedCircuit::getChildren(pg_uuid_t token) const
72{
73 std::vector<pg_uuid_t> result;
74 auto idx = mapping[token];
76 const GateInformation &gi = gates[idx];
77 for(unsigned long k=gi.children_idx; k<gi.children_idx+gi.nb_children; ++k)
78 result.push_back(wires[k]);
79 }
80 return result;
81}
82
83bool MMappedCircuit::setProb(pg_uuid_t token, double prob)
84{
85 auto idx = mapping[token];
87 (gates[idx].type == gate_input || gates[idx].type==gate_update || gates[idx].type == gate_mulinput)) {
88 gates[idx].prob=prob;
89 return true;
90 } else
91 return false;
92}
93
95{
96 auto idx = mapping[token];
98 (gates[idx].type != gate_input && gates[idx].type != gate_update && gates[idx].type != gate_mulinput))
99 return NAN;
100 else
101 return gates[idx].prob;
102}
103
104void MMappedCircuit::setInfos(pg_uuid_t token, unsigned info1, unsigned info2)
105{
106 auto idx = mapping[token];
108 gates[idx].info1=info1;
109 gates[idx].info2=info2;
110 }
111}
112
113void MMappedCircuit::setExtra(pg_uuid_t token, const std::string &s)
114{
115 auto idx = mapping[token];
117 gates[idx].extra_idx=extra.nbElements();
118 for(auto c: s)
119 extra.add(c);
120 gates[idx].extra_len=s.size();
121 }
122}
123
124std::pair<unsigned, unsigned> MMappedCircuit::getInfos(pg_uuid_t token) const
125{
126 auto idx = mapping[token];
128 return std::make_pair(0, 0);
129 } else {
130 const GateInformation &gi = gates[idx];
131 return std::make_pair(gi.info1, gi.info2);
132 }
133}
134
135std::string MMappedCircuit::getExtra(pg_uuid_t token) const
136{
137 std::string result;
138
139 auto idx = mapping[token];
141 for(unsigned long start=gates[idx].extra_idx, k=start, end=start+gates[idx].extra_len; k<end; ++k)
142 result+=extra[k];
143 }
144
145 return result;
146}
147
149{
150 char c;
151
152 while(READM(c, char)) {
153 switch(c) {
154 case 'C':
155 {
156 pg_uuid_t token;
157 gate_type type;
158 unsigned nb_children;
159
160 if(!READM(token, pg_uuid_t) || !READM(type, gate_type) || !READM(nb_children, unsigned))
161 provsql_error("Cannot read from pipe (message type C)"); ;
162
163 std::vector<pg_uuid_t> children(nb_children);
164 for(unsigned i=0; i<nb_children; ++i)
165 if(!READM(children[i], pg_uuid_t))
166 provsql_error("Cannot read from pipe (message type C)");
167
168 circuit->createGate(token, type, children);
169 break;
170 }
171
172 case 'P':
173 {
174 pg_uuid_t token;
175 double prob;
176
177 if(!READM(token, pg_uuid_t) || !READM(prob, double))
178 provsql_error("Cannot read from pipe (message type P)");
179
180 bool ok = circuit->setProb(token, prob);
181 char return_value = ok?static_cast<char>(1):0;
182
183 if(!WRITEB(&return_value, char))
184 provsql_error("Cannot write response to pipe (message type P)");
185 break;
186 }
187
188 case 'I':
189 {
190 pg_uuid_t token;
191 unsigned info1, info2;
192
193 if(!READM(token, pg_uuid_t) || !READM(info1, unsigned) || !READM(info2, unsigned))
194 provsql_error("Cannot read from pipe (message type I)");
195
196 circuit->setInfos(token, info1, info2);
197 break;
198 }
199
200 case 'E':
201 {
202 pg_uuid_t token;
203 unsigned len;
204
205 if(!READM(token, pg_uuid_t) || !READM(len, unsigned))
206 provsql_error("Cannot read from pipe (message type E)");
207
208 if(len>0) {
209 char *data = new char[len];
210 if(read(provsql_shared_state->pipebmr, data, len)<len)
211 provsql_error("Cannot read from pipe (message type E)");
212
213 circuit->setExtra(token, std::string(data, len));
214 }
215
216 break;
217 }
218
219 case 't':
220 {
221 pg_uuid_t token;
222
223 if(!READM(token, pg_uuid_t))
224 provsql_error("Cannot read from pipe (message type t)");
225
226 gate_type type = circuit->getGateType(token);
227
228 if(!WRITEB(&type, gate_type))
229 provsql_error("Cannot write response to pipe (message type t)");
230 break;
231 }
232
233 case 'n':
234 {
235 unsigned long nb = circuit->getNbGates();
236
237 if(!WRITEB(&nb, unsigned long))
238 provsql_error("Cannot write response to pipe (message type n)");
239 break;
240 }
241
242 case 'c':
243 {
244 pg_uuid_t token;
245
246 if(!READM(token, pg_uuid_t))
247 provsql_error("Cannot read from pipe (message type c)");
248
249 auto children = circuit->getChildren(token);
250 unsigned nb_children = children.size();
251 if(!WRITEB(&nb_children, unsigned))
252 provsql_error("Cannot write response to pipe (message type c)");
253
254 if(write(provsql_shared_state->pipembw, &children[0], nb_children*sizeof(pg_uuid_t))==-1)
255 provsql_error("Cannot write response to pipe (message type c)");
256 break;
257 }
258
259 case 'p':
260 {
261 pg_uuid_t token;
262
263 if(!READM(token, pg_uuid_t))
264 provsql_error("Cannot read from pipe (message type p)");
265
266 double prob = circuit->getProb(token);
267
268 if(!WRITEB(&prob, double))
269 provsql_error("Cannot write response to pipe (message type p)");
270 break;
271 }
272
273 case 'i':
274 {
275 pg_uuid_t token;
276
277 if(!READM(token, pg_uuid_t))
278 provsql_error("Cannot read from pipe (message type i)");
279
280 auto infos = circuit->getInfos(token);
281
282 if(!WRITEB(&infos.first, unsigned) || !WRITEB(&infos.second, unsigned))
283 provsql_error("Cannot write response to pipe (message type i)");
284 break;
285 }
286
287 case 'e':
288 {
289 pg_uuid_t token;
290
291 if(!READM(token, pg_uuid_t))
292 provsql_error("Cannot read from pipe (message type e)");
293
294 auto str = circuit->getExtra(token);
295 unsigned len = str.size();
296
297 if(!WRITEB(&len, unsigned) || write(provsql_shared_state->pipembw, str.data(), len)==-1)
298 provsql_error("Cannot write response to pipe (message type e)");
299 break;
300 }
301
302 case 'g':
303 {
304 pg_uuid_t token;
305
306 if(!READM(token, pg_uuid_t))
307 provsql_error("Cannot read from pipe (message type g)");
308
309 std::stringstream ss;
310 boost::archive::binary_oarchive oa(ss);
311 oa << createGenericCircuit(token);
312
313 ss.seekg(0, std::ios::end);
314 unsigned long size = ss.tellg();
315 ss.seekg(0, std::ios::beg);
316
317 if(!WRITEB(&size, unsigned long) || write(provsql_shared_state->pipembw, ss.str().data(), size)==-1)
318 provsql_error("Cannot write to pipe (message type g)");
319 break;
320 }
321
322 default:
323 provsql_error("Wrong message type: %c", c);
324 }
325 }
326
327 int e = errno;
328 provsql_error("Reading from pipe: %s", strerror(e));
329}
330
332{
333 gates.sync();
334 wires.sync();
335 mapping.sync();
336}
337
338/**
339 * @brief Lexicographic less-than comparison for @c pg_uuid_t.
340 * @param a Left UUID.
341 * @param b Right UUID.
342 * @return @c true if @p a is lexicographically less than @p b.
343 */
344bool operator<(const pg_uuid_t a, const pg_uuid_t b)
345{
346 return memcmp(&a, &b, sizeof(pg_uuid_t))<0;
347}
348
350{
351 std::set<pg_uuid_t> to_process, processed;
352 to_process.insert(token);
353
354 GenericCircuit result;
355
356 while(!to_process.empty()) {
357 pg_uuid_t uuid = *to_process.begin();
358 to_process.erase(to_process.begin());
359 processed.insert(uuid);
360 std::string f{uuid2string(uuid)};
361
362 gate_type type = circuit->getGateType(uuid);
363 gate_t id = result.setGate(f, type);
364 double prob = circuit->getProb(uuid);
365 if(!std::isnan(prob))
366 result.setProb(id, prob);
367
368 std::vector<pg_uuid_t> children = circuit->getChildren(uuid);
369 for(unsigned i=0; i<children.size(); ++i) {
370 result.addWire(
371 id,
372 result.getGate(uuid2string(children[i])));
373 if(processed.find(children[i])==processed.end())
374 to_process.insert(children[i]);
375 }
376
377 if(type==gate_mulinput || type==gate_eq || type==gate_agg || type==gate_cmp) {
378 auto [info1, info2] = circuit->getInfos(uuid);
379 result.setInfos(id, info1, info2);
380 }
381
382 if(type==gate_project || type==gate_value || type==gate_agg) {
383 auto extra = circuit->getExtra(uuid);
384 result.setExtra(id, extra);
385 }
386 }
387
388 return result;
389}
gate_t
Strongly-typed gate identifier.
Definition Circuit.h:48
Out-of-line template method implementations for Circuit<gateType>.
Semiring-agnostic in-memory provenance circuit.
GenericCircuit createGenericCircuit(pg_uuid_t token)
Build an in-memory GenericCircuit rooted at token.
void destroy_provsql_mmap()
Unmap and close the mmap files.
bool operator<(const pg_uuid_t a, const pg_uuid_t b)
Lexicographic less-than comparison for pg_uuid_t.
void provsql_mmap_main_loop()
Main processing loop of the mmap background worker.
static MMappedCircuit * circuit
Singleton pointer to the process-wide mmap-backed provenance circuit.
void initialize_provsql_mmap()
Open (or create) the mmap files and initialise the circuit store.
Persistent, mmap-backed storage for the full provenance circuit.
void addWire(gate_t f, gate_t t)
Add a directed wire from gate f (parent) to gate t (child).
Definition Circuit.hpp:81
gate_t getGate(const uuid &u)
Return (or create) the gate associated with UUID u.
Definition Circuit.hpp:33
In-memory provenance circuit with semiring-generic evaluation.
void setInfos(gate_t g, unsigned info1, unsigned info2)
Set the integer annotation pair for gate g.
gate_t setGate(gate_type type) override
Allocate a new gate with type type and no UUID.
void setExtra(gate_t g, const std::string &ex)
Attach a string extra to gate g.
void setProb(gate_t g, double p)
Set the probability for gate g.
Persistent mmap-backed representation of the provenance circuit.
void setExtra(pg_uuid_t token, const std::string &s)
Attach a variable-length string annotation to a gate.
MMappedUUIDHashTable mapping
UUID → gate-index hash table.
void createGate(pg_uuid_t token, gate_type type, const std::vector< pg_uuid_t > &children)
Persist a new gate to the mmap store.
std::string getExtra(pg_uuid_t token) const
Return the variable-length string annotation for gate token.
unsigned long getNbGates() const
Return the total number of gates stored in the circuit.
gate_type getGateType(pg_uuid_t token) const
Return the type of the gate identified by token.
void sync()
Flush all backing files to disk with msync().
bool setProb(pg_uuid_t token, double prob)
Set the probability associated with a gate.
MMappedVector< char > extra
Variable-length string data.
double getProb(pg_uuid_t token) const
Return the probability stored for the gate identified by token.
std::vector< pg_uuid_t > getChildren(pg_uuid_t token) const
Return the child UUIDs of the gate identified by token.
MMappedVector< GateInformation > gates
Gate metadata array.
MMappedVector< pg_uuid_t > wires
Flattened child UUID array.
void setInfos(pg_uuid_t token, unsigned info1, unsigned info2)
Update the info1 / info2 annotations of a gate.
std::pair< unsigned, unsigned > getInfos(pg_uuid_t token) const
Return the info1 / info2 pair for the gate token.
std::pair< unsigned long, bool > add(pg_uuid_t u)
Insert UUID u, assigning it the next available integer.
void sync()
Flush dirty pages to the backing file with msync().
static constexpr unsigned long NOTHING
Sentinel returned by operator[]() when the UUID is not present.
unsigned long nbElements() const
Return the number of elements currently stored.
void add(const T &value)
Append an element to the end of the vector.
void sync()
Flush dirty pages to the backing file with msync().
#define provsql_error(fmt,...)
Report a fatal ProvSQL error and abort the current transaction.
Background worker and IPC primitives for mmap-backed circuit storage.
#define READM(var, type)
Read one value of type `type` from the background-to-main pipe.
#define WRITEB(pvar, type)
Write one value of type `type` to the main-to-background pipe.
provsqlSharedState * provsql_shared_state
Pointer to the ProvSQL shared-memory segment (set in provsql_shmem_startup).
Shared-memory segment and inter-process pipe management.
gate_type
Possible gate type in the provenance circuit.
@ gate_update
Update operation.
@ gate_eq
Equijoin gate (for where provenance)
@ gate_value
Scalar value (for aggregate provenance)
@ gate_mulinput
Multivalued input (for Boolean provenance)
@ gate_agg
Aggregation operator (for aggregate provenance)
@ gate_project
Project gate (for where provenance)
@ gate_invalid
Invalid gate type.
@ gate_cmp
Currently unused, meant for comparison of aggregate values.
@ gate_input
Input (variable) gate of the circuit.
string uuid2string(pg_uuid_t uuid)
Format a pg_uuid_t as a std::string.
C++ utility functions for UUID manipulation.
Per-gate metadata stored in the gates MMappedVector.
unsigned info2
General-purpose integer annotation 2.
unsigned long children_idx
Start index of this gate's children in wires.
unsigned info1
General-purpose integer annotation 1.
unsigned nb_children
Number of children.
UUID structure.
long pipebmr
Background-to-main pipe: read end (worker reads)
long pipembw
Main-to-background pipe: write end (backend writes)