xref: /netbsd-src/share/examples/puffs/pgfs/pgfs_db.c (revision 9ddb6ab554e70fb9bbd90c3d96b812bc57755a14)
1 /*	$NetBSD: pgfs_db.c,v 1.1 2011/10/12 01:05:00 yamt Exp $	*/
2 
3 /*-
4  * Copyright (c)2010,2011 YAMAMOTO Takashi,
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  */
28 
29 /*
30  * backend db operations
31  */
32 
33 #include <sys/cdefs.h>
34 #ifndef lint
35 __RCSID("$NetBSD: pgfs_db.c,v 1.1 2011/10/12 01:05:00 yamt Exp $");
36 #endif /* not lint */
37 
#include <assert.h>
#include <err.h>
#include <errno.h>
#include <inttypes.h>
#include <puffs.h>
#include <stdbool.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <util.h>
48 
49 #include <libpq-fe.h>
50 
51 #include "pgfs_db.h"
52 #include "pgfs_waitq.h"
53 #include "pgfs_debug.h"
54 
/*
 * pgfs_dosync: set by pgfs_connectdb() when synchronous operation is
 * requested.  when false, each session is switched to
 * synchronous_commit=off, and commit_sync()/flush_xacts() (which assert
 * !pgfs_dosync) are available to force commits out.
 */
bool pgfs_dosync = false;

/*
 * Xconn: one backend connection plus its ownership state.
 *
 * getxc() hands a connection to a single puffs continuation (cc) and
 * relxc() gives it back.
 */
struct Xconn {
	TAILQ_ENTRY(Xconn) list;	/* linkage on xclist */
	PGconn *conn;			/* libpq connection */
	struct puffs_cc *blocker;	/* cc yielded in pqwait(), or NULL */
	struct puffs_cc *owner;		/* cc holding this conn, or NULL */
	bool in_trans;			/* set by begin*(), cleared by relxc() */
	int id;				/* bit index in cmd->prepared_mask */
};
65 
66 static void
67 dumperror(struct Xconn *xc, const PGresult *res)
68 {
69 	static const struct {
70 		const char *name;
71 		int code;
72 	} fields[] = {
73 #define F(x)	{ .name = #x, .code = x, }
74 		F(PG_DIAG_SEVERITY),
75 		F(PG_DIAG_SQLSTATE),
76 		F(PG_DIAG_MESSAGE_PRIMARY),
77 		F(PG_DIAG_MESSAGE_DETAIL),
78 		F(PG_DIAG_MESSAGE_HINT),
79 		F(PG_DIAG_STATEMENT_POSITION),
80 		F(PG_DIAG_INTERNAL_POSITION),
81 		F(PG_DIAG_INTERNAL_QUERY),
82 		F(PG_DIAG_CONTEXT),
83 		F(PG_DIAG_SOURCE_FILE),
84 		F(PG_DIAG_SOURCE_LINE),
85 		F(PG_DIAG_SOURCE_FUNCTION),
86 #undef F
87 	};
88 	unsigned int i;
89 
90 	if (!pgfs_dodprintf) {
91 		return;
92 	}
93 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
94 	    PQresultStatus(res) == PGRES_FATAL_ERROR);
95 	for (i = 0; i < __arraycount(fields); i++) {
96 		const char *val = PQresultErrorField(res, fields[i].code);
97 
98 		if (val == NULL) {
99 			continue;
100 		}
101 		fprintf(stderr, "%s: %s\n", fields[i].name, val);
102 	}
103 }
104 
/* every backend connection; populated by pgfs_connectdb() */
TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
/* continuations waiting in getxc() for a free connection (see relxc()) */
struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
107 
108 static struct Xconn *
109 getxc(struct puffs_cc *cc)
110 {
111 	struct Xconn *xc;
112 
113 	assert(cc != NULL);
114 retry:
115 	TAILQ_FOREACH(xc, &xclist, list) {
116 		if (xc->blocker == NULL) {
117 			assert(xc->owner == NULL);
118 			xc->owner = cc;
119 			DPRINTF("xc %p acquire %p\n", xc, cc);
120 			return xc;
121 		} else {
122 			assert(xc->owner == xc->blocker);
123 		}
124 	}
125 	DPRINTF("no free conn %p\n", cc);
126 	waiton(&xcwaitq, cc);
127 	goto retry;
128 }
129 
130 static void
131 relxc(struct Xconn *xc)
132 {
133 
134 	assert(xc->in_trans);
135 	assert(xc->owner != NULL);
136 	xc->in_trans = false;
137 	xc->owner = NULL;
138 	wakeup_one(&xcwaitq);
139 }
140 
141 static void
142 pqwait(struct Xconn *xc)
143 {
144 	PGconn *conn = xc->conn;
145 	struct puffs_cc *cc = xc->owner;
146 
147 	if (PQflush(conn)) {
148 		errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
149 	}
150 	if (!PQisBusy(conn)) {
151 		return;
152 	}
153 	assert(xc->blocker == NULL);
154 	xc->blocker = cc;
155 	DPRINTF("yielding %p\n", cc);
156 	/* XXX is it safe to yield before entering mainloop? */
157 	puffs_cc_yield(cc);
158 	DPRINTF("yield returned %p\n", cc);
159 	assert(xc->owner == cc);
160 	assert(xc->blocker == cc);
161 	xc->blocker = NULL;
162 }
163 
/*
 * sqltoerrno: map a five-character SQLSTATE to an errno value.
 *
 * returns 0 for successful completion, a mapped errno for known codes
 * and EIO for unknown codes.  aborts on a check violation, which means
 * pgfs itself is buggy.
 *
 * sqlstate may be NULL: PQresultErrorField() returns NULL when the
 * result carries no SQLSTATE, which is typical for errors that libpq
 * generates itself (e.g. a lost connection).  such errors map to EIO.
 */
static int
sqltoerrno(const char *sqlstate)
{
	/*
	 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
	 * "tuple concurrently updated" errors for lowrite/lo_truncate.
	 *
	 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
	 */
	static const struct {
		char sqlstate[5];	/* not NUL-terminated */
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		DPRINTF("missing sqlstate mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		/*
		 * strncmp stops at a NUL in sqlstate, so a short string can't
		 * be over-read; map entries have no NUL in their 5 chars.
		 */
		if (!strncmp(map[i].sqlstate, sqlstate, 5)) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
210 
/*
 * cmd: an SQL command built by createcmd().  unless CMD_NOPREPARE is
 * set, preparecmd() prepares it lazily on each connection it runs on.
 */
struct cmd {
	char name[32];		/* name of prepared statement */
	char *cmd;		/* query string */
	unsigned int nparams;	/* number of '$' characters in cmd */
	Oid *paramtypes;	/* nparams parameter type oids */
	uint32_t prepared_mask;	/* for which connections this is prepared? */
				/* bit N = Xconn id N; all set if NOPREPARE */
	unsigned int flags;	/* CMD_ flags */
};

#define	CMD_NOPREPARE	1	/* don't prepare this command */
221 
222 struct cmd *
223 createcmd(const char *cmd, unsigned int flags, ...)
224 {
225 	struct cmd *c;
226 	va_list ap;
227 	const char *cp;
228 	unsigned int i;
229 	static unsigned int cmdid;
230 
231 	c = emalloc(sizeof(*c));
232 	c->cmd = estrdup(cmd);
233 	c->nparams = 0;
234 	va_start(ap, flags);
235 	for (cp = cmd; *cp != 0; cp++) {
236 		if (*cp == '$') { /* XXX */
237 			c->nparams++;
238 		}
239 	}
240 	c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
241 	for (i = 0; i < c->nparams; i++) {
242 		Oid type = va_arg(ap, Oid);
243 		assert(type == BYTEA ||
244 		    type == INT4OID || type == INT8OID || type == OIDOID ||
245 		    type == TEXTOID || type == TIMESTAMPTZOID);
246 		c->paramtypes[i] = type;
247 	}
248 	va_end(ap);
249 	snprintf(c->name, sizeof(c->name), "%u", cmdid++);
250 	if ((flags & CMD_NOPREPARE) != 0) {
251 		c->prepared_mask = ~0;
252 	} else {
253 		c->prepared_mask = 0;
254 	}
255 	c->flags = flags;
256 	return c;
257 }
258 
259 static void
260 freecmd(struct cmd *c)
261 {
262 
263 	free(c->paramtypes);
264 	free(c->cmd);
265 	free(c);
266 }
267 
268 static int
269 fetch_noresult(struct Xconn *xc)
270 {
271 	PGresult *res;
272 	ExecStatusType status;
273 	PGconn *conn = xc->conn;
274 	int error;
275 
276 	pqwait(xc);
277 	res = PQgetResult(conn);
278 	if (res == NULL) {
279 		return ENOENT;
280 	}
281 	status = PQresultStatus(res);
282 	if (status == PGRES_COMMAND_OK) {
283 		assert(PQnfields(res) == 0);
284 		assert(PQntuples(res) == 0);
285 		if (!strcmp(PQcmdTuples(res), "0")) {
286 			error = ENOENT;
287 		} else {
288 			error = 0;
289 		}
290 	} else if (status == PGRES_FATAL_ERROR) {
291 		error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
292 		assert(error != 0);
293 		dumperror(xc, res);
294 	} else {
295 		errx(1, "%s not command_ok: %d: %s", __func__,
296 		    (int)status,
297 		    PQerrorMessage(conn));
298 	}
299 	PQclear(res);
300 	res = PQgetResult(conn);
301 	assert(res == NULL);
302 	if (error != 0) {
303 		DPRINTF("error %d\n", error);
304 	}
305 	return error;
306 }
307 
308 static int
309 preparecmd(struct Xconn *xc, struct cmd *c)
310 {
311 	PGconn *conn = xc->conn;
312 	const uint32_t mask = 1 << xc->id;
313 	int error;
314 	int ret;
315 
316 	if ((c->prepared_mask & mask) != 0) {
317 		return 0;
318 	}
319 	assert((c->flags & CMD_NOPREPARE) == 0);
320 	DPRINTF("PREPARE: '%s'\n", c->cmd);
321 	ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
322 	if (!ret) {
323 		errx(EXIT_FAILURE, "PQsendPrepare: %s",
324 		    PQerrorMessage(conn));
325 	}
326 	error = fetch_noresult(xc);
327 	if (error != 0) {
328 		return error;
329 	}
330 	c->prepared_mask |= mask;
331 	return 0;
332 }
333 
334 /*
335  * vsendcmd:
336  *
337  * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
338  * 0 for text and 1 for binary.
339  */
340 
341 static int
342 vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
343 {
344 	PGconn *conn = xc->conn;
345 	char **paramvalues;
346 	int *paramlengths;
347 	int *paramformats;
348 	unsigned int i;
349 	int error;
350 	int ret;
351 
352 	assert(xc->owner != NULL);
353 	assert(xc->blocker == NULL);
354 	error = preparecmd(xc, c);
355 	if (error != 0) {
356 		return error;
357 	}
358 	paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
359 	paramlengths = NULL;
360 	paramformats = NULL;
361 	DPRINTF("CMD: '%s'\n", c->cmd);
362 	for (i = 0; i < c->nparams; i++) {
363 		Oid type = c->paramtypes[i];
364 		char tmpstore[1024];
365 		const char *buf = NULL;
366 		intmax_t v = 0; /* XXXgcc */
367 		int sz;
368 		bool binary = false;
369 
370 		switch (type) {
371 		case BYTEA:
372 			buf = va_arg(ap, const void *);
373 			sz = (int)va_arg(ap, size_t);
374 			binary = true;
375 			break;
376 		case INT8OID:
377 		case OIDOID:
378 		case INT4OID:
379 			switch (type) {
380 			case INT8OID:
381 				v = (intmax_t)va_arg(ap, int64_t);
382 				break;
383 			case OIDOID:
384 				v = (intmax_t)va_arg(ap, Oid);
385 				break;
386 			case INT4OID:
387 				v = (intmax_t)va_arg(ap, int32_t);
388 				break;
389 			default:
390 				errx(EXIT_FAILURE, "unknown integer oid %u",
391 				    type);
392 			}
393 			buf = tmpstore;
394 			sz = snprintf(tmpstore, sizeof(tmpstore),
395 			    "%jd", v);
396 			assert(sz != -1);
397 			assert((size_t)sz < sizeof(tmpstore));
398 			sz += 1;
399 			break;
400 		case TEXTOID:
401 		case TIMESTAMPTZOID:
402 			buf = va_arg(ap, char *);
403 			sz = strlen(buf) + 1;
404 			break;
405 		default:
406 			errx(EXIT_FAILURE, "%s: unknown param type %u",
407 			    __func__, type);
408 		}
409 		if (binary) {
410 			if (paramlengths == NULL) {
411 				paramlengths =
412 				    emalloc(c->nparams * sizeof(*paramformats));
413 			}
414 			if (paramformats == NULL) {
415 				paramformats = ecalloc(1,
416 				    c->nparams * sizeof(*paramformats));
417 			}
418 			paramformats[i] = 1;
419 			paramlengths[i] = sz;
420 		}
421 		paramvalues[i] = emalloc(sz);
422 		memcpy(paramvalues[i], buf, sz);
423 		if (binary) {
424 			DPRINTF("\t[%u]=<BINARY>\n", i);
425 		} else {
426 			DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
427 		}
428 	}
429 	if ((c->flags & CMD_NOPREPARE) != 0) {
430 		ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
431 		    (const char * const *)paramvalues, paramlengths,
432 		    paramformats, resultmode);
433 	} else {
434 		ret = PQsendQueryPrepared(conn, c->name, c->nparams,
435 		    (const char * const *)paramvalues, paramlengths,
436 		    paramformats, resultmode);
437 	}
438 	for (i = 0; i < c->nparams; i++) {
439 		free(paramvalues[i]);
440 	}
441 	free(paramvalues);
442 	free(paramlengths);
443 	free(paramformats);
444 	if (!ret) {
445 		errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
446 		    PQerrorMessage(conn));
447 	}
448 	return 0;
449 }
450 
/*
 * sendcmd: start executing c on xc, requesting text-format results.
 * the variadic arguments are the parameter values (see vsendcmd).
 * the result is collected afterwards, e.g. with fetchnext() or
 * simplefetch().  returns 0 or the errno from preparing c.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	int rv;
	va_list args;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);

	return rv;
}
462 
/*
 * sendcmdx: like sendcmd(), but with an explicit resultmode that is
 * handed to libpq unchanged (0 for text, 1 for binary results).
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	int rv;
	va_list args;

	va_start(args, c);
	rv = vsendcmd(xc, resultmode, c, args);
	va_end(args);

	return rv;
}
474 
/*
 * simplecmd: a convenient routine to execute a command which returns
 * no rows synchronously.
 *
 * sends c with the given parameters (see vsendcmd) and waits for it to
 * complete.  returns 0, the errno from sending it, or the errno from
 * fetch_noresult() (ENOENT when zero rows were affected).
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return (rv != 0) ? rv : fetch_noresult(xc);
}
494 
495 void
496 fetchinit(struct fetchstatus *s, struct Xconn *xc)
497 {
498 	s->xc = xc;
499 	s->res = NULL;
500 	s->cur = 0;
501 	s->nrows = 0;
502 	s->done = false;
503 }
504 
/*
 * getint: convert a base-10 integer string (as sent by the backend) to
 * intmax_t.  empty strings, trailing garbage and out-of-range values are
 * only caught by assertions.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t val;

	errno = 0;
	val = strtoimax(str, &end, 10);
	assert(errno == 0);
	assert(*str != '\0');
	assert(*end == '\0');
	return val;
}
518 
/*
 * vfetchnext: fetch the next row of the current result into the caller's
 * output pointers.
 *
 * n is the number of columns and types[i] the expected type of column i.
 * for each column, ap supplies:
 *	INT8OID		int64_t *
 *	OIDOID		Oid *
 *	INT4OID		int32_t *
 *	TEXTOID		char ** (receives an estrdup()ed copy to free)
 *	BYTEA		void *buf, size_t *len
 *
 * returns 0 when a row was stored; ENOENT when there are no more results
 * (s->done is set) or the result has no rows; or an errno mapped from
 * the SQLSTATE of a failed query (s->res is then left for fetchdone()).
 * column types must match and NULL values are not allowed; both are
 * only checked by assertions.
 */
static int
vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
{
	PGconn *conn = s->xc->conn;
	unsigned int i;

	assert(conn != NULL);
	if (s->res == NULL) {
		/* no result in hand: wait for and pick up the next one */
		ExecStatusType status;
		int error;

		pqwait(s->xc);
		s->res = PQgetResult(conn);
		if (s->res == NULL) {
			s->done = true;
			return ENOENT;
		}
		status = PQresultStatus(s->res);
		if (status == PGRES_FATAL_ERROR) {
			error = sqltoerrno(
			    PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
			assert(error != 0);
			dumperror(s->xc, s->res);
			return error;
		}
		if (status != PGRES_TUPLES_OK) {
			errx(1, "not tuples_ok: %s",
			    PQerrorMessage(conn));
		}
		assert((unsigned int)PQnfields(s->res) == n);
		s->nrows = PQntuples(s->res);
		if (s->nrows == 0) {
			DPRINTF("no rows\n");
			return ENOENT;
		}
		assert(s->nrows >= 1);
		s->cur = 0;
	}
	for (i = 0; i < n; i++) {
		size_t size;

		/* only BYTEA columns are expected in binary format */
		assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
		DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
		    i, PQftype(s->res, i), types[i],
		    PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
		    PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
		    "<BINARY>");
		assert(PQftype(s->res, i) == types[i]);
		assert(!PQgetisnull(s->res, s->cur, i));
		switch(types[i]) {
		case INT8OID:
			*va_arg(ap, int64_t *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case OIDOID:
			/* NOTE(review): intmax_t narrowed without a check */
			*va_arg(ap, Oid *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case INT4OID:
			/* NOTE(review): intmax_t narrowed without a check */
			*va_arg(ap, int32_t *) =
			    getint(PQgetvalue(s->res, s->cur, i));
			break;
		case TEXTOID:
			*va_arg(ap, char **) =
			    estrdup(PQgetvalue(s->res, s->cur, i));
			break;
		case BYTEA:
			/*
			 * NOTE(review): no bounds check; the caller's buffer
			 * must be large enough for the whole value.
			 */
			size = PQgetlength(s->res, s->cur, i);
			memcpy(va_arg(ap, void *),
			    PQgetvalue(s->res, s->cur, i), size);
			*va_arg(ap, size_t *) = size;
			break;
		default:
			errx(EXIT_FAILURE, "%s unknown type %u", __func__,
			    types[i]);
		}
	}
	/* free the result once its last row has been handed out */
	s->cur++;
	if (s->cur == s->nrows) {
		PQclear(s->res);
		s->res = NULL;
	}
	return 0;
}
603 
604 int
605 fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
606 {
607 	va_list ap;
608 	int error;
609 
610 	va_start(ap, types);
611 	error = vfetchnext(s, n, types, ap);
612 	va_end(ap);
613 	return error;
614 }
615 
616 void
617 fetchdone(struct fetchstatus *s)
618 {
619 
620 	if (s->res != NULL) {
621 		PQclear(s->res);
622 		s->res = NULL;
623 	}
624 	if (!s->done) {
625 		PGresult *res;
626 		unsigned int n;
627 
628 		n = 0;
629 		while ((res = PQgetResult(s->xc->conn)) != NULL) {
630 			PQclear(res);
631 			n++;
632 		}
633 		if (n > 0) {
634 			DPRINTF("%u rows dropped\n", n);
635 		}
636 	}
637 }
638 
639 int
640 simplefetch(struct Xconn *xc, Oid type, ...)
641 {
642 	struct fetchstatus s;
643 	va_list ap;
644 	int error;
645 
646 	fetchinit(&s, xc);
647 	va_start(ap, type);
648 	error = vfetchnext(&s, 1, &type, ap);
649 	va_end(ap);
650 	assert(error != 0 || s.res == NULL);
651 	fetchdone(&s);
652 	return error;
653 }
654 
655 struct Xconn *
656 begin(struct puffs_usermount *pu)
657 {
658 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
659 	static struct cmd *c;
660 	int error;
661 
662 	CREATECMD_NOPARAM(c, "BEGIN");
663 	assert(!xc->in_trans);
664 	error = simplecmd(xc, c);
665 	assert(error == 0);
666 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
667 	xc->in_trans = true;
668 	return xc;
669 }
670 
671 struct Xconn *
672 begin_readonly(struct puffs_usermount *pu)
673 {
674 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
675 	static struct cmd *c;
676 	int error;
677 
678 	CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
679 	assert(!xc->in_trans);
680 	error = simplecmd(xc, c);
681 	assert(error == 0);
682 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
683 	xc->in_trans = true;
684 	return xc;
685 }
686 
687 void
688 rollback(struct Xconn *xc)
689 {
690 	PGTransactionStatusType status;
691 
692 	/*
693 	 * check the status as we are not sure the status of our transaction
694 	 * after a failed commit.
695 	 */
696 	status = PQtransactionStatus(xc->conn);
697 	assert(status != PQTRANS_ACTIVE);
698 	assert(status != PQTRANS_UNKNOWN);
699 	if (status != PQTRANS_IDLE) {
700 		static struct cmd *c;
701 		int error;
702 
703 		assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
704 		CREATECMD_NOPARAM(c, "ROLLBACK");
705 		error = simplecmd(xc, c);
706 		assert(error == 0);
707 	}
708 	DPRINTF("xc %p rollback %p\n", xc, xc->owner);
709 	relxc(xc);
710 }
711 
712 int
713 commit(struct Xconn *xc)
714 {
715 	static struct cmd *c;
716 	int error;
717 
718 	CREATECMD_NOPARAM(c, "COMMIT");
719 	error = simplecmd(xc, c);
720 	if (error == 0) {
721 		DPRINTF("xc %p commit %p\n", xc, xc->owner);
722 		relxc(xc);
723 	}
724 	return error;
725 }
726 
727 int
728 commit_sync(struct Xconn *xc)
729 {
730 	static struct cmd *c;
731 	int error;
732 
733 	assert(!pgfs_dosync);
734 	CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
735 	error = simplecmd(xc, c);
736 	assert(error == 0);
737 	return commit(xc);
738 }
739 
740 static void
741 pgfs_notice_receiver(void *vp, const PGresult *res)
742 {
743 	struct Xconn *xc = vp;
744 
745 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
746 	fprintf(stderr, "got a notice on %p\n", xc);
747 	dumperror(xc, res);
748 }
749 
750 static int
751 pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
752     int fd, int *done)
753 {
754 	struct Xconn *xc;
755 	PGconn *conn;
756 
757 	TAILQ_FOREACH(xc, &xclist, list) {
758 		if (PQsocket(xc->conn) == fd) {
759 			break;
760 		}
761 	}
762 	assert(xc != NULL);
763 	conn = xc->conn;
764 	PQconsumeInput(conn);
765 	if (!PQisBusy(conn)) {
766 		if (xc->blocker != NULL) {
767 			DPRINTF("schedule %p\n", xc->blocker);
768 			puffs_cc_schedule(xc->blocker);
769 		} else {
770 			DPRINTF("no blockers\n");
771 		}
772 	}
773 	*done = 0;
774 	return 0;
775 }
776 
777 int
778 pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
779     const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
780 {
781 	const char *keywords[3+1];
782 	const char *values[3];
783 	unsigned int i;
784 
785 	if (nconn > 32) {
786 		/*
787 		 * limit from sizeof(cmd->prepared_mask)
788 		 */
789 		return EINVAL;
790 	}
791 	if (debug) {
792 		pgfs_dodprintf = true;
793 	}
794 	if (synchronous) {
795 		pgfs_dosync = true;
796 	}
797 	i = 0;
798 	if (dbname != NULL) {
799 		keywords[i] = "dbname";
800 		values[i] = dbname;
801 		i++;
802 	}
803 	if (dbuser != NULL) {
804 		keywords[i] = "user";
805 		values[i] = dbuser;
806 		i++;
807 	}
808 	keywords[i] = "application_name";
809 	values[i] = "pgfs";
810 	i++;
811 	keywords[i] = NULL;
812 	puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
813 	for (i = 0; i < nconn; i++) {
814 		struct Xconn *xc;
815 		struct Xconn *xc2;
816 		static int xcid;
817 		PGconn *conn;
818 		struct cmd *c;
819 		int error;
820 
821 		conn = PQconnectdbParams(keywords, values, 0);
822 		if (conn == NULL) {
823 			errx(EXIT_FAILURE,
824 			    "PQconnectdbParams: unknown failure");
825 		}
826 		if (PQstatus(conn) != CONNECTION_OK) {
827 			/*
828 			 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
829 			 */
830 			errx(EXIT_FAILURE, "PQconnectdbParams: %s",
831 			    PQerrorMessage(conn));
832 		}
833 		DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
834 		puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
835 		xc = emalloc(sizeof(*xc));
836 		xc->conn = conn;
837 		xc->blocker = NULL;
838 		xc->owner = NULL;
839 		xc->in_trans = false;
840 		xc->id = xcid++;
841 		assert(xc->id < 32);
842 		PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
843 		TAILQ_INSERT_HEAD(&xclist, xc, list);
844 		xc2 = begin(pu);
845 		assert(xc2 == xc);
846 		c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
847 		error = simplecmd(xc, c);
848 		assert(error == 0);
849 		freecmd(c);
850 		c = createcmd("SET SESSION CHARACTERISTICS AS "
851 		    "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
852 		    CMD_NOPREPARE);
853 		error = simplecmd(xc, c);
854 		assert(error == 0);
855 		freecmd(c);
856 		c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
857 		error = simplecmd(xc, c);
858 		assert(error == 0);
859 		freecmd(c);
860 		if (!pgfs_dosync) {
861 			c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
862 			    CMD_NOPREPARE);
863 			error = simplecmd(xc, c);
864 			assert(error == 0);
865 			freecmd(c);
866 		}
867 		if (debug) {
868 			struct fetchstatus s;
869 			static const Oid types[] = { INT8OID, };
870 			uint64_t pid;
871 
872 			c = createcmd("SELECT pg_backend_pid()::int8;",
873 			    CMD_NOPREPARE);
874 			error = sendcmd(xc, c);
875 			assert(error == 0);
876 			fetchinit(&s, xc);
877 			error = FETCHNEXT(&s, types, &pid);
878 			fetchdone(&s);
879 			assert(error == 0);
880 			DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
881 		}
882 		error = commit(xc);
883 		assert(error == 0);
884 		assert(xc->owner == NULL);
885 	}
886 	/*
887 	 * XXX cleanup unlinked files here?  what to do when the filesystem
888 	 * is shared?
889 	 */
890 	return 0;
891 }
892 
/* continuations waiting in flush_xacts() for the current flusher */
struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
/* the continuation currently inside flush_xacts(), or NULL */
struct puffs_cc *flusher = NULL;
895 
896 int
897 flush_xacts(struct puffs_usermount *pu)
898 {
899 	struct puffs_cc *cc = puffs_cc_getcc(pu);
900 	struct Xconn *xc;
901 	static struct cmd *c;
902 	uint64_t dummy;
903 	int error;
904 
905 	/*
906 	 * flush all previously issued asynchronous transactions.
907 	 *
908 	 * XXX
909 	 * unfortunately it seems that there is no clean way to tell
910 	 * PostgreSQL flush XLOG.  we could perform a CHECKPOINT but it's
911 	 * too expensive and overkill for our purpose.
912 	 * besides, PostgreSQL has an optimization to skip XLOG flushing
913 	 * for transactions which didn't produce WAL records.
914 	 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
915 	 * it means that an empty transaction ("BEGIN; COMMIT;"), which
916 	 * doesn't produce any WAL records, doesn't flush the XLOG even if
917 	 * synchronous_commit=on.  we issues a dummy setval() to avoid the
918 	 * optimization.
919 	 * on the other hand, we try to avoid creating unnecessary WAL activity
920 	 * by serializing flushing and checking XLOG locations.
921 	 */
922 
923 	assert(!pgfs_dosync);
924 	if (flusher != NULL) { /* serialize flushers */
925 		DPRINTF("%p flush in progress %p\n", cc, flusher);
926 		waiton(&flushwaitq, cc);
927 		assert(flusher == NULL);
928 	}
929 	DPRINTF("%p start flushing\n", cc);
930 	flusher = cc;
931 retry:
932 	xc = begin(pu);
933 	CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
934 	    "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
935 	error = sendcmd(xc, c);
936 	if (error != 0) {
937 		goto got_error;
938 	}
939 	error = simplefetch(xc, INT8OID, &dummy);
940 	assert(error != 0 || dummy == 1);
941 	if (error == ENOENT) {
942 		/*
943 		 * there seems to be nothing to flush.
944 		 */
945 		DPRINTF("%p no sync\n", cc);
946 		error = 0;
947 	}
948 	if (error != 0) {
949 		goto got_error;
950 	}
951 	error = commit_sync(xc);
952 	if (error != 0) {
953 		goto got_error;
954 	}
955 	goto done;
956 got_error:
957 	rollback(xc);
958 	if (error == EAGAIN) {
959 		goto retry;
960 	}
961 done:
962 	assert(flusher == cc);
963 	flusher = NULL;
964 	wakeup_one(&flushwaitq);
965 	DPRINTF("%p end flushing error=%d\n", cc, error);
966 	return error;
967 }
968