1 /* $NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $ */ 2 3 /*- 4 * Copyright (c)2010,2011 YAMAMOTO Takashi, 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 /* 30 * backend db operations 31 */ 32 33 #include <sys/cdefs.h> 34 #ifndef lint 35 __RCSID("$NetBSD: pgfs_db.c,v 1.3 2012/04/11 14:27:43 yamt Exp $"); 36 #endif /* not lint */ 37 38 #include <assert.h> 39 #include <err.h> 40 #include <errno.h> 41 #include <inttypes.h> 42 #include <puffs.h> 43 #include <stdbool.h> 44 #include <stdarg.h> 45 #include <stdio.h> 46 #include <stdlib.h> 47 #include <util.h> 48 49 #include <libpq-fe.h> 50 51 #include "pgfs_db.h" 52 #include "pgfs_waitq.h" 53 #include "pgfs_debug.h" 54 55 bool pgfs_dosync = false; 56 57 struct Xconn { 58 TAILQ_ENTRY(Xconn) list; 59 PGconn *conn; 60 struct puffs_cc *blocker; 61 struct puffs_cc *owner; 62 bool in_trans; 63 int id; 64 const char *label; 65 }; 66 67 static void 68 dumperror(struct Xconn *xc, const PGresult *res) 69 { 70 static const struct { 71 const char *name; 72 int code; 73 } fields[] = { 74 #define F(x) { .name = #x, .code = x, } 75 F(PG_DIAG_SEVERITY), 76 F(PG_DIAG_SQLSTATE), 77 F(PG_DIAG_MESSAGE_PRIMARY), 78 F(PG_DIAG_MESSAGE_DETAIL), 79 F(PG_DIAG_MESSAGE_HINT), 80 F(PG_DIAG_STATEMENT_POSITION), 81 F(PG_DIAG_INTERNAL_POSITION), 82 F(PG_DIAG_INTERNAL_QUERY), 83 F(PG_DIAG_CONTEXT), 84 F(PG_DIAG_SOURCE_FILE), 85 F(PG_DIAG_SOURCE_LINE), 86 F(PG_DIAG_SOURCE_FUNCTION), 87 #undef F 88 }; 89 unsigned int i; 90 91 if (!pgfs_dodprintf) { 92 return; 93 } 94 assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR || 95 PQresultStatus(res) == PGRES_FATAL_ERROR); 96 for (i = 0; i < __arraycount(fields); i++) { 97 const char *val = PQresultErrorField(res, fields[i].code); 98 99 if (val == NULL) { 100 continue; 101 } 102 fprintf(stderr, "%s: %s\n", fields[i].name, val); 103 } 104 } 105 106 TAILQ_HEAD(, Xconn) xclist = TAILQ_HEAD_INITIALIZER(xclist); 107 struct waitq xcwaitq = TAILQ_HEAD_INITIALIZER(xcwaitq); 108 109 static struct Xconn * 110 getxc(struct puffs_cc *cc) 111 { 112 struct Xconn *xc; 113 114 assert(cc != NULL); 115 retry: 116 TAILQ_FOREACH(xc, &xclist, list) { 117 if (xc->blocker == NULL) { 118 assert(xc->owner == NULL); 119 xc->owner = cc; 120 DPRINTF("xc %p acquire %p\n", xc, cc); 121 return xc; 122 } else { 123 assert(xc->owner == xc->blocker); 124 } 125 } 126 DPRINTF("no free conn %p\n", cc); 127 waiton(&xcwaitq, cc); 128 goto retry; 129 } 130 131 static void 132 relxc(struct Xconn *xc) 133 { 134 135 assert(xc->in_trans); 136 assert(xc->owner != NULL); 137 xc->in_trans = false; 138 xc->owner = NULL; 139 wakeup_one(&xcwaitq); 140 } 141 142 static void 143 pqwait(struct Xconn *xc) 144 { 145 PGconn *conn = xc->conn; 146 struct puffs_cc *cc = xc->owner; 147 148 if (PQflush(conn)) { 149 errx(EXIT_FAILURE, "PQflush: %s", PQerrorMessage(conn)); 150 } 151 if (!PQisBusy(conn)) { 152 return; 153 } 154 assert(xc->blocker == NULL); 155 xc->blocker = cc; 156 DPRINTF("yielding %p\n", cc); 157 /* XXX is it safe to yield before entering mainloop? */ 158 puffs_cc_yield(cc); 159 DPRINTF("yield returned %p\n", cc); 160 assert(xc->owner == cc); 161 assert(xc->blocker == cc); 162 xc->blocker = NULL; 163 } 164 165 static int 166 sqltoerrno(const char *sqlstate) 167 { 168 /* 169 * XXX hack; ERRCODE_INTERNAL_ERROR -> EAGAIN to handle 170 * "tuple concurrently updated" errors for lowrite/lo_truncate. 171 * 172 * XXX should map ERRCODE_OUT_OF_MEMORY to EAGAIN? 173 */ 174 static const struct { 175 char sqlstate[5]; 176 int error; 177 } map[] = { 178 { "00000", 0, }, /* ERRCODE_SUCCESSFUL_COMPLETION */ 179 { "02000", ENOENT, }, /* ERRCODE_NO_DATA */ 180 { "23505", EEXIST, }, /* ERRCODE_UNIQUE_VIOLATION */ 181 { "23514", EINVAL, }, /* ERRCODE_CHECK_VIOLATION */ 182 { "40001", EAGAIN, }, /* ERRCODE_T_R_SERIALIZATION_FAILURE */ 183 { "40P01", EAGAIN, }, /* ERRCODE_T_R_DEADLOCK_DETECTED */ 184 { "42704", ENOENT, }, /* ERRCODE_UNDEFINED_OBJECT */ 185 { "53100", ENOSPC, }, /* ERRCODE_DISK_FULL */ 186 { "53200", ENOMEM, }, /* ERRCODE_OUT_OF_MEMORY */ 187 { "XX000", EAGAIN, }, /* ERRCODE_INTERNAL_ERROR */ 188 }; 189 unsigned int i; 190 191 for (i = 0; i < __arraycount(map); i++) { 192 if (!memcmp(map[i].sqlstate, sqlstate, 5)) { 193 const int error = map[i].error; 194 195 if (error != 0) { 196 DPRINTF("sqlstate %5s mapped to error %d\n", 197 sqlstate, error); 198 } 199 if (error == EINVAL) { 200 /* 201 * sounds like a bug. 202 */ 203 abort(); 204 } 205 return error; 206 } 207 } 208 DPRINTF("unknown sqlstate %5s mapped to EIO\n", sqlstate); 209 return EIO; 210 } 211 212 struct cmd { 213 char name[32]; /* name of prepared statement */ 214 char *cmd; /* query string */ 215 unsigned int nparams; 216 Oid *paramtypes; 217 uint32_t prepared_mask; /* for which connections this is prepared? */ 218 unsigned int flags; /* CMD_ flags */ 219 }; 220 221 #define CMD_NOPREPARE 1 /* don't prepare this command */ 222 223 struct cmd * 224 createcmd(const char *cmd, unsigned int flags, ...) 225 { 226 struct cmd *c; 227 va_list ap; 228 const char *cp; 229 unsigned int i; 230 static unsigned int cmdid; 231 232 c = emalloc(sizeof(*c)); 233 c->cmd = estrdup(cmd); 234 c->nparams = 0; 235 va_start(ap, flags); 236 for (cp = cmd; *cp != 0; cp++) { 237 if (*cp == '$') { /* XXX */ 238 c->nparams++; 239 } 240 } 241 c->paramtypes = emalloc(c->nparams * sizeof(*c->paramtypes)); 242 for (i = 0; i < c->nparams; i++) { 243 Oid type = va_arg(ap, Oid); 244 assert(type == BYTEA || 245 type == INT4OID || type == INT8OID || type == OIDOID || 246 type == TEXTOID || type == TIMESTAMPTZOID); 247 c->paramtypes[i] = type; 248 } 249 va_end(ap); 250 snprintf(c->name, sizeof(c->name), "%u", cmdid++); 251 if ((flags & CMD_NOPREPARE) != 0) { 252 c->prepared_mask = ~0; 253 } else { 254 c->prepared_mask = 0; 255 } 256 c->flags = flags; 257 return c; 258 } 259 260 static void 261 freecmd(struct cmd *c) 262 { 263 264 free(c->paramtypes); 265 free(c->cmd); 266 free(c); 267 } 268 269 static int 270 fetch_noresult(struct Xconn *xc) 271 { 272 PGresult *res; 273 ExecStatusType status; 274 PGconn *conn = xc->conn; 275 int error; 276 277 pqwait(xc); 278 res = PQgetResult(conn); 279 if (res == NULL) { 280 return ENOENT; 281 } 282 status = PQresultStatus(res); 283 if (status == PGRES_COMMAND_OK) { 284 assert(PQnfields(res) == 0); 285 assert(PQntuples(res) == 0); 286 if (!strcmp(PQcmdTuples(res), "0")) { 287 error = ENOENT; 288 } else { 289 error = 0; 290 } 291 } else if (status == PGRES_FATAL_ERROR) { 292 error = sqltoerrno(PQresultErrorField(res, PG_DIAG_SQLSTATE)); 293 assert(error != 0); 294 dumperror(xc, res); 295 } else { 296 errx(1, "%s not command_ok: %d: %s", __func__, 297 (int)status, 298 PQerrorMessage(conn)); 299 } 300 PQclear(res); 301 res = PQgetResult(conn); 302 assert(res == NULL); 303 if (error != 0) { 304 DPRINTF("error %d\n", error); 305 } 306 return error; 307 } 308 309 static int 310 preparecmd(struct Xconn *xc, struct cmd *c) 311 { 312 PGconn *conn = xc->conn; 313 const uint32_t mask = 1 << xc->id; 314 int error; 315 int ret; 316 317 if ((c->prepared_mask & mask) != 0) { 318 return 0; 319 } 320 assert((c->flags & CMD_NOPREPARE) == 0); 321 DPRINTF("PREPARE: '%s'\n", c->cmd); 322 ret = PQsendPrepare(conn, c->name, c->cmd, c->nparams, c->paramtypes); 323 if (!ret) { 324 errx(EXIT_FAILURE, "PQsendPrepare: %s", 325 PQerrorMessage(conn)); 326 } 327 error = fetch_noresult(xc); 328 if (error != 0) { 329 return error; 330 } 331 c->prepared_mask |= mask; 332 return 0; 333 } 334 335 /* 336 * vsendcmd: 337 * 338 * resultmode is just passed to PQsendQueryParams/PQsendQueryPrepared. 339 * 0 for text and 1 for binary. 340 */ 341 342 static int 343 vsendcmd(struct Xconn *xc, int resultmode, struct cmd *c, va_list ap) 344 { 345 PGconn *conn = xc->conn; 346 char **paramvalues; 347 int *paramlengths; 348 int *paramformats; 349 unsigned int i; 350 int error; 351 int ret; 352 353 assert(xc->owner != NULL); 354 assert(xc->blocker == NULL); 355 error = preparecmd(xc, c); 356 if (error != 0) { 357 return error; 358 } 359 paramvalues = emalloc(c->nparams * sizeof(*paramvalues)); 360 paramlengths = NULL; 361 paramformats = NULL; 362 DPRINTF("CMD: '%s'\n", c->cmd); 363 for (i = 0; i < c->nparams; i++) { 364 Oid type = c->paramtypes[i]; 365 char tmpstore[1024]; 366 const char *buf = NULL; 367 intmax_t v = 0; /* XXXgcc */ 368 int sz; 369 bool binary = false; 370 371 switch (type) { 372 case BYTEA: 373 buf = va_arg(ap, const void *); 374 sz = (int)va_arg(ap, size_t); 375 binary = true; 376 break; 377 case INT8OID: 378 case OIDOID: 379 case INT4OID: 380 switch (type) { 381 case INT8OID: 382 v = (intmax_t)va_arg(ap, int64_t); 383 break; 384 case OIDOID: 385 v = (intmax_t)va_arg(ap, Oid); 386 break; 387 case INT4OID: 388 v = (intmax_t)va_arg(ap, int32_t); 389 break; 390 default: 391 errx(EXIT_FAILURE, "unknown integer oid %u", 392 type); 393 } 394 buf = tmpstore; 395 sz = snprintf(tmpstore, sizeof(tmpstore), 396 "%jd", v); 397 assert(sz != -1); 398 assert((size_t)sz < sizeof(tmpstore)); 399 sz += 1; 400 break; 401 case TEXTOID: 402 case TIMESTAMPTZOID: 403 buf = va_arg(ap, char *); 404 sz = strlen(buf) + 1; 405 break; 406 default: 407 errx(EXIT_FAILURE, "%s: unknown param type %u", 408 __func__, type); 409 } 410 if (binary) { 411 if (paramlengths == NULL) { 412 paramlengths = 413 emalloc(c->nparams * sizeof(*paramformats)); 414 } 415 if (paramformats == NULL) { 416 paramformats = ecalloc(1, 417 c->nparams * sizeof(*paramformats)); 418 } 419 paramformats[i] = 1; 420 paramlengths[i] = sz; 421 } 422 paramvalues[i] = emalloc(sz); 423 memcpy(paramvalues[i], buf, sz); 424 if (binary) { 425 DPRINTF("\t[%u]=<BINARY>\n", i); 426 } else { 427 DPRINTF("\t[%u]='%s'\n", i, paramvalues[i]); 428 } 429 } 430 if ((c->flags & CMD_NOPREPARE) != 0) { 431 ret = PQsendQueryParams(conn, c->cmd, c->nparams, c->paramtypes, 432 (const char * const *)paramvalues, paramlengths, 433 paramformats, resultmode); 434 } else { 435 ret = PQsendQueryPrepared(conn, c->name, c->nparams, 436 (const char * const *)paramvalues, paramlengths, 437 paramformats, resultmode); 438 } 439 for (i = 0; i < c->nparams; i++) { 440 free(paramvalues[i]); 441 } 442 free(paramvalues); 443 free(paramlengths); 444 free(paramformats); 445 if (!ret) { 446 errx(EXIT_FAILURE, "PQsendQueryPrepared: %s", 447 PQerrorMessage(conn)); 448 } 449 return 0; 450 } 451 452 int 453 sendcmd(struct Xconn *xc, struct cmd *c, ...) 454 { 455 va_list ap; 456 int error; 457 458 va_start(ap, c); 459 error = vsendcmd(xc, 0, c, ap); 460 va_end(ap); 461 return error; 462 } 463 464 int 465 sendcmdx(struct Xconn *xc, int resultmode, struct cmd *c, ...) 466 { 467 va_list ap; 468 int error; 469 470 va_start(ap, c); 471 error = vsendcmd(xc, resultmode, c, ap); 472 va_end(ap); 473 return error; 474 } 475 476 /* 477 * simplecmd: a convenient routine to execute a command which returns 478 * no rows synchronously. 479 */ 480 481 int 482 simplecmd(struct Xconn *xc, struct cmd *c, ...) 483 { 484 va_list ap; 485 int error; 486 487 va_start(ap, c); 488 error = vsendcmd(xc, 0, c, ap); 489 va_end(ap); 490 if (error != 0) { 491 return error; 492 } 493 return fetch_noresult(xc); 494 } 495 496 void 497 fetchinit(struct fetchstatus *s, struct Xconn *xc) 498 { 499 s->xc = xc; 500 s->res = NULL; 501 s->cur = 0; 502 s->nrows = 0; 503 s->done = false; 504 } 505 506 static intmax_t 507 getint(const char *str) 508 { 509 intmax_t i; 510 char *ep; 511 512 errno = 0; 513 i = strtoimax(str, &ep, 10); 514 assert(errno == 0); 515 assert(str[0] != 0); 516 assert(*ep == 0); 517 return i; 518 } 519 520 static int 521 vfetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, va_list ap) 522 { 523 PGconn *conn = s->xc->conn; 524 unsigned int i; 525 526 assert(conn != NULL); 527 if (s->res == NULL) { 528 ExecStatusType status; 529 int error; 530 531 pqwait(s->xc); 532 s->res = PQgetResult(conn); 533 if (s->res == NULL) { 534 s->done = true; 535 return ENOENT; 536 } 537 status = PQresultStatus(s->res); 538 if (status == PGRES_FATAL_ERROR) { 539 error = sqltoerrno( 540 PQresultErrorField(s->res, PG_DIAG_SQLSTATE)); 541 assert(error != 0); 542 dumperror(s->xc, s->res); 543 return error; 544 } 545 if (status != PGRES_TUPLES_OK) { 546 errx(1, "not tuples_ok: %s", 547 PQerrorMessage(conn)); 548 } 549 assert((unsigned int)PQnfields(s->res) == n); 550 s->nrows = PQntuples(s->res); 551 if (s->nrows == 0) { 552 DPRINTF("no rows\n"); 553 return ENOENT; 554 } 555 assert(s->nrows >= 1); 556 s->cur = 0; 557 } 558 for (i = 0; i < n; i++) { 559 size_t size; 560 561 assert((types[i] != BYTEA) == (PQfformat(s->res, i) == 0)); 562 DPRINTF("[%u] PQftype = %d, types = %d, value = '%s'\n", 563 i, PQftype(s->res, i), types[i], 564 PQgetisnull(s->res, s->cur, i) ? "<NULL>" : 565 PQfformat(s->res, i) == 0 ? PQgetvalue(s->res, s->cur, i) : 566 "<BINARY>"); 567 assert(PQftype(s->res, i) == types[i]); 568 assert(!PQgetisnull(s->res, s->cur, i)); 569 switch(types[i]) { 570 case INT8OID: 571 *va_arg(ap, int64_t *) = 572 getint(PQgetvalue(s->res, s->cur, i)); 573 break; 574 case OIDOID: 575 *va_arg(ap, Oid *) = 576 getint(PQgetvalue(s->res, s->cur, i)); 577 break; 578 case INT4OID: 579 *va_arg(ap, int32_t *) = 580 getint(PQgetvalue(s->res, s->cur, i)); 581 break; 582 case TEXTOID: 583 *va_arg(ap, char **) = 584 estrdup(PQgetvalue(s->res, s->cur, i)); 585 break; 586 case BYTEA: 587 size = PQgetlength(s->res, s->cur, i); 588 memcpy(va_arg(ap, void *), 589 PQgetvalue(s->res, s->cur, i), size); 590 *va_arg(ap, size_t *) = size; 591 break; 592 default: 593 errx(EXIT_FAILURE, "%s unknown type %u", __func__, 594 types[i]); 595 } 596 } 597 s->cur++; 598 if (s->cur == s->nrows) { 599 PQclear(s->res); 600 s->res = NULL; 601 } 602 return 0; 603 } 604 605 int 606 fetchnext(struct fetchstatus *s, unsigned int n, const Oid *types, ...) 607 { 608 va_list ap; 609 int error; 610 611 va_start(ap, types); 612 error = vfetchnext(s, n, types, ap); 613 va_end(ap); 614 return error; 615 } 616 617 void 618 fetchdone(struct fetchstatus *s) 619 { 620 621 if (s->res != NULL) { 622 PQclear(s->res); 623 s->res = NULL; 624 } 625 if (!s->done) { 626 PGresult *res; 627 unsigned int n; 628 629 n = 0; 630 while ((res = PQgetResult(s->xc->conn)) != NULL) { 631 PQclear(res); 632 n++; 633 } 634 if (n > 0) { 635 DPRINTF("%u rows dropped\n", n); 636 } 637 } 638 } 639 640 int 641 simplefetch(struct Xconn *xc, Oid type, ...) 642 { 643 struct fetchstatus s; 644 va_list ap; 645 int error; 646 647 fetchinit(&s, xc); 648 va_start(ap, type); 649 error = vfetchnext(&s, 1, &type, ap); 650 va_end(ap); 651 assert(error != 0 || s.res == NULL); 652 fetchdone(&s); 653 return error; 654 } 655 656 /* 657 * setlabel: set the descriptive label for the connection. 658 * 659 * we use simple pointer comparison for label equality check. 660 */ 661 static void 662 setlabel(struct Xconn *xc, const char *label) 663 { 664 int error; 665 666 /* 667 * put the label into application_name so that it's shown in 668 * pg_stat_activity. we are sure that our labels don't need 669 * PQescapeStringConn. 670 * 671 * example: 672 * SELECT pid,application_name,query FROM pg_stat_activity 673 * WHERE state <> 'idle' 674 */ 675 676 if (label != NULL && label != xc->label) { 677 struct cmd *c; 678 char cmd_str[1024]; 679 680 snprintf(cmd_str, sizeof(cmd_str), 681 "SET application_name TO 'pgfs: %s'", label); 682 c = createcmd(cmd_str, CMD_NOPREPARE); 683 error = simplecmd(xc, c); 684 freecmd(c); 685 assert(error == 0); 686 xc->label = label; 687 } else { 688 #if 0 /* don't bother to clear label */ 689 static struct cmd *c; 690 691 CREATECMD_NOPARAM(c, "SET application_name TO 'pgfs'"); 692 error = simplecmd(xc, c); 693 assert(error == 0); 694 #endif 695 } 696 } 697 698 struct Xconn * 699 begin(struct puffs_usermount *pu, const char *label) 700 { 701 struct Xconn *xc = getxc(puffs_cc_getcc(pu)); 702 static struct cmd *c; 703 int error; 704 705 setlabel(xc, label); 706 CREATECMD_NOPARAM(c, "BEGIN"); 707 assert(!xc->in_trans); 708 error = simplecmd(xc, c); 709 assert(error == 0); 710 assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS); 711 xc->in_trans = true; 712 return xc; 713 } 714 715 struct Xconn * 716 begin_readonly(struct puffs_usermount *pu, const char *label) 717 { 718 struct Xconn *xc = getxc(puffs_cc_getcc(pu)); 719 static struct cmd *c; 720 int error; 721 722 setlabel(xc, label); 723 CREATECMD_NOPARAM(c, "BEGIN READ ONLY"); 724 assert(!xc->in_trans); 725 error = simplecmd(xc, c); 726 assert(error == 0); 727 assert(PQtransactionStatus(xc->conn) == PQTRANS_INTRANS); 728 xc->in_trans = true; 729 return xc; 730 } 731 732 void 733 rollback(struct Xconn *xc) 734 { 735 PGTransactionStatusType status; 736 737 /* 738 * check the status as we are not sure the status of our transaction 739 * after a failed commit. 740 */ 741 status = PQtransactionStatus(xc->conn); 742 assert(status != PQTRANS_ACTIVE); 743 assert(status != PQTRANS_UNKNOWN); 744 if (status != PQTRANS_IDLE) { 745 static struct cmd *c; 746 int error; 747 748 assert(status == PQTRANS_INTRANS || status == PQTRANS_INERROR); 749 CREATECMD_NOPARAM(c, "ROLLBACK"); 750 error = simplecmd(xc, c); 751 assert(error == 0); 752 } 753 DPRINTF("xc %p rollback %p\n", xc, xc->owner); 754 setlabel(xc, NULL); 755 relxc(xc); 756 } 757 758 int 759 commit(struct Xconn *xc) 760 { 761 static struct cmd *c; 762 int error; 763 764 CREATECMD_NOPARAM(c, "COMMIT"); 765 error = simplecmd(xc, c); 766 setlabel(xc, NULL); 767 if (error == 0) { 768 DPRINTF("xc %p commit %p\n", xc, xc->owner); 769 relxc(xc); 770 } 771 return error; 772 } 773 774 int 775 commit_sync(struct Xconn *xc) 776 { 777 static struct cmd *c; 778 int error; 779 780 assert(!pgfs_dosync); 781 CREATECMD_NOPARAM(c, "SET LOCAL SYNCHRONOUS_COMMIT TO ON"); 782 error = simplecmd(xc, c); 783 assert(error == 0); 784 return commit(xc); 785 } 786 787 static void 788 pgfs_notice_receiver(void *vp, const PGresult *res) 789 { 790 struct Xconn *xc = vp; 791 792 assert(PQresultStatus(res) == PGRES_NONFATAL_ERROR); 793 fprintf(stderr, "got a notice on %p\n", xc); 794 dumperror(xc, res); 795 } 796 797 static int 798 pgfs_readframe(struct puffs_usermount *pu, struct puffs_framebuf *pufbuf, 799 int fd, int *done) 800 { 801 struct Xconn *xc; 802 PGconn *conn; 803 804 TAILQ_FOREACH(xc, &xclist, list) { 805 if (PQsocket(xc->conn) == fd) { 806 break; 807 } 808 } 809 assert(xc != NULL); 810 conn = xc->conn; 811 PQconsumeInput(conn); 812 if (!PQisBusy(conn)) { 813 if (xc->blocker != NULL) { 814 DPRINTF("schedule %p\n", xc->blocker); 815 puffs_cc_schedule(xc->blocker); 816 } else { 817 DPRINTF("no blockers\n"); 818 } 819 } 820 *done = 0; 821 return 0; 822 } 823 824 int 825 pgfs_connectdb(struct puffs_usermount *pu, const char *dbname, 826 const char *dbuser, bool debug, bool synchronous, unsigned int nconn) 827 { 828 const char *keywords[3+1]; 829 const char *values[3]; 830 unsigned int i; 831 832 if (nconn > 32) { 833 /* 834 * limit from sizeof(cmd->prepared_mask) 835 */ 836 return EINVAL; 837 } 838 if (debug) { 839 pgfs_dodprintf = true; 840 } 841 if (synchronous) { 842 pgfs_dosync = true; 843 } 844 i = 0; 845 if (dbname != NULL) { 846 keywords[i] = "dbname"; 847 values[i] = dbname; 848 i++; 849 } 850 if (dbuser != NULL) { 851 keywords[i] = "user"; 852 values[i] = dbuser; 853 i++; 854 } 855 keywords[i] = "application_name"; 856 values[i] = "pgfs"; 857 i++; 858 keywords[i] = NULL; 859 puffs_framev_init(pu, pgfs_readframe, NULL, NULL, NULL, NULL); 860 for (i = 0; i < nconn; i++) { 861 struct Xconn *xc; 862 struct Xconn *xc2; 863 static int xcid; 864 PGconn *conn; 865 struct cmd *c; 866 int error; 867 868 conn = PQconnectdbParams(keywords, values, 0); 869 if (conn == NULL) { 870 errx(EXIT_FAILURE, 871 "PQconnectdbParams: unknown failure"); 872 } 873 if (PQstatus(conn) != CONNECTION_OK) { 874 /* 875 * XXX sleep and retry on ERRCODE_CANNOT_CONNECT_NOW 876 */ 877 errx(EXIT_FAILURE, "PQconnectdbParams: %s", 878 PQerrorMessage(conn)); 879 } 880 DPRINTF("protocol version %d\n", PQprotocolVersion(conn)); 881 puffs_framev_addfd(pu, PQsocket(conn), PUFFS_FBIO_READ); 882 xc = emalloc(sizeof(*xc)); 883 xc->conn = conn; 884 xc->blocker = NULL; 885 xc->owner = NULL; 886 xc->in_trans = false; 887 xc->id = xcid++; 888 xc->label = NULL; 889 assert(xc->id < 32); 890 PQsetNoticeReceiver(conn, pgfs_notice_receiver, xc); 891 TAILQ_INSERT_HEAD(&xclist, xc, list); 892 xc2 = begin(pu, NULL); 893 assert(xc2 == xc); 894 c = createcmd("SET search_path TO pgfs", CMD_NOPREPARE); 895 error = simplecmd(xc, c); 896 assert(error == 0); 897 freecmd(c); 898 c = createcmd("SET SESSION CHARACTERISTICS AS " 899 "TRANSACTION ISOLATION LEVEL REPEATABLE READ", 900 CMD_NOPREPARE); 901 error = simplecmd(xc, c); 902 assert(error == 0); 903 freecmd(c); 904 c = createcmd("SET SESSION TIME ZONE UTC", CMD_NOPREPARE); 905 error = simplecmd(xc, c); 906 assert(error == 0); 907 freecmd(c); 908 if (!pgfs_dosync) { 909 c = createcmd("SET SESSION SYNCHRONOUS_COMMIT TO OFF", 910 CMD_NOPREPARE); 911 error = simplecmd(xc, c); 912 assert(error == 0); 913 freecmd(c); 914 } 915 if (debug) { 916 struct fetchstatus s; 917 static const Oid types[] = { INT8OID, }; 918 uint64_t pid; 919 920 c = createcmd("SELECT pg_backend_pid()::int8;", 921 CMD_NOPREPARE); 922 error = sendcmd(xc, c); 923 assert(error == 0); 924 fetchinit(&s, xc); 925 error = FETCHNEXT(&s, types, &pid); 926 fetchdone(&s); 927 assert(error == 0); 928 DPRINTF("xc %p backend pid %" PRIu64 "\n", xc, pid); 929 } 930 error = commit(xc); 931 assert(error == 0); 932 assert(xc->owner == NULL); 933 } 934 /* 935 * XXX cleanup unlinked files here? what to do when the filesystem 936 * is shared? 937 */ 938 return 0; 939 } 940 941 struct waitq flushwaitq = TAILQ_HEAD_INITIALIZER(flushwaitq); 942 struct puffs_cc *flusher = NULL; 943 944 int 945 flush_xacts(struct puffs_usermount *pu) 946 { 947 struct puffs_cc *cc = puffs_cc_getcc(pu); 948 struct Xconn *xc; 949 static struct cmd *c; 950 uint64_t dummy; 951 int error; 952 953 /* 954 * flush all previously issued asynchronous transactions. 955 * 956 * XXX 957 * unfortunately it seems that there is no clean way to tell 958 * PostgreSQL flush XLOG. we could perform a CHECKPOINT but it's 959 * too expensive and overkill for our purpose. 960 * besides, PostgreSQL has an optimization to skip XLOG flushing 961 * for transactions which didn't produce WAL records. 962 * (changeset f6a0863e3cb72763490ceca2c558d5ef2dddd5f2) 963 * it means that an empty transaction ("BEGIN; COMMIT;"), which 964 * doesn't produce any WAL records, doesn't flush the XLOG even if 965 * synchronous_commit=on. we issues a dummy setval() to avoid the 966 * optimization. 967 * on the other hand, we try to avoid creating unnecessary WAL activity 968 * by serializing flushing and checking XLOG locations. 969 */ 970 971 assert(!pgfs_dosync); 972 if (flusher != NULL) { /* serialize flushers */ 973 DPRINTF("%p flush in progress %p\n", cc, flusher); 974 waiton(&flushwaitq, cc); 975 assert(flusher == NULL); 976 } 977 DPRINTF("%p start flushing\n", cc); 978 flusher = cc; 979 retry: 980 xc = begin(pu, "flush"); 981 CREATECMD_NOPARAM(c, "SELECT setval('dummyseq', 1) WHERE " 982 "pg_current_xlog_insert_location() <> pg_current_xlog_location()"); 983 error = sendcmd(xc, c); 984 if (error != 0) { 985 goto got_error; 986 } 987 error = simplefetch(xc, INT8OID, &dummy); 988 assert(error != 0 || dummy == 1); 989 if (error == ENOENT) { 990 /* 991 * there seems to be nothing to flush. 992 */ 993 DPRINTF("%p no sync\n", cc); 994 error = 0; 995 } 996 if (error != 0) { 997 goto got_error; 998 } 999 error = commit_sync(xc); 1000 if (error != 0) { 1001 goto got_error; 1002 } 1003 goto done; 1004 got_error: 1005 rollback(xc); 1006 if (error == EAGAIN) { 1007 goto retry; 1008 } 1009 done: 1010 assert(flusher == cc); 1011 flusher = NULL; 1012 wakeup_one(&flushwaitq); 1013 DPRINTF("%p end flushing error=%d\n", cc, error); 1014 return error; 1015 } 1016