8#include "storage/ipc.h"
10#include <sys/socket.h>
35constexpr uint32_t CLIENT_RECV_MAX = 256u * 1024 * 1024;
39constexpr uint32_t CLIENT_SEND_MAX = 1u * 1024 * 1024;
42int connect_endpoint(
const std::string &endpoint)
44 if (endpoint.rfind(
"unix:", 0) == 0) {
45 std::string path = endpoint.substr(5);
46 int fd = ::socket(AF_UNIX, SOCK_STREAM, 0);
49 struct sockaddr_un addr;
50 memset(&addr, 0,
sizeof(addr));
51 addr.sun_family = AF_UNIX;
52 if (path.size() >=
sizeof(addr.sun_path)) {
56 strncpy(addr.sun_path, path.c_str(),
sizeof(addr.sun_path) - 1);
57 if (::connect(fd,
reinterpret_cast<sockaddr *
>(&addr),
sizeof(addr)) < 0) {
64 auto colon = endpoint.rfind(
':');
65 if (colon == std::string::npos)
67 std::string host = endpoint.substr(0, colon), port = endpoint.substr(colon + 1);
68 struct addrinfo hints, *res =
nullptr;
69 memset(&hints, 0,
sizeof(hints));
70 hints.ai_family = AF_UNSPEC;
71 hints.ai_socktype = SOCK_STREAM;
72 if (::getaddrinfo(host.c_str(), port.c_str(), &hints, &res) != 0 || !res)
75 for (
auto *ai = res; ai; ai = ai->ai_next) {
76 fd = ::socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
79 if (::connect(fd, ai->ai_addr, ai->ai_addrlen) == 0)
88uint16_t get_u16(
const std::string &s,
size_t off)
90 return (uint16_t(
static_cast<unsigned char>(s[off])) << 8)
91 | uint16_t(
static_cast<unsigned char>(s[off + 1]));
97struct ServerError : std::runtime_error {
98 using std::runtime_error::runtime_error;
107std::string g_endpoint;
108uint32_t g_request_id = 0;
109bool g_atexit_registered =
false;
122void kcmcp_atexit(
int code, Datum arg)
127 unsigned char bye[10] = { 0x08, 0, 0, 0, 0, 0, 0, 0, 0, 0 };
128 ssize_t n = ::write(g_fd, bye,
sizeof(bye));
141void wait_readable_or_cancel()
148 int r = ::poll(&pfd, 1, 100);
149 if (r > 0 && (pfd.revents & (POLLIN | POLLHUP | POLLERR)))
151 if (r < 0 && errno != EINTR)
153 if (QueryCancelPending || ProcDiePending) {
155 CHECK_FOR_INTERRUPTS();
164void ensure_connection(
const std::string &endpoint)
166 if (g_fd >= 0 && g_endpoint == endpoint)
170 int fd = connect_endpoint(endpoint);
172 throw std::runtime_error(
"cannot connect to KCMCP endpoint '" + endpoint +
"'");
174 Connection conn(fd, CLIENT_RECV_MAX, CLIENT_SEND_MAX);
175 conn.send(
Type::HELLO, 0,
"{\"kcmcp\":[1,0],\"client\":\"ProvSQL\"}");
178 throw std::runtime_error(
"KCMCP server closed during handshake");
180 throw std::runtime_error(
"KCMCP handshake refused: "
183 throw std::runtime_error(
"KCMCP: expected HELLO from server");
189 g_endpoint = endpoint;
193std::string do_compile(uint8_t input_format,
const std::string &problem)
195 Connection conn(g_fd, CLIENT_RECV_MAX, CLIENT_SEND_MAX);
198 req.push_back(
static_cast<char>(2));
199 req.push_back(
static_cast<char>(input_format));
200 req.push_back(
static_cast<char>(4));
211 wait_readable_or_cancel();
213 throw std::runtime_error(
"KCMCP server closed before RESULT");
217 uint16_t code = m.
payload.size() >= 2 ? get_u16(m.
payload, 0) : 0;
218 std::string msg = m.
payload.size() > 2 ? m.
payload.substr(2) :
"";
219 throw ServerError(
"KCMCP server error " + std::to_string(code)
224 throw std::runtime_error(
"KCMCP: unexpected frame type in reply");
229 throw std::runtime_error(
"KCMCP: truncated RESULT");
230 if (
static_cast<unsigned char>(m.
payload[0]) != 4)
231 throw std::runtime_error(
"KCMCP: server returned a non-ddnnf-nnf result");
232 uint16_t meta_len = get_u16(m.
payload, 2);
233 if (4u + meta_len > m.
payload.size())
234 throw std::runtime_error(
"KCMCP: malformed RESULT meta");
235 return m.
payload.substr(4 + meta_len);
243 const std::string &problem)
246 ::signal(SIGPIPE, SIG_IGN);
247 if (!g_atexit_registered) {
248 on_proc_exit(kcmcp_atexit, (Datum) 0);
249 g_atexit_registered =
true;
257 for (
int attempt = 0; ; ++attempt) {
258 bool reusing = (g_fd >= 0 && g_endpoint == endpoint);
260 ensure_connection(endpoint);
261 return do_compile(input_format, problem);
262 }
catch (
const ServerError &) {
264 }
catch (
const std::exception &) {
266 if (reusing && attempt == 0)
Framed message transport over one connected socket fd.
In-extension KCMCP client: compile a Boolean problem on a warm, socket-attached knowledge compiler in...
Wire codec for KCMCP, the Knowledge Compiler / Model Counter Protocol (see doc/source/dev/kc-server-p...
std::string kcmcp_compile(const std::string &endpoint, uint8_t input_format, const std::string &problem)
Compile problem on a KCMCP server and return its d-DNNF NNF text.
A fully reassembled inbound message (MORE frames concatenated).