xref: /netbsd-src/share/examples/puffs/pgfs/pgfs_db.c (revision d72c60c59581dcbd01cb4b67bd7bb95356a24678)
1 /*	$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $	*/
2 
3 /*-
4  * Copyright (c)2010,2011 YAMAMOTO Takashi,
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  */
28 
29 /*
30  * backend db operations
31  */
32 
33 #include <sys/cdefs.h>
34 #ifndef lint
35 __RCSID("$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $");
36 #endif /* not lint */
37 
38 #include <assert.h>
39 #include <err.h>
40 #include <errno.h>
41 #include <inttypes.h>
42 #include <puffs.h>
43 #include <stdbool.h>
44 #include <stdarg.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <util.h>
48 
49 #include <libpq-fe.h>
50 
51 #include "pgfs_db.h"
52 #include "pgfs_waitq.h"
53 #include "pgfs_debug.h"
54 
/*
 * pgfs_dosync: set to true by pgfs_connectdb() when its "synchronous"
 * argument is true.  while it is false, pgfs_connectdb() turns
 * SYNCHRONOUS_COMMIT off for each session, and commit_sync() and
 * flush_xacts() may be used (both assert that it is false).
 */
bool pgfs_dosync = false;
56 
/*
 * Xconn: a database connection and its ownership state.
 */
struct Xconn {
	TAILQ_ENTRY(Xconn) list;	/* linkage on xclist */
	PGconn *conn;			/* libpq connection */
	/*
	 * the cc which yielded in pqwait() waiting for input on this
	 * connection.  NULL while nobody is waiting.
	 */
	struct puffs_cc *blocker;
	/*
	 * the cc which acquired this connection with getxc().
	 * cleared by relxc().
	 */
	struct puffs_cc *owner;
	bool in_trans;			/* set by begin*(), cleared by relxc() */
	int id;				/* bit index for cmd::prepared_mask */
	const char *label;		/* label last set by setlabel() */
};
66 
67 static void
dumperror(struct Xconn * xc,const PGresult * res)68 dumperror(struct Xconn *xc, const PGresult *res)
69 {
70 	static const struct {
71 		const char *name;
72 		int code;
73 	} fields[] = {
74 #define F(x)	{ .name = #x, .code = x, }
75 		F(PG_DIAG_SEVERITY),
76 		F(PG_DIAG_SQLSTATE),
77 		F(PG_DIAG_MESSAGE_PRIMARY),
78 		F(PG_DIAG_MESSAGE_DETAIL),
79 		F(PG_DIAG_MESSAGE_HINT),
80 		F(PG_DIAG_STATEMENT_POSITION),
81 		F(PG_DIAG_INTERNAL_POSITION),
82 		F(PG_DIAG_INTERNAL_QUERY),
83 		F(PG_DIAG_CONTEXT),
84 		F(PG_DIAG_SOURCE_FILE),
85 		F(PG_DIAG_SOURCE_LINE),
86 		F(PG_DIAG_SOURCE_FUNCTION),
87 #undef F
88 	};
89 	unsigned int i;
90 
91 	if (!pgfs_dodprintf) {
92 		return;
93 	}
94 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
95 	    PQresultStatus(res) == PGRES_FATAL_ERROR);
96 	for (i = 0; i < __arraycount(fields); i++) {
97 		const char *val = PQresultErrorField(res, fields[i].code);
98 
99 		if (val == NULL) {
100 			continue;
101 		}
102 		fprintf(stderr, "%s: %s\n", fields[i].name, val);
103 	}
104 }
105 
/*
 * xclist: all connections, inserted by pgfs_connectdb().
 * xcwaitq: cc's sleeping in getxc() until relxc() releases a connection.
 */
TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
108 
109 static struct Xconn *
getxc(struct puffs_cc * cc)110 getxc(struct puffs_cc *cc)
111 {
112 	struct Xconn *xc;
113 
114 	assert(cc != NULL);
115 retry:
116 	TAILQ_FOREACH(xc, &xclist, list) {
117 		if (xc->blocker == NULL) {
118 			assert(xc->owner == NULL);
119 			xc->owner = cc;
120 			DPRINTF("xc %p acquire %p\n", xc, cc);
121 			return xc;
122 		} else {
123 			assert(xc->owner == xc->blocker);
124 		}
125 	}
126 	DPRINTF("no free conn %p\n", cc);
127 	waiton(&xcwaitq, cc);
128 	goto retry;
129 }
130 
131 static void
relxc(struct Xconn * xc)132 relxc(struct Xconn *xc)
133 {
134 
135 	assert(xc->in_trans);
136 	assert(xc->owner != NULL);
137 	xc->in_trans = false;
138 	xc->owner = NULL;
139 	wakeup_one(&xcwaitq);
140 }
141 
142 static void
pqwait(struct Xconn * xc)143 pqwait(struct Xconn *xc)
144 {
145 	PGconn *conn = xc->conn;
146 	struct puffs_cc *cc = xc->owner;
147 
148 	if (PQflush(conn)) {
149 		errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
150 	}
151 	if (!PQisBusy(conn)) {
152 		return;
153 	}
154 	assert(xc->blocker == NULL);
155 	xc->blocker = cc;
156 	DPRINTF("yielding %p\n", cc);
157 	/* XXX is it safe to yield before entering mainloop? */
158 	puffs_cc_yield(cc);
159 	DPRINTF("yield returned %p\n", cc);
160 	assert(xc->owner == cc);
161 	assert(xc->blocker == cc);
162 	xc->blocker = NULL;
163 }
164 
/*
 * sqltoerrno: map an SQLSTATE code to an errno value.
 *
 * sqlstate is the value of the PG_DIAG_SQLSTATE field of a result, or
 * NULL if the result doesn't have the field (PQresultErrorField returns
 * NULL in that case, eg. for errors generated by libpq itself).
 *
 * returns 0 for successful completion, the mapped errno for known codes,
 * and EIO for unknown or missing codes.  a code mapped to EINVAL is
 * considered a bug in pgfs and aborts the process.
 */
static int
sqltoerrno(const char *sqlstate)
{
	/*
	 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
	 * "tuple concurrently updated" errors for lowrite/lo_truncate.
	 *
	 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
	 */
	static const struct {
		char sqlstate[5];
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	if (sqlstate == NULL) {
		/*
		 * the result carries no SQLSTATE.  don't hand NULL to
		 * memcmp; treat it like an unknown code.
		 */
		DPRINTF("missing sqlstate mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		/* map[].sqlstate is not NUL-terminated; compare 5 bytes. */
		if (!memcmp(map[i].sqlstate, sqlstate, 5)) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
211 
/*
 * cmd: an SQL command, created by createcmd().
 */
struct cmd {
	char name[32];		/* name of prepared statement */
	char *cmd;		/* query string */
	unsigned int nparams;	/* number of '$' characters in the query */
	Oid *paramtypes;	/* nparams parameter types */
	/*
	 * bit (1 << Xconn::id) is set once preparecmd() has prepared the
	 * command on that connection.  all bits are set for CMD_NOPREPARE.
	 */
	uint32_t prepared_mask;	/* for which connections this is prepared? */
	unsigned int flags;	/* CMD_ flags */
};
220 
/*
 * CMD_NOPREPARE: createcmd() marks such a command as prepared on every
 * connection so that preparecmd() skips it, and vsendcmd() sends it with
 * PQsendQueryParams instead of PQsendQueryPrepared.
 */
#define	CMD_NOPREPARE	1	/* don't prepare this command */
222 
223 struct cmd *
createcmd(const char * cmd,unsigned int flags,...)224 createcmd(const char *cmd, unsigned int flags, ...)
225 {
226 	struct cmd *c;
227 	va_list ap;
228 	const char *cp;
229 	unsigned int i;
230 	static unsigned int cmdid;
231 
232 	c = emalloc(sizeof(*c));
233 	c->cmd = estrdup(cmd);
234 	c->nparams = 0;
235 	va_start(ap, flags);
236 	for (cp = cmd; *cp != 0; cp++) {
237 		if (*cp == '$') { /* XXX */
238 			c->nparams++;
239 		}
240 	}
241 	c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
242 	for (i = 0; i < c->nparams; i++) {
243 		Oid type = va_arg(ap, Oid);
244 		assert(type == BYTEA ||
245 		    type == INT4OID || type == INT8OID || type == OIDOID ||
246 		    type == TEXTOID || type == TIMESTAMPTZOID);
247 		c->paramtypes[i] = type;
248 	}
249 	va_end(ap);
250 	snprintf(c->name, sizeof(c->name), "%u", cmdid++);
251 	if ((flags & CMD_NOPREPARE) != 0) {
252 		c->prepared_mask = ~0;
253 	} else {
254 		c->prepared_mask = 0;
255 	}
256 	c->flags = flags;
257 	return c;
258 }
259 
260 static void
freecmd(struct cmd * c)261 freecmd(struct cmd *c)
262 {
263 
264 	free(c->paramtypes);
265 	free(c->cmd);
266 	free(c);
267 }
268 
269 static int
fetch_noresult(struct Xconn * xc)270 fetch_noresult(struct Xconn *xc)
271 {
272 	PGresult *res;
273 	ExecStatusType status;
274 	PGconn *conn = xc->conn;
275 	int error;
276 
277 	pqwait(xc);
278 	res = PQgetResult(conn);
279 	if (res == NULL) {
280 		return ENOENT;
281 	}
282 	status = PQresultStatus(res);
283 	if (status == PGRES_COMMAND_OK) {
284 		assert(PQnfields(res) == 0);
285 		assert(PQntuples(res) == 0);
286 		if (!strcmp(PQcmdTuples(res), "0")) {
287 			error = ENOENT;
288 		} else {
289 			error = 0;
290 		}
291 	} else if (status == PGRES_FATAL_ERROR) {
292 		error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
293 		assert(error != 0);
294 		dumperror(xc, res);
295 	} else {
296 		errx(1, "%s not command_ok: %d: %s", __func__,
297 		    (int)status,
298 		    PQerrorMessage(conn));
299 	}
300 	PQclear(res);
301 	res = PQgetResult(conn);
302 	assert(res == NULL);
303 	if (error != 0) {
304 		DPRINTF("error %d\n", error);
305 	}
306 	return error;
307 }
308 
309 static int
preparecmd(struct Xconn * xc,struct cmd * c)310 preparecmd(struct Xconn *xc, struct cmd *c)
311 {
312 	PGconn *conn = xc->conn;
313 	const uint32_t mask = 1 << xc->id;
314 	int error;
315 	int ret;
316 
317 	if ((c->prepared_mask & mask) != 0) {
318 		return 0;
319 	}
320 	assert((c->flags & CMD_NOPREPARE) == 0);
321 	DPRINTF("PREPARE: '%s'\n", c->cmd);
322 	ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
323 	if (!ret) {
324 		errx(EXIT_FAILURE, "PQsendPrepare: %s",
325 		    PQerrorMessage(conn));
326 	}
327 	error = fetch_noresult(xc);
328 	if (error != 0) {
329 		return error;
330 	}
331 	c->prepared_mask |= mask;
332 	return 0;
333 }
334 
335 /*
336  * vsendcmd:
337  *
338  * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
339  * 0 for text and 1 for binary.
340  */
341 
342 static int
vsendcmd(struct Xconn * xc,int resultmode,struct cmd * c,va_list ap)343 vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
344 {
345 	PGconn *conn = xc->conn;
346 	char **paramvalues;
347 	int *paramlengths;
348 	int *paramformats;
349 	unsigned int i;
350 	int error;
351 	int ret;
352 
353 	assert(xc->owner != NULL);
354 	assert(xc->blocker == NULL);
355 	error = preparecmd(xc, c);
356 	if (error != 0) {
357 		return error;
358 	}
359 	paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
360 	paramlengths = NULL;
361 	paramformats = NULL;
362 	DPRINTF("CMD: '%s'\n", c->cmd);
363 	for (i = 0; i < c->nparams; i++) {
364 		Oid type = c->paramtypes[i];
365 		char tmpstore[1024];
366 		const char *buf = NULL;
367 		intmax_t v = 0; /* XXXgcc */
368 		int sz;
369 		bool binary = false;
370 
371 		switch (type) {
372 		case BYTEA:
373 			buf = va_arg(ap, const void *);
374 			sz = (int)va_arg(ap, size_t);
375 			binary = true;
376 			break;
377 		case INT8OID:
378 		case OIDOID:
379 		case INT4OID:
380 			switch (type) {
381 			case INT8OID:
382 				v = (intmax_t)va_arg(ap, int64_t);
383 				break;
384 			case OIDOID:
385 				v = (intmax_t)va_arg(ap, Oid);
386 				break;
387 			case INT4OID:
388 				v = (intmax_t)va_arg(ap, int32_t);
389 				break;
390 			default:
391 				errx(EXIT_FAILURE, "unknown integer oid %u",
392 				    type);
393 			}
394 			buf = tmpstore;
395 			sz = snprintf(tmpstore, sizeof(tmpstore),
396 			    "%jd", v);
397 			assert(sz != -1);
398 			assert((size_t)sz < sizeof(tmpstore));
399 			sz += 1;
400 			break;
401 		case TEXTOID:
402 		case TIMESTAMPTZOID:
403 			buf = va_arg(ap, char *);
404 			sz = strlen(buf) + 1;
405 			break;
406 		default:
407 			errx(EXIT_FAILURE, "%s: unknown param type %u",
408 			    __func__, type);
409 		}
410 		if (binary) {
411 			if (paramlengths == NULL) {
412 				paramlengths =
413 				    emalloc(c->nparams * sizeof(*paramformats));
414 			}
415 			if (paramformats == NULL) {
416 				paramformats = ecalloc(1,
417 				    c->nparams * sizeof(*paramformats));
418 			}
419 			paramformats[i] = 1;
420 			paramlengths[i] = sz;
421 		}
422 		paramvalues[i] = emalloc(sz);
423 		memcpy(paramvalues[i], buf, sz);
424 		if (binary) {
425 			DPRINTF("\t[%u]=<BINARY>\n", i);
426 		} else {
427 			DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
428 		}
429 	}
430 	if ((c->flags & CMD_NOPREPARE) != 0) {
431 		ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
432 		    (const char * const *)paramvalues, paramlengths,
433 		    paramformats, resultmode);
434 	} else {
435 		ret = PQsendQueryPrepared(conn, c->name, c->nparams,
436 		    (const char * const *)paramvalues, paramlengths,
437 		    paramformats, resultmode);
438 	}
439 	for (i = 0; i < c->nparams; i++) {
440 		free(paramvalues[i]);
441 	}
442 	free(paramvalues);
443 	free(paramlengths);
444 	free(paramformats);
445 	if (!ret) {
446 		errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
447 		    PQerrorMessage(conn));
448 	}
449 	return 0;
450 }
451 
/*
 * sendcmd: send a command, requesting results in text format.
 *
 * the variable arguments are the parameter values as vsendcmd() takes.
 * returns what vsendcmd() returns.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return rv;
}
463 
/*
 * sendcmdx: same as sendcmd() but with an explicit resultmode, which is
 * passed to vsendcmd() as it is.
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, resultmode, c, args);
	va_end(args);
	return rv;
}
475 
/*
 * simplecmd: a convenient routine to execute a command which returns
 * no rows synchronously.
 *
 * sends the command in text result mode and, if that succeeded, returns
 * what fetch_noresult() returns.
 */

int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);
	return (rv != 0) ? rv : fetch_noresult(xc);
}
495 
496 void
fetchinit(struct fetchstatus * s,struct Xconn * xc)497 fetchinit(struct fetchstatus *s, struct Xconn *xc)
498 {
499 	s->xc = xc;
500 	s->res = NULL;
501 	s->cur = 0;
502 	s->nrows = 0;
503 	s->done = false;
504 }
505 
/*
 * getint: convert a decimal string to an integer.
 *
 * the string is asserted to be non-empty, to consist entirely of
 * a number, and to be in the range of intmax_t.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t value;

	errno = 0;
	value = strtoimax(str, &end, 10);
	assert(errno == 0);
	assert(str[0] != '\0');
	assert(*end == '\0');
	return value;
}
519 
520 static int
vfetchnext(struct fetchstatus * s,unsigned int n,const Oid * types,va_list ap)521 vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
522 {
523 	PGconn *conn = s->xc->conn;
524 	unsigned int i;
525 
526 	assert(conn != NULL);
527 	if (s->res == NULL) {
528 		ExecStatusType status;
529 		int error;
530 
531 		pqwait(s->xc);
532 		s->res = PQgetResult(conn);
533 		if (s->res == NULL) {
534 			s->done = true;
535 			return ENOENT;
536 		}
537 		status = PQresultStatus(s->res);
538 		if (status == PGRES_FATAL_ERROR) {
539 			error = sqltoerrno(
540 			    PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
541 			assert(error != 0);
542 			dumperror(s->xc, s->res);
543 			return error;
544 		}
545 		if (status != PGRES_TUPLES_OK) {
546 			errx(1, "not tuples_ok: %s",
547 			    PQerrorMessage(conn));
548 		}
549 		assert((unsigned int)PQnfields(s->res) == n);
550 		s->nrows = PQntuples(s->res);
551 		if (s->nrows == 0) {
552 			DPRINTF("no rows\n");
553 			return ENOENT;
554 		}
555 		assert(s->nrows >= 1);
556 		s->cur = 0;
557 	}
558 	for (i = 0; i < n; i++) {
559 		size_t size;
560 
561 		assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
562 		DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
563 		    i, PQftype(s->res, i), types[i],
564 		    PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
565 		    PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
566 		    "<BINARY>");
567 		assert(PQftype(s->res, i) == types[i]);
568 		assert(!PQgetisnull(s->res, s->cur, i));
569 		switch(types[i]) {
570 		case INT8OID:
571 			*va_arg(ap, int64_t *) =
572 			    getint(PQgetvalue(s->res, s->cur, i));
573 			break;
574 		case OIDOID:
575 			*va_arg(ap, Oid *) =
576 			    getint(PQgetvalue(s->res, s->cur, i));
577 			break;
578 		case INT4OID:
579 			*va_arg(ap, int32_t *) =
580 			    getint(PQgetvalue(s->res, s->cur, i));
581 			break;
582 		case TEXTOID:
583 			*va_arg(ap, char **) =
584 			    estrdup(PQgetvalue(s->res, s->cur, i));
585 			break;
586 		case BYTEA:
587 			size = PQgetlength(s->res, s->cur, i);
588 			memcpy(va_arg(ap, void *),
589 			    PQgetvalue(s->res, s->cur, i), size);
590 			*va_arg(ap, size_t *) = size;
591 			break;
592 		default:
593 			errx(EXIT_FAILURE, "%s unknown type %u", __func__,
594 			    types[i]);
595 		}
596 	}
597 	s->cur++;
598 	if (s->cur == s->nrows) {
599 		PQclear(s->res);
600 		s->res = NULL;
601 	}
602 	return 0;
603 }
604 
605 int
fetchnext(struct fetchstatus * s,unsigned int n,const Oid * types,...)606 fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
607 {
608 	va_list ap;
609 	int error;
610 
611 	va_start(ap, types);
612 	error = vfetchnext(s, n, types, ap);
613 	va_end(ap);
614 	return error;
615 }
616 
617 void
fetchdone(struct fetchstatus * s)618 fetchdone(struct fetchstatus *s)
619 {
620 
621 	if (s->res != NULL) {
622 		PQclear(s->res);
623 		s->res = NULL;
624 	}
625 	if (!s->done) {
626 		PGresult *res;
627 		unsigned int n;
628 
629 		n = 0;
630 		while ((res = PQgetResult(s->xc->conn)) != NULL) {
631 			PQclear(res);
632 			n++;
633 		}
634 		if (n > 0) {
635 			DPRINTF("%u rows dropped\n", n);
636 		}
637 	}
638 }
639 
640 int
simplefetch(struct Xconn * xc,Oid type,...)641 simplefetch(struct Xconn *xc, Oid type, ...)
642 {
643 	struct fetchstatus s;
644 	va_list ap;
645 	int error;
646 
647 	fetchinit(&s, xc);
648 	va_start(ap, type);
649 	error = vfetchnext(&s, 1, &type, ap);
650 	va_end(ap);
651 	assert(error != 0 || s.res == NULL);
652 	fetchdone(&s);
653 	return error;
654 }
655 
656 /*
657  * setlabel: set the descriptive label for the connection.
658  *
659  * we use simple pointer comparison for label equality check.
660  */
661 static void
setlabel(struct Xconn * xc,const char * label)662 setlabel(struct Xconn *xc, const char *label)
663 {
664 	int error;
665 
666 	/*
667 	 * put the label into application_name so that it's shown in
668 	 * pg_stat_activity.  we are sure that our labels don't need
669 	 * PQescapeStringConn.
670 	 *
671 	 * example:
672 	 *	SELECT pid,application_name,query FROM pg_stat_activity
673 	 *	WHERE state <> 'idle'
674 	 */
675 
676 	if (label != NULL && label != xc->label) {
677 		struct cmd *c;
678 		char cmd_str[1024];
679 
680 		snprintf(cmd_str, sizeof(cmd_str),
681 		    "SET application_name TO 'pgfs: %s'", label);
682 		c = createcmd(cmd_str, CMD_NOPREPARE);
683 		error = simplecmd(xc, c);
684 		freecmd(c);
685 		assert(error == 0);
686 		xc->label = label;
687 	} else {
688 #if 0 /* don't bother to clear label */
689 		static struct cmd *c;
690 
691 		CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
692 		error = simplecmd(xc, c);
693 		assert(error == 0);
694 #endif
695 	}
696 }
697 
698 struct Xconn *
begin(struct puffs_usermount * pu,const char * label)699 begin(struct puffs_usermount *pu, const char *label)
700 {
701 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
702 	static struct cmd *c;
703 	int error;
704 
705 	setlabel(xc, label);
706 	CREATECMD_NOPARAM(c, "BEGIN");
707 	assert(!xc->in_trans);
708 	error = simplecmd(xc, c);
709 	assert(error == 0);
710 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
711 	xc->in_trans = true;
712 	return xc;
713 }
714 
715 struct Xconn *
begin_readonly(struct puffs_usermount * pu,const char * label)716 begin_readonly(struct puffs_usermount *pu, const char *label)
717 {
718 	struct Xconn *xc = getxc(puffs_cc_getcc(pu));
719 	static struct cmd *c;
720 	int error;
721 
722 	setlabel(xc, label);
723 	CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
724 	assert(!xc->in_trans);
725 	error = simplecmd(xc, c);
726 	assert(error == 0);
727 	assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
728 	xc->in_trans = true;
729 	return xc;
730 }
731 
732 void
rollback(struct Xconn * xc)733 rollback(struct Xconn *xc)
734 {
735 	PGTransactionStatusType status;
736 
737 	/*
738 	 * check the status as we are not sure the status of our transaction
739 	 * after a failed commit.
740 	 */
741 	status = PQtransactionStatus(xc->conn);
742 	assert(status != PQTRANS_ACTIVE);
743 	assert(status != PQTRANS_UNKNOWN);
744 	if (status != PQTRANS_IDLE) {
745 		static struct cmd *c;
746 		int error;
747 
748 		assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
749 		CREATECMD_NOPARAM(c, "ROLLBACK");
750 		error = simplecmd(xc, c);
751 		assert(error == 0);
752 	}
753 	DPRINTF("xc %p rollback %p\n", xc, xc->owner);
754 	setlabel(xc, NULL);
755 	relxc(xc);
756 }
757 
758 int
commit(struct Xconn * xc)759 commit(struct Xconn *xc)
760 {
761 	static struct cmd *c;
762 	int error;
763 
764 	CREATECMD_NOPARAM(c, "COMMIT");
765 	error = simplecmd(xc, c);
766 	setlabel(xc, NULL);
767 	if (error == 0) {
768 		DPRINTF("xc %p commit %p\n", xc, xc->owner);
769 		relxc(xc);
770 	}
771 	return error;
772 }
773 
774 int
commit_sync(struct Xconn * xc)775 commit_sync(struct Xconn *xc)
776 {
777 	static struct cmd *c;
778 	int error;
779 
780 	assert(!pgfs_dosync);
781 	CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
782 	error = simplecmd(xc, c);
783 	assert(error == 0);
784 	return commit(xc);
785 }
786 
787 static void
pgfs_notice_receiver(void * vp,const PGresult * res)788 pgfs_notice_receiver(void *vp, const PGresult *res)
789 {
790 	struct Xconn *xc = vp;
791 
792 	assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
793 	fprintf(stderr, "got a notice on %p\n", xc);
794 	dumperror(xc, res);
795 }
796 
797 static int
pgfs_readframe(struct puffs_usermount * pu,struct puffs_framebuf * pufbuf,int fd,int * done)798 pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
799     int fd, int *done)
800 {
801 	struct Xconn *xc;
802 	PGconn *conn;
803 
804 	TAILQ_FOREACH(xc, &xclist, list) {
805 		if (PQsocket(xc->conn) == fd) {
806 			break;
807 		}
808 	}
809 	assert(xc != NULL);
810 	conn = xc->conn;
811 	PQconsumeInput(conn);
812 	if (!PQisBusy(conn)) {
813 		if (xc->blocker != NULL) {
814 			DPRINTF("schedule %p\n", xc->blocker);
815 			puffs_cc_schedule(xc->blocker);
816 		} else {
817 			DPRINTF("no blockers\n");
818 		}
819 	}
820 	*done = 0;
821 	return 0;
822 }
823 
824 int
pgfs_connectdb(struct puffs_usermount * pu,const char * dbname,const char * dbuser,bool debug,bool synchronous,unsigned int nconn)825 pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
826     const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
827 {
828 	const char *keywords[3+1];
829 	const char *values[3];
830 	unsigned int i;
831 
832 	if (nconn > 32) {
833 		/*
834 		 * limit from sizeof(cmd->prepared_mask)
835 		 */
836 		return EINVAL;
837 	}
838 	if (debug) {
839 		pgfs_dodprintf = true;
840 	}
841 	if (synchronous) {
842 		pgfs_dosync = true;
843 	}
844 	i = 0;
845 	if (dbname != NULL) {
846 		keywords[i] = "dbname";
847 		values[i] = dbname;
848 		i++;
849 	}
850 	if (dbuser != NULL) {
851 		keywords[i] = "user";
852 		values[i] = dbuser;
853 		i++;
854 	}
855 	keywords[i] = "application_name";
856 	values[i] = "pgfs";
857 	i++;
858 	keywords[i] = NULL;
859 	puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
860 	for (i = 0; i < nconn; i++) {
861 		struct Xconn *xc;
862 		struct Xconn *xc2;
863 		static int xcid;
864 		PGconn *conn;
865 		struct cmd *c;
866 		int error;
867 
868 		conn = PQconnectdbParams(keywords, values, 0);
869 		if (conn == NULL) {
870 			errx(EXIT_FAILURE,
871 			    "PQconnectdbParams: unknown failure");
872 		}
873 		if (PQstatus(conn) != CONNECTION_OK) {
874 			/*
875 			 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
876 			 */
877 			errx(EXIT_FAILURE, "PQconnectdbParams: %s",
878 			    PQerrorMessage(conn));
879 		}
880 		DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
881 		puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
882 		xc = emalloc(sizeof(*xc));
883 		xc->conn = conn;
884 		xc->blocker = NULL;
885 		xc->owner = NULL;
886 		xc->in_trans = false;
887 		xc->id = xcid++;
888 		xc->label = NULL;
889 		assert(xc->id < 32);
890 		PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
891 		TAILQ_INSERT_HEAD(&xclist, xc, list);
892 		xc2 = begin(pu, NULL);
893 		assert(xc2 == xc);
894 		c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
895 		error = simplecmd(xc, c);
896 		assert(error == 0);
897 		freecmd(c);
898 		c = createcmd("SET SESSION CHARACTERISTICS AS "
899 		    "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
900 		    CMD_NOPREPARE);
901 		error = simplecmd(xc, c);
902 		assert(error == 0);
903 		freecmd(c);
904 		c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
905 		error = simplecmd(xc, c);
906 		assert(error == 0);
907 		freecmd(c);
908 		if (!pgfs_dosync) {
909 			c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
910 			    CMD_NOPREPARE);
911 			error = simplecmd(xc, c);
912 			assert(error == 0);
913 			freecmd(c);
914 		}
915 		if (debug) {
916 			struct fetchstatus s;
917 			static const Oid types[] = { INT8OID, };
918 			uint64_t pid;
919 
920 			c = createcmd("SELECT pg_backend_pid()::int8;",
921 			    CMD_NOPREPARE);
922 			error = sendcmd(xc, c);
923 			assert(error == 0);
924 			fetchinit(&s, xc);
925 			error = FETCHNEXT(&s, types, &pid);
926 			fetchdone(&s);
927 			assert(error == 0);
928 			DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
929 		}
930 		error = commit(xc);
931 		assert(error == 0);
932 		assert(xc->owner == NULL);
933 	}
934 	/*
935 	 * XXX cleanup unlinked files here?  what to do when the filesystem
936 	 * is shared?
937 	 */
938 	return 0;
939 }
940 
/*
 * flusher: the cc currently running flush_xacts(), or NULL.
 * flushwaitq: cc's waiting in flush_xacts() for the flusher to finish.
 */
struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
struct puffs_cc *flusher = NULL;
943 
944 int
flush_xacts(struct puffs_usermount * pu)945 flush_xacts(struct puffs_usermount *pu)
946 {
947 	struct puffs_cc *cc = puffs_cc_getcc(pu);
948 	struct Xconn *xc;
949 	static struct cmd *c;
950 	uint64_t dummy;
951 	int error;
952 
953 	/*
954 	 * flush all previously issued asynchronous transactions.
955 	 *
956 	 * XXX
957 	 * unfortunately it seems that there is no clean way to tell
958 	 * PostgreSQL flush XLOG.  we could perform a CHECKPOINT but it's
959 	 * too expensive and overkill for our purpose.
960 	 * besides, PostgreSQL has an optimization to skip XLOG flushing
961 	 * for transactions which didn't produce WAL records.
962 	 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
963 	 * it means that an empty transaction ("BEGIN; COMMIT;"), which
964 	 * doesn't produce any WAL records, doesn't flush the XLOG even if
965 	 * synchronous_commit=on.  we issues a dummy setval() to avoid the
966 	 * optimization.
967 	 * on the other hand, we try to avoid creating unnecessary WAL activity
968 	 * by serializing flushing and checking XLOG locations.
969 	 */
970 
971 	assert(!pgfs_dosync);
972 	if (flusher != NULL) { /* serialize flushers */
973 		DPRINTF("%p flush in progress %p\n", cc, flusher);
974 		waiton(&flushwaitq, cc);
975 		assert(flusher == NULL);
976 	}
977 	DPRINTF("%p start flushing\n", cc);
978 	flusher = cc;
979 retry:
980 	xc = begin(pu, "flush");
981 	CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
982 	    "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
983 	error = sendcmd(xc, c);
984 	if (error != 0) {
985 		goto got_error;
986 	}
987 	error = simplefetch(xc, INT8OID, &dummy);
988 	assert(error != 0 || dummy == 1);
989 	if (error == ENOENT) {
990 		/*
991 		 * there seems to be nothing to flush.
992 		 */
993 		DPRINTF("%p no sync\n", cc);
994 		error = 0;
995 	}
996 	if (error != 0) {
997 		goto got_error;
998 	}
999 	error = commit_sync(xc);
1000 	if (error != 0) {
1001 		goto got_error;
1002 	}
1003 	goto done;
1004 got_error:
1005 	rollback(xc);
1006 	if (error == EAGAIN) {
1007 		goto retry;
1008 	}
1009 done:
1010 	assert(flusher == cc);
1011 	flusher = NULL;
1012 	wakeup_one(&flushwaitq);
1013 	DPRINTF("%p end flushing error=%d\n", cc, error);
1014 	return error;
1015 }
1016