ProvSQL C/C++ API
Adding support for provenance and uncertainty management to PostgreSQL databases
Loading...
Searching...
No Matches
kcmcp_server.cpp
Go to the documentation of this file.
1/**
2 * @file kcmcp_server.cpp
3 * @brief Implementation of the tdkc KCMCP reference server (see
4 * kcmcp_server.h and doc/source/dev/kc-server-protocol.rst).
5 */
6#include "kcmcp_server.h"
7#include "kcmcp_protocol.h"
8#include "dimacs_cnf.h"
9#include "tdkc_interrupt.h"
10
11#include "BooleanCircuit.h"
12#include "dDNNF.h"
14#include "TreeDecomposition.h"
15#include "Circuit.hpp"
16
17#include <boost/property_tree/ptree.hpp>
18#include <boost/property_tree/json_parser.hpp>
19
20#include <algorithm>
21#include <chrono>
22#include <cstdio>
23#include <iomanip>
24#include <sstream>
25#include <stdexcept>
26#include <string>
27
28extern "C" {
29#include <sys/socket.h>
30#include <sys/un.h>
31#include <netdb.h>
32#include <poll.h>
33#include <unistd.h>
34#include <signal.h>
35#include <errno.h>
36#include <string.h>
37}
38
39using namespace kcmcp;
40using steady = std::chrono::steady_clock;
41
42namespace {
43
// Control-flow tags thrown out of the poll hook to unwind a running build:
// Cancelled when the job must be abandoned (client CANCEL, lost peer, failed
// send), TimedOut when the request's deadline has passed. They carry no data
// and are not std::exception subclasses.
struct Cancelled {};
struct TimedOut {};
47
// Payload limits passed to every Connection, and the protocol major version.
constexpr uint32_t SERVER_MAX_PAYLOAD = 256u << 20; // 256 MiB, advertised in our HELLO
constexpr uint32_t CLIENT_FLOOR = 1u << 20;         // client's 1 MiB
constexpr long KCMCP_MAJOR = 1;                     // protocol major this server speaks
51
52// Context of the request currently being served, consulted by
53// provsql_tdkc_poll() (which the build loops call via CHECK_FOR_INTERRUPTS).
54struct ActiveJob {
55 Connection *conn = nullptr;
56 uint32_t request_id = 0;
57 bool cancel = false;
58 steady::time_point start;
59 steady::time_point last_progress;
60 steady::time_point deadline;
61 bool has_deadline = false;
62 std::chrono::milliseconds progress_interval{2000};
63};
64ActiveJob *g_job = nullptr;
65
66// The major version from a client HELLO's "kcmcp":[major,minor]; -1 if the
67// field is absent or unparseable (treated leniently as compatible).
68long client_major(const std::string &hello_json)
69{
70 try {
71 boost::property_tree::ptree pt;
72 std::istringstream is(hello_json);
73 boost::property_tree::read_json(is, pt);
74 auto kc = pt.get_child_optional("kcmcp");
75 if (!kc)
76 return -1;
77 for (const auto &elem : *kc) // JSON array: elements have empty keys
78 return elem.second.get_value<long>(); // first element is the major
79 return -1;
80 } catch (...) {
81 return -1;
82 }
83}
84
85std::string server_hello()
86{
87 std::ostringstream o;
88 o << "{\"kcmcp\":1,\"engine\":\"tdkc\",\"max_payload\":" << SERVER_MAX_PAYLOAD
89 << ",\"operations\":[\"compile\",\"wmc\"]"
90 << ",\"input_formats\":[\"dimacs-cnf\"]"
91 << ",\"output_formats\":{\"compile\":[\"ddnnf-nnf\"],\"wmc\":[\"decimal\"]}"
92 << ",\"features\":[\"cancel\",\"progress\"]}";
93 return o.str();
94}
95
96// Read an integer option from the REQUEST options JSON, returning @p def when
97// the blob is empty, malformed, or lacks the key (ignore-unknown semantics).
98long option_long(const std::string &options, const char *key, long def)
99{
100 if (options.empty())
101 return def;
102 try {
103 boost::property_tree::ptree pt;
104 std::istringstream is(options);
105 boost::property_tree::read_json(is, pt);
106 return pt.get<long>(key, def);
107 } catch (...) {
108 return def; // a malformed options block is treated as {}
109 }
110}
111
112void send_error(Connection &conn, uint32_t rid, ErrorCode code,
113 const std::string &msg)
114{
115 conn.send(Type::ERROR, rid, build_error(code, msg));
116}
117
// Map an input gate's UUID ("v<n>") back to its CNF variable index.
//
// Returns n when @p u is exactly 'v' followed by decimal digits that fit in an
// int, and -1 otherwise (unknown -> toNNF falls back to the gate id). The
// whole suffix must be numeric: strings such as "v12abc", "v-5" or "v 7" are
// rejected rather than silently truncated or sign-converted by std::stoi.
int var_of_input_uuid(const std::string &u)
{
  // std::stoi skips leading whitespace and accepts a sign, so insist on a
  // digit right after the 'v' before handing the suffix over.
  if (u.size() < 2 || u[0] != 'v' || u[1] < '0' || u[1] > '9')
    return -1;

  try {
    std::size_t consumed = 0;
    const int var = std::stoi(u.substr(1), &consumed);
    if (consumed == u.size() - 1) // no trailing garbage
      return var;
  } catch (const std::exception &) {
    // std::out_of_range: the index does not fit in an int
  }
  return -1;
}
126
// Serve one REQUEST frame received on `conn`: decode the payload, check the
// requested output format, parse the DIMACS CNF problem into a circuit, build
// a tree decomposition and a d-DNNF from it, then answer with a RESULT frame
// (the NNF text for compile, the probability for wmc) or with an ERROR frame.
// While the build runs, g_job points at a stack-local ActiveJob so that
// provsql_tdkc_poll() can service the connection; g_job is reset to nullptr
// once the build returns and in every catch handler below.
// NOTE(review): this listing has lost source lines 134, 140, 147 and 155 (see
// the jumps in the embedded line numbers); the guesses about them in the
// comments below must be confirmed against the repository.
127void handle_request(Connection &conn, const Message &msg)
128{
129 Request req;
130 if (!parse_request(msg.payload, req)) {
131 send_error(conn, msg.request_id, ErrorCode::PARSE, "malformed REQUEST payload");
132 return;
133 }
// [line 134 missing] presumably the guard that rejects operations other than
// compile and wmc -- TODO confirm.
135 send_error(conn, msg.request_id, ErrorCode::UNSUPPORTED_OPERATION,
136 std::string("unsupported operation '") + operation_name(req.operation)
137 + "'; this engine offers compile and wmc");
138 return;
139 }
// [line 140 missing] presumably the guard that rejects input formats other
// than dimacs-cnf -- TODO confirm.
141 send_error(conn, msg.request_id, ErrorCode::UNSUPPORTED_FORMAT,
142 std::string("unsupported input '") + input_format_name(req.input_format)
143 + "'; this engine reads dimacs-cnf");
144 return;
145 }
146 const bool weighted = (req.operation == Operation::WMC);
// [line 147 missing] presumably declares `want`, the output format this
// operation produces -- TODO confirm.
148 if (req.output_format != want) {
149 send_error(conn, msg.request_id, ErrorCode::UNSUPPORTED_FORMAT,
150 std::string("operation '") + operation_name(req.operation)
151 + "' produces '" + output_format_name(want) + "' here");
152 return;
153 }
154
// [line 155 missing] presumably declares the circuit `c` that
// parse_dimacs_cnf fills in -- TODO confirm.
156 gate_t root;
157 try {
158 root = parse_dimacs_cnf(req.problem, c, weighted);
159 } catch (const std::exception &e) {
160 send_error(conn, msg.request_id, ErrorCode::PARSE, e.what());
161 return;
162 }
163
// Describe the job for the poll hook: where to report, when it started and,
// if the client asked for one, when it must give up.
164 ActiveJob job;
165 job.conn = &conn;
166 job.request_id = msg.request_id;
167 job.start = job.last_progress = steady::now();
// A timeout_ms that is absent, zero or negative means "no deadline".
168 long timeout_ms = option_long(req.options, "timeout_ms", 0);
169 if (timeout_ms > 0) {
170 job.deadline = job.start + std::chrono::milliseconds(timeout_ms);
171 job.has_deadline = true;
172 }
173 // PROGRESS cadence, in milliseconds (default 2000); 0 = emit on every poll.
174 job.progress_interval =
175 std::chrono::milliseconds(std::max(0L, option_long(req.options,
176 "progress_every_ms", 2000)));
// From here until g_job is cleared, provsql_tdkc_poll() acts on this job.
177 g_job = &job;
178
179 try {
180 TreeDecomposition td(c);
181 auto dnnf = dDNNFTreeDecompositionBuilder{c, root, td}.build();
// Build finished: detach the job before the result is serialised and sent.
182 g_job = nullptr;
183
184 std::ostringstream meta;
185 if (req.operation == Operation::COMPILE) {
186 std::string nnf = dnnf.toNNF(var_of_input_uuid);
187 meta << "{\"treewidth\":" << td.getTreewidth()
188 << ",\"nodes\":" << dnnf.getNbGates() << ",\"exact\":true}";
189 conn.send(Type::RESULT, msg.request_id,
190 build_result(OutputFormat::DDNNF_NNF, meta.str(), nnf));
191 } else {
192 double p = dnnf.probabilityEvaluation();
193 meta << "{\"treewidth\":" << td.getTreewidth() << ",\"exact\":true}";
// The probability is rendered with 15 significant digits.
194 std::ostringstream val;
195 val << std::setprecision(15) << p;
196 conn.send(Type::RESULT, msg.request_id,
197 build_result(OutputFormat::DECIMAL, meta.str(), val.str()));
198 }
// Cancelled and TimedOut are the tags provsql_tdkc_poll() throws to abort the
// build; every handler clears g_job before replying with an ERROR frame.
199 } catch (const Cancelled &) {
200 g_job = nullptr;
201 send_error(conn, msg.request_id, ErrorCode::CANCELLED, "cancelled at client request");
202 } catch (const TimedOut &) {
203 g_job = nullptr;
204 send_error(conn, msg.request_id, ErrorCode::TIMEOUT,
205 "time budget exceeded (timeout_ms)");
206 } catch (const TreeDecompositionException &) {
207 g_job = nullptr;
208 send_error(conn, msg.request_id, ErrorCode::INTERNAL,
209 "treewidth exceeds the supported bound");
210 } catch (const std::exception &e) {
211 g_job = nullptr;
212 send_error(conn, msg.request_id, ErrorCode::INTERNAL, e.what());
213 }
214}
215
// Drive one client session on the connected socket `fd`: perform the HELLO
// handshake, then serve frames in a loop until the peer closes the
// connection, sends BYE, or a fatal protocol error occurs. This function does
// not call close() on `fd`; kcmcp_serve() closes it after the session returns.
// NOTE(review): this listing has lost source lines 230 and 283 (the opening
// halves of two send_error calls, judging by the surviving argument lines) --
// confirm against the repository.
// NOTE(review): in the request loop, handle_request(), the PONG send and the
// default branch are not inside a try block, so an exception thrown there
// leaves this function.
216void run_session(int fd)
217{
218 Connection conn(fd, SERVER_MAX_PAYLOAD, CLIENT_FLOOR);
219
220 // Handshake: read the client HELLO, reply with ours.
221 Message m;
222 try {
223 if (!conn.recv(m))
224 return;
225 } catch (...) {
226 return;
227 }
228 if (m.type != Type::HELLO) {
229 try {
// [line 230 missing] presumably the start of a send_error call whose message
// is the string on the next line; its error code is not visible here.
231 "expected HELLO as the first frame");
232 } catch (...) {}
233 return;
234 }
235 // A major bump is a breaking change: if the client requires a newer major
236 // than we implement there is no shared version, so we do not silently
237 // downgrade -- we reject and let the client fall back.
// client_major() yields -1 for an absent or unparseable version, which passes
// the check below.
238 long major = client_major(m.payload);
239 if (major > KCMCP_MAJOR) {
240 try {
241 send_error(conn, 0, ErrorCode::UNSUPPORTED_VERSION,
242 "this server speaks KCMCP major " + std::to_string(KCMCP_MAJOR)
243 + "; client requires major " + std::to_string(major));
244 } catch (...) {}
245 return;
246 }
247 try {
248 conn.send(Type::HELLO, 0, server_hello());
249 } catch (...) {
250 return;
251 }
252
253 // Request/response loop on the same connection.
254 for (;;) {
255 try {
256 if (!conn.recv(m))
257 return; // clean close
258 } catch (const ProtocolError &e) {
259 // Non-fatal (e.g. an undecodable but length-bounded COMPRESSED frame):
260 // the stream stayed synchronised, so answer -- echoing the offending
261 // frame's request_id -- and keep serving. Fatal: the stream is
262 // desynchronised, so close after the ERROR.
263 try { send_error(conn, e.fatal ? 0 : m.request_id, e.code, e.what()); }
264 catch (...) { return; }
265 if (e.fatal)
266 return;
267 continue;
268 } catch (...) {
269 return;
270 }
// Dispatch on the frame type; REQUEST is served synchronously before the next
// frame is read.
271 switch (m.type) {
272 case Type::REQUEST:
273 handle_request(conn, m);
274 break;
275 case Type::PING:
276 conn.send(Type::PONG, m.request_id);
277 break;
278 case Type::CANCEL:
279 break; // no job is running between requests
280 case Type::BYE:
281 return;
282 default:
// [line 283 missing] presumably the start of a send_error call reporting the
// unexpected frame -- TODO confirm its request id and error code.
284 "unexpected frame type");
285 }
286 }
287}
288
// Create a Unix-domain stream socket bound to @p path.
//
// Any existing file at @p path is unlinked first so that a stale socket left
// by a previous run does not make bind() fail. The socket is bound but not
// yet listening; the caller calls listen() on it.
//
// Returns the bound descriptor, or -1 after printing a diagnostic to stderr.
// No descriptor is left open on any failure path.
int listen_unix(const std::string &path)
{
  struct sockaddr_un addr;
  memset(&addr, 0, sizeof(addr));
  addr.sun_family = AF_UNIX;
  // Validate the path before creating the socket so that this early return
  // has nothing to clean up.
  if (path.size() >= sizeof(addr.sun_path)) {
    fprintf(stderr, "tdkc: socket path too long\n");
    return -1;
  }
  strncpy(addr.sun_path, path.c_str(), sizeof(addr.sun_path) - 1);

  int fd = ::socket(AF_UNIX, SOCK_STREAM, 0);
  if (fd < 0) { perror("socket"); return -1; }

  ::unlink(path.c_str()); // clear a stale socket file
  if (::bind(fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr)) < 0) {
    perror("bind"); // report before close() can overwrite errno
    ::close(fd);
    return -1;
  }
  return fd;
}
307
// Create a TCP stream socket bound to @p host : @p port.
//
// The endpoint is resolved with getaddrinfo() (IPv4 or IPv6) and each returned
// address is tried in order until one can be both created and bound, so a
// host whose first address family is unusable on this machine still works.
// SO_REUSEADDR is set before binding. On success the address actually bound
// is printed to stderr (so an ephemeral "host:0" is discoverable). The socket
// is bound but not yet listening; the caller calls listen() on it.
//
// Returns the bound descriptor, or -1 after printing a diagnostic to stderr.
int listen_tcp(const std::string &host, const std::string &port)
{
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;

  struct addrinfo *res = nullptr;
  if (::getaddrinfo(host.c_str(), port.c_str(), &hints, &res) != 0 || !res) {
    fprintf(stderr, "tdkc: cannot resolve %s:%s\n", host.c_str(), port.c_str());
    return -1;
  }

  int fd = -1;
  const char *failed_call = "socket"; // which call failed for the last candidate
  int failed_errno = 0;
  for (const struct addrinfo *ai = res; ai != nullptr; ai = ai->ai_next) {
    fd = ::socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
    if (fd < 0) {
      failed_call = "socket";
      failed_errno = errno;
      continue;
    }
    int one = 1;
    ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
    if (::bind(fd, ai->ai_addr, ai->ai_addrlen) == 0)
      break; // bound: keep this descriptor
    failed_call = "bind";
    failed_errno = errno;
    ::close(fd);
    fd = -1;
  }
  freeaddrinfo(res);

  if (fd < 0) {
    errno = failed_errno; // close()/freeaddrinfo() may have overwritten it
    perror(failed_call);
    return -1;
  }

  // Report the actual port (so an ephemeral "host:0" is discoverable).
  struct sockaddr_storage ss;
  socklen_t sl = sizeof(ss);
  if (::getsockname(fd, reinterpret_cast<sockaddr *>(&ss), &sl) == 0) {
    char hbuf[NI_MAXHOST], pbuf[NI_MAXSERV];
    if (::getnameinfo(reinterpret_cast<sockaddr *>(&ss), sl, hbuf, sizeof(hbuf),
                      pbuf, sizeof(pbuf), NI_NUMERICHOST | NI_NUMERICSERV) == 0)
      fprintf(stderr, "tdkc: bound %s:%s\n", hbuf, pbuf);
  }
  return fd;
}
337
338} // namespace
339
340// Build-loop interrupt hook (declared in tdkc_interrupt.h). Services the
341// active connection mid-job and aborts the build on cancel/timeout.
// NOTE(review): the signature line (listing line 342) is missing from this
// extract; the page's symbol index lists `void provsql_tdkc_poll()` -- confirm
// against the repository.
// Body of the poll hook. It does nothing when no job is active. Otherwise it
// (1) reads at most one pending frame from the job's connection without
// blocking, (2) throws Cancelled or TimedOut when the job must stop, and
// (3) emits a PROGRESS frame once progress_interval has elapsed since the
// previous one.
343{
344 ActiveJob *job = g_job;
345 if (!job)
346 return;
347
// Zero-timeout poll: only look at the socket if a frame is already readable.
348 struct pollfd pfd;
349 pfd.fd = job->conn->fd();
350 pfd.events = POLLIN;
351 pfd.revents = 0;
352 if (::poll(&pfd, 1, 0) > 0 && (pfd.revents & POLLIN)) {
353 Message m;
354 try {
355 if (job->conn->recv(m)) {
// A CANCEL only counts when it names the request being served.
356 if (m.type == Type::CANCEL && m.request_id == job->request_id)
357 job->cancel = true;
358 else if (m.type == Type::PING)
359 job->conn->send(Type::PONG, m.request_id);
360 // other frames mid-job are ignored (one in-flight request)
361 } else {
362 job->cancel = true; // peer closed: abandon the job
363 }
364 } catch (...) {
// Any receive/send failure is treated like a cancellation.
365 job->cancel = true;
366 }
367 }
368
// Cancellation is checked before the deadline, so a job that is both
// cancelled and overdue reports Cancelled.
369 auto now = steady::now();
370 if (job->cancel)
371 throw Cancelled{};
372 if (job->has_deadline && now >= job->deadline)
373 throw TimedOut{};
374 if (now - job->last_progress >= job->progress_interval) {
375 long ms = std::chrono::duration_cast<std::chrono::milliseconds>(
376 now - job->start).count();
377 try {
378 job->conn->send(Type::PROGRESS, job->request_id,
379 "{\"phase\":\"compile\",\"elapsed_ms\":" + std::to_string(ms) + "}");
380 } catch (...) {
// The PROGRESS frame could not be sent: abandon the job.
381 job->cancel = true;
382 throw Cancelled{};
383 }
384 job->last_progress = now;
385 }
386}
387
388int kcmcp_serve(const std::string &endpoint)
389{
390 ::signal(SIGPIPE, SIG_IGN); // a peer vanishing mid-send must not kill us
391
392 int lfd;
393 if (endpoint.rfind("unix:", 0) == 0) {
394 lfd = listen_unix(endpoint.substr(5));
395 } else {
396 auto colon = endpoint.rfind(':');
397 if (colon == std::string::npos) {
398 fprintf(stderr, "tdkc: endpoint must be unix:/path or host:port\n");
399 return 1;
400 }
401 lfd = listen_tcp(endpoint.substr(0, colon), endpoint.substr(colon + 1));
402 }
403 if (lfd < 0)
404 return 1;
405 if (::listen(lfd, 16) < 0) {
406 perror("listen"); ::close(lfd); return 1;
407 }
408 fprintf(stderr, "tdkc: KCMCP server listening on %s\n", endpoint.c_str());
409 fflush(stderr);
410
411 for (;;) {
412 int cfd = ::accept(lfd, nullptr, nullptr);
413 if (cfd < 0) {
414 if (errno == EINTR) continue;
415 perror("accept");
416 break;
417 }
418 run_session(cfd);
419 ::close(cfd);
420 g_job = nullptr; // defensive: never leak a stale job across sessions
421 }
422 ::close(lfd);
423 return 0;
424}
Boolean provenance circuit with support for knowledge compilation.
gate_t
Strongly-typed gate identifier.
Definition Circuit.h:49
Out-of-line template method implementations for Circuit<gateType>.
Tree decomposition of a Boolean circuit for knowledge compilation.
Boolean circuit for provenance formula evaluation.
Exception thrown when a tree decomposition cannot be constructed.
Tree decomposition of a Boolean circuit's primal graph.
unsigned getTreewidth() const
Return the treewidth of this decomposition.
Builds a d-DNNF from a Boolean circuit using a tree decomposition.
dDNNF && build() &&
Execute the compilation and return the resulting d-DNNF.
Framed message transport over one connected socket fd.
bool recv(Message &out)
Read one logical message (concatenating MORE frames).
void send(Type type, uint32_t request_id, const std::string &payload)
Send a message, splitting payload across MORE-flagged frames no larger than the peer's limit.
Constructs a d-DNNF from a Boolean circuit and its tree decomposition.
Decomposable Deterministic Negation Normal Form circuit.
gate_t parse_dimacs_cnf(const std::string &text, BooleanCircuit &c, bool weighted)
Parse text (a DIMACS CNF) into c and return the root gate.
Parse a DIMACS CNF into a ProvSQL BooleanCircuit.
Wire codec for KCMCP, the Knowledge Compiler / Model Counter Protocol (see doc/source/dev/kc-server-p...
int kcmcp_serve(const std::string &endpoint)
Serve KCMCP on endpoint until terminated.
std::chrono::steady_clock steady
void provsql_tdkc_poll()
The tdkc KCMCP reference server.
std::string build_result(OutputFormat fmt, const std::string &meta_json, const std::string &result)
Build a RESULT payload (result_format byte + meta JSON + result bytes).
const char * input_format_name(InputFormat fmt)
const char * output_format_name(OutputFormat fmt)
ErrorCode
ERROR codes.
@ UNSUPPORTED_VERSION
client requires a KCMCP major this server lacks
@ UNSUPPORTED_OPERATION
unsupported operation or unknown frame type
const char * operation_name(Operation op)
OutputFormat
Output-format registry (REQUEST byte 2 / RESULT byte 0; one shared space).
std::string build_error(ErrorCode code, const std::string &message)
Build an ERROR payload (u16 code + UTF-8 message).
bool parse_request(const std::string &payload, Request &out)
Decode a REQUEST payload; returns false if structurally malformed.
A fully reassembled inbound message (MORE frames concatenated).
uint32_t request_id
std::string payload
Thrown by Connection on a protocol violation that warrants an ERROR frame (e.g.
Decoded REQUEST payload.
InputFormat input_format
std::string problem
the formula bytes
Operation operation
std::string options
UTF-8 JSON (may be empty == {}).
OutputFormat output_format
Build-loop interrupt hook for the standalone tdkc binary.