1 /* $NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $ */
2
3 /*-
4 * Copyright (c)2010,2011 YAMAMOTO Takashi,
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * SUCH DAMAGE.
27 */
28
29 /*
30 * backend db operations
31 */
32
33 #include <sys/cdefs.h>
34 #ifndef lint
35 __RCSID("$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $");
36 #endif /* not lint */
37
38 #include <assert.h>
39 #include <err.h>
40 #include <errno.h>
41 #include <inttypes.h>
42 #include <puffs.h>
43 #include <stdbool.h>
44 #include <stdarg.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <util.h>
48
49 #include <libpq-fe.h>
50
51 #include "pgfs_db.h"
52 #include "pgfs_waitq.h"
53 #include "pgfs_debug.h"
54
55 bool pgfs_dosync = false;
56
57 struct Xconn {
58 TAILQ_ENTRY(Xconn) list;
59 PGconn *conn;
60 struct puffs_cc *blocker;
61 struct puffs_cc *owner;
62 bool in_trans;
63 int id;
64 const char *label;
65 };
66
67 static void
dumperror(struct Xconn * xc,const PGresult * res)68 dumperror(struct Xconn *xc, const PGresult *res)
69 {
70 static const struct {
71 const char *name;
72 int code;
73 } fields[] = {
74 #define F(x) { .name = #x, .code = x, }
75 F(PG_DIAG_SEVERITY),
76 F(PG_DIAG_SQLSTATE),
77 F(PG_DIAG_MESSAGE_PRIMARY),
78 F(PG_DIAG_MESSAGE_DETAIL),
79 F(PG_DIAG_MESSAGE_HINT),
80 F(PG_DIAG_STATEMENT_POSITION),
81 F(PG_DIAG_INTERNAL_POSITION),
82 F(PG_DIAG_INTERNAL_QUERY),
83 F(PG_DIAG_CONTEXT),
84 F(PG_DIAG_SOURCE_FILE),
85 F(PG_DIAG_SOURCE_LINE),
86 F(PG_DIAG_SOURCE_FUNCTION),
87 #undef F
88 };
89 unsigned int i;
90
91 if (!pgfs_dodprintf) {
92 return;
93 }
94 assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR ||
95 PQresultStatus(res) == PGRES_FATAL_ERROR);
96 for (i = 0; i < __arraycount(fields); i++) {
97 const char *val = PQresultErrorField(res, fields[i].code);
98
99 if (val == NULL) {
100 continue;
101 }
102 fprintf(stderr, "%s: %s\n", fields[i].name, val);
103 }
104 }
105
106 TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist);
107 struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq);
108
109 static struct Xconn *
getxc(struct puffs_cc * cc)110 getxc(struct puffs_cc *cc)
111 {
112 struct Xconn *xc;
113
114 assert(cc != NULL);
115 retry:
116 TAILQ_FOREACH(xc, &xclist, list) {
117 if (xc->blocker == NULL) {
118 assert(xc->owner == NULL);
119 xc->owner = cc;
120 DPRINTF("xc %p acquire %p\n", xc, cc);
121 return xc;
122 } else {
123 assert(xc->owner == xc->blocker);
124 }
125 }
126 DPRINTF("no free conn %p\n", cc);
127 waiton(&xcwaitq, cc);
128 goto retry;
129 }
130
131 static void
relxc(struct Xconn * xc)132 relxc(struct Xconn *xc)
133 {
134
135 assert(xc->in_trans);
136 assert(xc->owner != NULL);
137 xc->in_trans = false;
138 xc->owner = NULL;
139 wakeup_one(&xcwaitq);
140 }
141
142 static void
pqwait(struct Xconn * xc)143 pqwait(struct Xconn *xc)
144 {
145 PGconn *conn = xc->conn;
146 struct puffs_cc *cc = xc->owner;
147
148 if (PQflush(conn)) {
149 errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn));
150 }
151 if (!PQisBusy(conn)) {
152 return;
153 }
154 assert(xc->blocker == NULL);
155 xc->blocker = cc;
156 DPRINTF("yielding %p\n", cc);
157 /* XXX is it safe to yield before entering mainloop? */
158 puffs_cc_yield(cc);
159 DPRINTF("yield returned %p\n", cc);
160 assert(xc->owner == cc);
161 assert(xc->blocker == cc);
162 xc->blocker = NULL;
163 }
164
/*
 * sqltoerrno: map a five-character SQLSTATE code to an errno value.
 *
 * returns 0 for successful completion, the errno listed in the table
 * below for known codes, and EIO for unknown codes or when no SQLSTATE
 * is available (sqlstate == NULL).  a code mapped to EINVAL (a CHECK
 * constraint violation) aborts the process, since it suggests a bug.
 */
static int
sqltoerrno(const char *sqlstate)
{
	/*
	 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle
	 * "tuple concurrently updated" errors for lowrite/lo_truncate.
	 *
	 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN?
	 */
	static const struct {
		char sqlstate[5];	/* exactly 5 chars, no NUL */
		int error;
	} map[] = {
		{ "00000", 0, },	/* ERRCODE_SUCCESSFUL_COMPLETION */
		{ "02000", ENOENT, },	/* ERRCODE_NO_DATA */
		{ "23505", EEXIST, },	/* ERRCODE_UNIQUE_VIOLATION */
		{ "23514", EINVAL, },	/* ERRCODE_CHECK_VIOLATION */
		{ "40001", EAGAIN, },	/* ERRCODE_T_R_SERIALIZATION_FAILURE */
		{ "40P01", EAGAIN, },	/* ERRCODE_T_R_DEADLOCK_DETECTED */
		{ "42704", ENOENT, },	/* ERRCODE_UNDEFINED_OBJECT */
		{ "53100", ENOSPC, },	/* ERRCODE_DISK_FULL */
		{ "53200", ENOMEM, },	/* ERRCODE_OUT_OF_MEMORY */
		{ "XX000", EAGAIN, },	/* ERRCODE_INTERNAL_ERROR */
	};
	unsigned int i;

	/*
	 * PQresultErrorField() returns NULL when the result carries no
	 * SQLSTATE, which is typical for errors generated inside libpq
	 * itself (e.g. a lost connection).  don't hand NULL to memcmp().
	 */
	if (sqlstate == NULL) {
		DPRINTF("no sqlstate, mapped to EIO\n");
		return EIO;
	}
	for (i = 0; i < __arraycount(map); i++) {
		if (!memcmp(map[i].sqlstate, sqlstate, 5)) {
			const int error = map[i].error;

			if (error != 0) {
				DPRINTF("sqlstate %5s mapped to error %d\n",
				    sqlstate, error);
			}
			if (error == EINVAL) {
				/*
				 * sounds like a bug.
				 */
				abort();
			}
			return error;
		}
	}
	DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate);
	return EIO;
}
211
212 struct cmd {
213 char name[32]; /* name of prepared statement */
214 char *cmd; /* query string */
215 unsigned int nparams;
216 Oid *paramtypes;
217 uint32_t prepared_mask; /* for which connections this is prepared? */
218 unsigned int flags; /* CMD_ flags */
219 };
220
221 #define CMD_NOPREPARE 1 /* don't prepare this command */
222
223 struct cmd *
createcmd(const char * cmd,unsigned int flags,...)224 createcmd(const char *cmd, unsigned int flags, ...)
225 {
226 struct cmd *c;
227 va_list ap;
228 const char *cp;
229 unsigned int i;
230 static unsigned int cmdid;
231
232 c = emalloc(sizeof(*c));
233 c->cmd = estrdup(cmd);
234 c->nparams = 0;
235 va_start(ap, flags);
236 for (cp = cmd; *cp != 0; cp++) {
237 if (*cp == '$') { /* XXX */
238 c->nparams++;
239 }
240 }
241 c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes));
242 for (i = 0; i < c->nparams; i++) {
243 Oid type = va_arg(ap, Oid);
244 assert(type == BYTEA ||
245 type == INT4OID || type == INT8OID || type == OIDOID ||
246 type == TEXTOID || type == TIMESTAMPTZOID);
247 c->paramtypes[i] = type;
248 }
249 va_end(ap);
250 snprintf(c->name, sizeof(c->name), "%u", cmdid++);
251 if ((flags & CMD_NOPREPARE) != 0) {
252 c->prepared_mask = ~0;
253 } else {
254 c->prepared_mask = 0;
255 }
256 c->flags = flags;
257 return c;
258 }
259
260 static void
freecmd(struct cmd * c)261 freecmd(struct cmd *c)
262 {
263
264 free(c->paramtypes);
265 free(c->cmd);
266 free(c);
267 }
268
269 static int
fetch_noresult(struct Xconn * xc)270 fetch_noresult(struct Xconn *xc)
271 {
272 PGresult *res;
273 ExecStatusType status;
274 PGconn *conn = xc->conn;
275 int error;
276
277 pqwait(xc);
278 res = PQgetResult(conn);
279 if (res == NULL) {
280 return ENOENT;
281 }
282 status = PQresultStatus(res);
283 if (status == PGRES_COMMAND_OK) {
284 assert(PQnfields(res) == 0);
285 assert(PQntuples(res) == 0);
286 if (!strcmp(PQcmdTuples(res), "0")) {
287 error = ENOENT;
288 } else {
289 error = 0;
290 }
291 } else if (status == PGRES_FATAL_ERROR) {
292 error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE));
293 assert(error != 0);
294 dumperror(xc, res);
295 } else {
296 errx(1, "%s not command_ok: %d: %s", __func__,
297 (int)status,
298 PQerrorMessage(conn));
299 }
300 PQclear(res);
301 res = PQgetResult(conn);
302 assert(res == NULL);
303 if (error != 0) {
304 DPRINTF("error %d\n", error);
305 }
306 return error;
307 }
308
309 static int
preparecmd(struct Xconn * xc,struct cmd * c)310 preparecmd(struct Xconn *xc, struct cmd *c)
311 {
312 PGconn *conn = xc->conn;
313 const uint32_t mask = 1 << xc->id;
314 int error;
315 int ret;
316
317 if ((c->prepared_mask & mask) != 0) {
318 return 0;
319 }
320 assert((c->flags & CMD_NOPREPARE) == 0);
321 DPRINTF("PREPARE: '%s'\n", c->cmd);
322 ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes);
323 if (!ret) {
324 errx(EXIT_FAILURE, "PQsendPrepare: %s",
325 PQerrorMessage(conn));
326 }
327 error = fetch_noresult(xc);
328 if (error != 0) {
329 return error;
330 }
331 c->prepared_mask |= mask;
332 return 0;
333 }
334
335 /*
336 * vsendcmd:
337 *
338 * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared.
339 * 0 for text and 1 for binary.
340 */
341
342 static int
vsendcmd(struct Xconn * xc,int resultmode,struct cmd * c,va_list ap)343 vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap)
344 {
345 PGconn *conn = xc->conn;
346 char **paramvalues;
347 int *paramlengths;
348 int *paramformats;
349 unsigned int i;
350 int error;
351 int ret;
352
353 assert(xc->owner != NULL);
354 assert(xc->blocker == NULL);
355 error = preparecmd(xc, c);
356 if (error != 0) {
357 return error;
358 }
359 paramvalues = emalloc(c->nparams * sizeof(*paramvalues));
360 paramlengths = NULL;
361 paramformats = NULL;
362 DPRINTF("CMD: '%s'\n", c->cmd);
363 for (i = 0; i < c->nparams; i++) {
364 Oid type = c->paramtypes[i];
365 char tmpstore[1024];
366 const char *buf = NULL;
367 intmax_t v = 0; /* XXXgcc */
368 int sz;
369 bool binary = false;
370
371 switch (type) {
372 case BYTEA:
373 buf = va_arg(ap, const void *);
374 sz = (int)va_arg(ap, size_t);
375 binary = true;
376 break;
377 case INT8OID:
378 case OIDOID:
379 case INT4OID:
380 switch (type) {
381 case INT8OID:
382 v = (intmax_t)va_arg(ap, int64_t);
383 break;
384 case OIDOID:
385 v = (intmax_t)va_arg(ap, Oid);
386 break;
387 case INT4OID:
388 v = (intmax_t)va_arg(ap, int32_t);
389 break;
390 default:
391 errx(EXIT_FAILURE, "unknown integer oid %u",
392 type);
393 }
394 buf = tmpstore;
395 sz = snprintf(tmpstore, sizeof(tmpstore),
396 "%jd", v);
397 assert(sz != -1);
398 assert((size_t)sz < sizeof(tmpstore));
399 sz += 1;
400 break;
401 case TEXTOID:
402 case TIMESTAMPTZOID:
403 buf = va_arg(ap, char *);
404 sz = strlen(buf) + 1;
405 break;
406 default:
407 errx(EXIT_FAILURE, "%s: unknown param type %u",
408 __func__, type);
409 }
410 if (binary) {
411 if (paramlengths == NULL) {
412 paramlengths =
413 emalloc(c->nparams * sizeof(*paramformats));
414 }
415 if (paramformats == NULL) {
416 paramformats = ecalloc(1,
417 c->nparams * sizeof(*paramformats));
418 }
419 paramformats[i] = 1;
420 paramlengths[i] = sz;
421 }
422 paramvalues[i] = emalloc(sz);
423 memcpy(paramvalues[i], buf, sz);
424 if (binary) {
425 DPRINTF("\t[%u]=<BINARY>\n", i);
426 } else {
427 DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]);
428 }
429 }
430 if ((c->flags & CMD_NOPREPARE) != 0) {
431 ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes,
432 (const char * const *)paramvalues, paramlengths,
433 paramformats, resultmode);
434 } else {
435 ret = PQsendQueryPrepared(conn, c->name, c->nparams,
436 (const char * const *)paramvalues, paramlengths,
437 paramformats, resultmode);
438 }
439 for (i = 0; i < c->nparams; i++) {
440 free(paramvalues[i]);
441 }
442 free(paramvalues);
443 free(paramlengths);
444 free(paramformats);
445 if (!ret) {
446 errx(EXIT_FAILURE, "PQsendQueryPrepared: %s",
447 PQerrorMessage(conn));
448 }
449 return 0;
450 }
451
/*
 * sendcmd: queue command c, requesting text-format results.  parameters
 * follow c's parameter types as described at vsendcmd().  returns 0 or
 * an errno value.
 */
int
sendcmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);

	return rv;
}
463
/*
 * sendcmdx: like sendcmd(), but with an explicit result format
 * (0 = text, 1 = binary).  returns 0 or an errno value.
 */
int
sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, resultmode, c, args);
	va_end(args);

	return rv;
}
475
/*
 * simplecmd: a convenient routine to execute a command which returns
 * no rows synchronously.
 *
 * sends c with text-format results and waits for it to finish.  returns
 * the errno from vsendcmd() if sending failed, else the result of
 * fetch_noresult() (0, ENOENT for "0 rows affected", or a mapped error).
 */
int
simplecmd(struct Xconn *xc, struct cmd *c, ...)
{
	va_list args;
	int rv;

	va_start(args, c);
	rv = vsendcmd(xc, 0, c, args);
	va_end(args);

	return (rv != 0) ? rv : fetch_noresult(xc);
}
495
496 void
fetchinit(struct fetchstatus * s,struct Xconn * xc)497 fetchinit(struct fetchstatus *s, struct Xconn *xc)
498 {
499 s->xc = xc;
500 s->res = NULL;
501 s->cur = 0;
502 s->nrows = 0;
503 s->done = false;
504 }
505
/*
 * getint: convert a base-10 integer string (a text-format result
 * column) to intmax_t.  when assertions are enabled, out-of-range
 * values, empty strings and trailing characters abort.
 */
static intmax_t
getint(const char *str)
{
	char *end;
	intmax_t value;

	errno = 0;
	value = strtoimax(str, &end, 10);
	assert(errno == 0);
	assert(*str != '\0');
	assert(*end == '\0');

	return value;
}
519
520 static int
vfetchnext(struct fetchstatus * s,unsigned int n,const Oid * types,va_list ap)521 vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap)
522 {
523 PGconn *conn = s->xc->conn;
524 unsigned int i;
525
526 assert(conn != NULL);
527 if (s->res == NULL) {
528 ExecStatusType status;
529 int error;
530
531 pqwait(s->xc);
532 s->res = PQgetResult(conn);
533 if (s->res == NULL) {
534 s->done = true;
535 return ENOENT;
536 }
537 status = PQresultStatus(s->res);
538 if (status == PGRES_FATAL_ERROR) {
539 error = sqltoerrno(
540 PQresultErrorField(s->res, PG_DIAG_SQLSTATE));
541 assert(error != 0);
542 dumperror(s->xc, s->res);
543 return error;
544 }
545 if (status != PGRES_TUPLES_OK) {
546 errx(1, "not tuples_ok: %s",
547 PQerrorMessage(conn));
548 }
549 assert((unsigned int)PQnfields(s->res) == n);
550 s->nrows = PQntuples(s->res);
551 if (s->nrows == 0) {
552 DPRINTF("no rows\n");
553 return ENOENT;
554 }
555 assert(s->nrows >= 1);
556 s->cur = 0;
557 }
558 for (i = 0; i < n; i++) {
559 size_t size;
560
561 assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0));
562 DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n",
563 i, PQftype(s->res, i), types[i],
564 PQgetisnull(s->res, s->cur, i) ? "<NULL>" :
565 PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) :
566 "<BINARY>");
567 assert(PQftype(s->res, i) == types[i]);
568 assert(!PQgetisnull(s->res, s->cur, i));
569 switch(types[i]) {
570 case INT8OID:
571 *va_arg(ap, int64_t *) =
572 getint(PQgetvalue(s->res, s->cur, i));
573 break;
574 case OIDOID:
575 *va_arg(ap, Oid *) =
576 getint(PQgetvalue(s->res, s->cur, i));
577 break;
578 case INT4OID:
579 *va_arg(ap, int32_t *) =
580 getint(PQgetvalue(s->res, s->cur, i));
581 break;
582 case TEXTOID:
583 *va_arg(ap, char **) =
584 estrdup(PQgetvalue(s->res, s->cur, i));
585 break;
586 case BYTEA:
587 size = PQgetlength(s->res, s->cur, i);
588 memcpy(va_arg(ap, void *),
589 PQgetvalue(s->res, s->cur, i), size);
590 *va_arg(ap, size_t *) = size;
591 break;
592 default:
593 errx(EXIT_FAILURE, "%s unknown type %u", __func__,
594 types[i]);
595 }
596 }
597 s->cur++;
598 if (s->cur == s->nrows) {
599 PQclear(s->res);
600 s->res = NULL;
601 }
602 return 0;
603 }
604
605 int
fetchnext(struct fetchstatus * s,unsigned int n,const Oid * types,...)606 fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...)
607 {
608 va_list ap;
609 int error;
610
611 va_start(ap, types);
612 error = vfetchnext(s, n, types, ap);
613 va_end(ap);
614 return error;
615 }
616
617 void
fetchdone(struct fetchstatus * s)618 fetchdone(struct fetchstatus *s)
619 {
620
621 if (s->res != NULL) {
622 PQclear(s->res);
623 s->res = NULL;
624 }
625 if (!s->done) {
626 PGresult *res;
627 unsigned int n;
628
629 n = 0;
630 while ((res = PQgetResult(s->xc->conn)) != NULL) {
631 PQclear(res);
632 n++;
633 }
634 if (n > 0) {
635 DPRINTF("%u rows dropped\n", n);
636 }
637 }
638 }
639
640 int
simplefetch(struct Xconn * xc,Oid type,...)641 simplefetch(struct Xconn *xc, Oid type, ...)
642 {
643 struct fetchstatus s;
644 va_list ap;
645 int error;
646
647 fetchinit(&s, xc);
648 va_start(ap, type);
649 error = vfetchnext(&s, 1, &type, ap);
650 va_end(ap);
651 assert(error != 0 || s.res == NULL);
652 fetchdone(&s);
653 return error;
654 }
655
656 /*
657 * setlabel: set the descriptive label for the connection.
658 *
659 * we use simple pointer comparison for label equality check.
660 */
661 static void
setlabel(struct Xconn * xc,const char * label)662 setlabel(struct Xconn *xc, const char *label)
663 {
664 int error;
665
666 /*
667 * put the label into application_name so that it's shown in
668 * pg_stat_activity. we are sure that our labels don't need
669 * PQescapeStringConn.
670 *
671 * example:
672 * SELECT pid,application_name,query FROM pg_stat_activity
673 * WHERE state <> 'idle'
674 */
675
676 if (label != NULL && label != xc->label) {
677 struct cmd *c;
678 char cmd_str[1024];
679
680 snprintf(cmd_str, sizeof(cmd_str),
681 "SET application_name TO 'pgfs: %s'", label);
682 c = createcmd(cmd_str, CMD_NOPREPARE);
683 error = simplecmd(xc, c);
684 freecmd(c);
685 assert(error == 0);
686 xc->label = label;
687 } else {
688 #if 0 /* don't bother to clear label */
689 static struct cmd *c;
690
691 CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'");
692 error = simplecmd(xc, c);
693 assert(error == 0);
694 #endif
695 }
696 }
697
698 struct Xconn *
begin(struct puffs_usermount * pu,const char * label)699 begin(struct puffs_usermount *pu, const char *label)
700 {
701 struct Xconn *xc = getxc(puffs_cc_getcc(pu));
702 static struct cmd *c;
703 int error;
704
705 setlabel(xc, label);
706 CREATECMD_NOPARAM(c, "BEGIN");
707 assert(!xc->in_trans);
708 error = simplecmd(xc, c);
709 assert(error == 0);
710 assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
711 xc->in_trans = true;
712 return xc;
713 }
714
715 struct Xconn *
begin_readonly(struct puffs_usermount * pu,const char * label)716 begin_readonly(struct puffs_usermount *pu, const char *label)
717 {
718 struct Xconn *xc = getxc(puffs_cc_getcc(pu));
719 static struct cmd *c;
720 int error;
721
722 setlabel(xc, label);
723 CREATECMD_NOPARAM(c, "BEGIN READ ONLY");
724 assert(!xc->in_trans);
725 error = simplecmd(xc, c);
726 assert(error == 0);
727 assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS);
728 xc->in_trans = true;
729 return xc;
730 }
731
732 void
rollback(struct Xconn * xc)733 rollback(struct Xconn *xc)
734 {
735 PGTransactionStatusType status;
736
737 /*
738 * check the status as we are not sure the status of our transaction
739 * after a failed commit.
740 */
741 status = PQtransactionStatus(xc->conn);
742 assert(status != PQTRANS_ACTIVE);
743 assert(status != PQTRANS_UNKNOWN);
744 if (status != PQTRANS_IDLE) {
745 static struct cmd *c;
746 int error;
747
748 assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR);
749 CREATECMD_NOPARAM(c, "ROLLBACK");
750 error = simplecmd(xc, c);
751 assert(error == 0);
752 }
753 DPRINTF("xc %p rollback %p\n", xc, xc->owner);
754 setlabel(xc, NULL);
755 relxc(xc);
756 }
757
758 int
commit(struct Xconn * xc)759 commit(struct Xconn *xc)
760 {
761 static struct cmd *c;
762 int error;
763
764 CREATECMD_NOPARAM(c, "COMMIT");
765 error = simplecmd(xc, c);
766 setlabel(xc, NULL);
767 if (error == 0) {
768 DPRINTF("xc %p commit %p\n", xc, xc->owner);
769 relxc(xc);
770 }
771 return error;
772 }
773
774 int
commit_sync(struct Xconn * xc)775 commit_sync(struct Xconn *xc)
776 {
777 static struct cmd *c;
778 int error;
779
780 assert(!pgfs_dosync);
781 CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON");
782 error = simplecmd(xc, c);
783 assert(error == 0);
784 return commit(xc);
785 }
786
787 static void
pgfs_notice_receiver(void * vp,const PGresult * res)788 pgfs_notice_receiver(void *vp, const PGresult *res)
789 {
790 struct Xconn *xc = vp;
791
792 assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR);
793 fprintf(stderr, "got a notice on %p\n", xc);
794 dumperror(xc, res);
795 }
796
797 static int
pgfs_readframe(struct puffs_usermount * pu,struct puffs_framebuf * pufbuf,int fd,int * done)798 pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf,
799 int fd, int *done)
800 {
801 struct Xconn *xc;
802 PGconn *conn;
803
804 TAILQ_FOREACH(xc, &xclist, list) {
805 if (PQsocket(xc->conn) == fd) {
806 break;
807 }
808 }
809 assert(xc != NULL);
810 conn = xc->conn;
811 PQconsumeInput(conn);
812 if (!PQisBusy(conn)) {
813 if (xc->blocker != NULL) {
814 DPRINTF("schedule %p\n", xc->blocker);
815 puffs_cc_schedule(xc->blocker);
816 } else {
817 DPRINTF("no blockers\n");
818 }
819 }
820 *done = 0;
821 return 0;
822 }
823
824 int
pgfs_connectdb(struct puffs_usermount * pu,const char * dbname,const char * dbuser,bool debug,bool synchronous,unsigned int nconn)825 pgfs_connectdb(struct puffs_usermount *pu, const char *dbname,
826 const char *dbuser, bool debug, bool synchronous, unsigned int nconn)
827 {
828 const char *keywords[3+1];
829 const char *values[3];
830 unsigned int i;
831
832 if (nconn > 32) {
833 /*
834 * limit from sizeof(cmd->prepared_mask)
835 */
836 return EINVAL;
837 }
838 if (debug) {
839 pgfs_dodprintf = true;
840 }
841 if (synchronous) {
842 pgfs_dosync = true;
843 }
844 i = 0;
845 if (dbname != NULL) {
846 keywords[i] = "dbname";
847 values[i] = dbname;
848 i++;
849 }
850 if (dbuser != NULL) {
851 keywords[i] = "user";
852 values[i] = dbuser;
853 i++;
854 }
855 keywords[i] = "application_name";
856 values[i] = "pgfs";
857 i++;
858 keywords[i] = NULL;
859 puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL);
860 for (i = 0; i < nconn; i++) {
861 struct Xconn *xc;
862 struct Xconn *xc2;
863 static int xcid;
864 PGconn *conn;
865 struct cmd *c;
866 int error;
867
868 conn = PQconnectdbParams(keywords, values, 0);
869 if (conn == NULL) {
870 errx(EXIT_FAILURE,
871 "PQconnectdbParams: unknown failure");
872 }
873 if (PQstatus(conn) != CONNECTION_OK) {
874 /*
875 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW
876 */
877 errx(EXIT_FAILURE, "PQconnectdbParams: %s",
878 PQerrorMessage(conn));
879 }
880 DPRINTF("protocol version %d\n", PQprotocolVersion(conn));
881 puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ);
882 xc = emalloc(sizeof(*xc));
883 xc->conn = conn;
884 xc->blocker = NULL;
885 xc->owner = NULL;
886 xc->in_trans = false;
887 xc->id = xcid++;
888 xc->label = NULL;
889 assert(xc->id < 32);
890 PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc);
891 TAILQ_INSERT_HEAD(&xclist, xc, list);
892 xc2 = begin(pu, NULL);
893 assert(xc2 == xc);
894 c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE);
895 error = simplecmd(xc, c);
896 assert(error == 0);
897 freecmd(c);
898 c = createcmd("SET SESSION CHARACTERISTICS AS "
899 "TRANSACTION ISOLATION LEVEL REPEATABLE READ",
900 CMD_NOPREPARE);
901 error = simplecmd(xc, c);
902 assert(error == 0);
903 freecmd(c);
904 c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE);
905 error = simplecmd(xc, c);
906 assert(error == 0);
907 freecmd(c);
908 if (!pgfs_dosync) {
909 c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF",
910 CMD_NOPREPARE);
911 error = simplecmd(xc, c);
912 assert(error == 0);
913 freecmd(c);
914 }
915 if (debug) {
916 struct fetchstatus s;
917 static const Oid types[] = { INT8OID, };
918 uint64_t pid;
919
920 c = createcmd("SELECT pg_backend_pid()::int8;",
921 CMD_NOPREPARE);
922 error = sendcmd(xc, c);
923 assert(error == 0);
924 fetchinit(&s, xc);
925 error = FETCHNEXT(&s, types, &pid);
926 fetchdone(&s);
927 assert(error == 0);
928 DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid);
929 }
930 error = commit(xc);
931 assert(error == 0);
932 assert(xc->owner == NULL);
933 }
934 /*
935 * XXX cleanup unlinked files here? what to do when the filesystem
936 * is shared?
937 */
938 return 0;
939 }
940
941 struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq);
942 struct puffs_cc *flusher = NULL;
943
944 int
flush_xacts(struct puffs_usermount * pu)945 flush_xacts(struct puffs_usermount *pu)
946 {
947 struct puffs_cc *cc = puffs_cc_getcc(pu);
948 struct Xconn *xc;
949 static struct cmd *c;
950 uint64_t dummy;
951 int error;
952
953 /*
954 * flush all previously issued asynchronous transactions.
955 *
956 * XXX
957 * unfortunately it seems that there is no clean way to tell
958 * PostgreSQL flush XLOG. we could perform a CHECKPOINT but it's
959 * too expensive and overkill for our purpose.
960 * besides, PostgreSQL has an optimization to skip XLOG flushing
961 * for transactions which didn't produce WAL records.
962 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2)
963 * it means that an empty transaction ("BEGIN; COMMIT;"), which
964 * doesn't produce any WAL records, doesn't flush the XLOG even if
965 * synchronous_commit=on. we issues a dummy setval() to avoid the
966 * optimization.
967 * on the other hand, we try to avoid creating unnecessary WAL activity
968 * by serializing flushing and checking XLOG locations.
969 */
970
971 assert(!pgfs_dosync);
972 if (flusher != NULL) { /* serialize flushers */
973 DPRINTF("%p flush in progress %p\n", cc, flusher);
974 waiton(&flushwaitq, cc);
975 assert(flusher == NULL);
976 }
977 DPRINTF("%p start flushing\n", cc);
978 flusher = cc;
979 retry:
980 xc = begin(pu, "flush");
981 CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE "
982 "pg_current_xlog_insert_location() <> pg_current_xlog_location()");
983 error = sendcmd(xc, c);
984 if (error != 0) {
985 goto got_error;
986 }
987 error = simplefetch(xc, INT8OID, &dummy);
988 assert(error != 0 || dummy == 1);
989 if (error == ENOENT) {
990 /*
991 * there seems to be nothing to flush.
992 */
993 DPRINTF("%p no sync\n", cc);
994 error = 0;
995 }
996 if (error != 0) {
997 goto got_error;
998 }
999 error = commit_sync(xc);
1000 if (error != 0) {
1001 goto got_error;
1002 }
1003 goto done;
1004 got_error:
1005 rollback(xc);
1006 if (error == EAGAIN) {
1007 goto retry;
1008 }
1009 done:
1010 assert(flusher == cc);
1011 flusher = NULL;
1012 wakeup_one(&flushwaitq);
1013 DPRINTF("%p end flushing error=%d\n", cc, error);
1014 return error;
1015 }
1016