1 /* $NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $ */ 2 3 /*- 4 * Copyright (c)2010,2011 YAMAMOTO Takashi, 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 /* 30 * a file system server which stores the data in a PostgreSQL database. 31 */ 32 33 /* 34 * we use large objects to store file contents. there are a few XXXs wrt it. 35 * 36 * - large objects don't obey the normal transaction semantics. 37 * 38 * - we use large object server-side functions directly (instead of via the 39 * libpq large object api) because: 40 * - we want to use asynchronous (in the sense of PQsendFoo) operations 41 * which is not available with the libpq large object api. 42 * - with the libpq large object api, there's no way to know details of 43 * an error because PGresult is freed in the library without saving 44 * PG_DIAG_SQLSTATE etc. 45 */ 46 47 #include <sys/cdefs.h> 48 #ifndef lint 49 __RCSID("$NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $"); 50 #endif /* not lint */ 51 52 #include <assert.h> 53 #include <err.h> 54 #include <errno.h> 55 #include <puffs.h> 56 #include <inttypes.h> 57 #include <stdarg.h> 58 #include <stdbool.h> 59 #include <stdio.h> 60 #include <stdlib.h> 61 #include <time.h> 62 #include <util.h> 63 64 #include <libpq-fe.h> 65 #include <libpq/libpq-fs.h> /* INV_* */ 66 67 #include "pgfs.h" 68 #include "pgfs_db.h" 69 #include "pgfs_debug.h" 70 #include "pgfs_waitq.h" 71 #include "pgfs_subs.h" 72 73 const char * const vtype_table[] = { 74 [VREG] = "regular", 75 [VDIR] = "directory", 76 [VLNK] = "link", 77 }; 78 79 static unsigned int 80 tovtype(const char *type) 81 { 82 unsigned int i; 83 84 for (i = 0; i < __arraycount(vtype_table); i++) { 85 if (vtype_table[i] == NULL) { 86 continue; 87 } 88 if (!strcmp(type, vtype_table[i])) { 89 return i; 90 } 91 } 92 assert(0); 93 return 0; 94 } 95 96 static const char * 97 fromvtype(enum vtype vtype) 98 { 99 100 if (vtype < __arraycount(vtype_table)) { 101 assert(vtype_table[vtype] != NULL); 102 return vtype_table[vtype]; 103 } 104 return NULL; 105 } 106 107 /* 108 * fileid_lock stuff below is to keep ordering of operations for a file. 109 * it is a workaround for the lack of operation barriers in the puffs 110 * protocol. 111 * 112 * currently we do this locking only for SETATTR, GETATTR, and WRITE as 113 * they are known to be reorder-unsafe. they are sensitive to the file 114 * attributes, mainly the file size. note that as the kernel issues async 115 * SETATTR/WRITE requests, vnode lock doesn't prevent GETATTR from seeing 116 * the stale attributes. 117 * 118 * we are relying on waiton/wakeup being a FIFO. 119 */ 120 121 struct fileid_lock_handle { 122 TAILQ_ENTRY(fileid_lock_handle) list; 123 fileid_t fileid; 124 struct puffs_cc *owner; /* diagnostic only */ 125 struct waitq waitq; 126 }; 127 128 TAILQ_HEAD(, fileid_lock_handle) fileid_lock_list = 129 TAILQ_HEAD_INITIALIZER(fileid_lock_list); 130 struct waitq fileid_lock_waitq = TAILQ_HEAD_INITIALIZER(fileid_lock_waitq); 131 132 /* 133 * fileid_lock: serialize requests for the fileid. 134 * 135 * this function should be the first yieldable point in a puffs callback. 136 */ 137 138 struct fileid_lock_handle * 139 fileid_lock(fileid_t fileid, struct puffs_cc *cc) 140 { 141 struct fileid_lock_handle *lock; 142 143 TAILQ_FOREACH(lock, &fileid_lock_list, list) { 144 if (lock->fileid == fileid) { 145 DPRINTF("fileid wait %" PRIu64 " cc %p\n", fileid, cc); 146 assert(lock->owner != cc); 147 waiton(&lock->waitq, cc); /* enter FIFO */ 148 assert(lock->owner == cc); 149 return lock; 150 } 151 } 152 lock = emalloc(sizeof(*lock)); 153 lock->fileid = fileid; 154 lock->owner = cc; 155 DPRINTF("fileid lock %" PRIu64 " cc %p\n", lock->fileid, cc); 156 waitq_init(&lock->waitq); 157 TAILQ_INSERT_HEAD(&fileid_lock_list, lock, list); 158 return lock; 159 } 160 161 void 162 fileid_unlock(struct fileid_lock_handle *lock) 163 { 164 165 DPRINTF("fileid unlock %" PRIu64 "\n", lock->fileid); 166 assert(lock != NULL); 167 assert(lock->owner != NULL); 168 /* 169 * perform direct-handoff to the first waiter. 170 * 171 * a handoff is essential to keep the order of requests. 172 */ 173 lock->owner = wakeup_one(&lock->waitq); 174 if (lock->owner != NULL) { 175 return; 176 } 177 /* 178 * no one is waiting this fileid. 179 */ 180 TAILQ_REMOVE(&fileid_lock_list, lock, list); 181 free(lock); 182 } 183 184 /* 185 * timespec_to_pgtimestamp: create a text representation of timestamp which 186 * can be recognized by the database server. 187 * 188 * it's caller's responsibility to free(3) the result. 189 */ 190 191 int 192 timespec_to_pgtimestamp(const struct timespec *tv, char **resultp) 193 { 194 /* 195 * XXX is there any smarter way? 196 */ 197 char buf1[1024]; 198 char buf2[1024]; 199 struct tm tm_store; 200 struct tm *tm; 201 202 tm = gmtime_r(&tv->tv_sec, &tm_store); 203 if (tm == NULL) { 204 assert(errno != 0); 205 return errno; 206 } 207 strftime(buf1, sizeof(buf1), "%Y%m%dT%H%M%S", tm); 208 snprintf(buf2, sizeof(buf2), "%s.%ju", buf1, 209 (uintmax_t)tv->tv_nsec / 1000); 210 *resultp = estrdup(buf2); 211 return 0; 212 } 213 214 int 215 my_lo_truncate(struct Xconn *xc, int32_t fd, int32_t size) 216 { 217 static struct cmd *c; 218 int32_t ret; 219 int error; 220 221 CREATECMD(c, "SELECT lo_truncate($1, $2)", INT4OID, INT4OID); 222 error = sendcmd(xc, c, fd, size); 223 if (error != 0) { 224 return error; 225 } 226 error = simplefetch(xc, INT4OID, &ret); 227 if (error != 0) { 228 if (error == EEXIST) { 229 /* 230 * probably the insertion of the new-sized page 231 * caused a duplicated key error. retry. 232 */ 233 DPRINTF("map EEXIST to EAGAIN\n"); 234 error = EAGAIN; 235 } 236 return error; 237 } 238 assert(ret == 0); 239 return 0; 240 } 241 242 int 243 my_lo_lseek(struct Xconn *xc, int32_t fd, int32_t offset, int32_t whence, 244 int32_t *retp) 245 { 246 static struct cmd *c; 247 int32_t ret; 248 int error; 249 250 CREATECMD(c, "SELECT lo_lseek($1, $2, $3)", INT4OID, INT4OID, INT4OID); 251 error = sendcmd(xc, c, fd, offset, whence); 252 if (error != 0) { 253 return error; 254 } 255 error = simplefetch(xc, INT4OID, &ret); 256 if (error != 0) { 257 return error; 258 } 259 if (retp != NULL) { 260 *retp = ret; 261 } 262 return 0; 263 } 264 265 int 266 my_lo_read(struct Xconn *xc, int32_t fd, void *buf, size_t size, 267 size_t *resultsizep) 268 { 269 static struct cmd *c; 270 size_t resultsize; 271 int error; 272 273 CREATECMD(c, "SELECT loread($1, $2)", INT4OID, INT4OID); 274 error = sendcmdx(xc, 1, c, fd, (int32_t)size); 275 if (error != 0) { 276 return error; 277 } 278 error = simplefetch(xc, BYTEA, buf, &resultsize); 279 if (error != 0) { 280 return error; 281 } 282 *resultsizep = resultsize; 283 if (size != resultsize) { 284 DPRINTF("shortread? %zu != %zu\n", size, resultsize); 285 } 286 return 0; 287 } 288 289 int 290 my_lo_write(struct Xconn *xc, int32_t fd, const void *buf, size_t size, 291 size_t *resultsizep) 292 { 293 static struct cmd *c; 294 int32_t resultsize; 295 int error; 296 297 CREATECMD(c, "SELECT lowrite($1, $2)", INT4OID, BYTEA); 298 error = sendcmd(xc, c, fd, buf, (int32_t)size); 299 if (error != 0) { 300 return error; 301 } 302 error = simplefetch(xc, INT4OID, &resultsize); 303 if (error != 0) { 304 if (error == EEXIST) { 305 /* 306 * probably the insertion of the new data page 307 * caused a duplicated key error. retry. 308 */ 309 DPRINTF("map EEXIST to EAGAIN\n"); 310 error = EAGAIN; 311 } 312 return error; 313 } 314 *resultsizep = resultsize; 315 if (size != (size_t)resultsize) { 316 DPRINTF("shortwrite? %zu != %zu\n", size, (size_t)resultsize); 317 } 318 return 0; 319 } 320 321 int 322 my_lo_open(struct Xconn *xc, Oid loid, int32_t mode, int32_t *fdp) 323 { 324 static struct cmd *c; 325 int error; 326 327 CREATECMD(c, "SELECT lo_open($1, $2)", OIDOID, INT4OID); 328 error = sendcmd(xc, c, loid, mode); 329 if (error != 0) { 330 return error; 331 } 332 return simplefetch(xc, INT4OID, fdp); 333 } 334 335 int 336 my_lo_close(struct Xconn *xc, int32_t fd) 337 { 338 #if 1 339 /* 340 * do nothing. 341 * 342 * LO handles are automatically closed at the end of transactions. 343 * our transactions are small enough. 344 */ 345 #else 346 static struct cmd *c; 347 int32_t ret; 348 int error; 349 350 CREATECMD(c, "SELECT lo_close($1)", INT4OID); 351 error = sendcmd(xc, c, fd); 352 if (error != 0) { 353 return error; 354 } 355 error = simplefetch(xc, INT4OID, &ret); 356 if (error != 0) { 357 return error; 358 } 359 assert(ret == 0); 360 #endif 361 return 0; 362 } 363 364 static int 365 lo_lookup_by_fileid(struct Xconn *xc, fileid_t fileid, Oid *idp) 366 { 367 static struct cmd *c; 368 static const Oid types[] = { OIDOID, }; 369 struct fetchstatus s; 370 int error; 371 372 CREATECMD(c, "SELECT loid FROM datafork WHERE fileid = $1", INT8OID); 373 error = sendcmd(xc, c, fileid); 374 if (error != 0) { 375 return error; 376 } 377 fetchinit(&s, xc); 378 error = FETCHNEXT(&s, types, idp); 379 fetchdone(&s); 380 DPRINTF("error %d\n", error); 381 return error; 382 } 383 384 int 385 lo_open_by_fileid(struct Xconn *xc, fileid_t fileid, int mode, int *fdp) 386 { 387 Oid loid; 388 int fd; 389 int error; 390 391 error = lo_lookup_by_fileid(xc, fileid, &loid); 392 if (error != 0) { 393 return error; 394 } 395 error = my_lo_open(xc, loid, mode, &fd); 396 if (error != 0) { 397 return error; 398 } 399 *fdp = fd; 400 return 0; 401 } 402 403 static int 404 getsize(struct Xconn *xc, fileid_t fileid, int *resultp) 405 { 406 int32_t size; 407 int fd; 408 int error; 409 410 error = lo_open_by_fileid(xc, fileid, INV_READ, &fd); 411 if (error != 0) { 412 return error; 413 } 414 error = my_lo_lseek(xc, fd, 0, SEEK_END, &size); 415 if (error != 0) { 416 return error; 417 } 418 error = my_lo_close(xc, fd); 419 if (error != 0) { 420 return error; 421 } 422 *resultp = size; 423 return 0; 424 } 425 426 #define GETATTR_TYPE 0x00000001 427 #define GETATTR_NLINK 0x00000002 428 #define GETATTR_SIZE 0x00000004 429 #define GETATTR_MODE 0x00000008 430 #define GETATTR_UID 0x00000010 431 #define GETATTR_GID 0x00000020 432 #define GETATTR_TIME 0x00000040 433 #define GETATTR_ALL \ 434 (GETATTR_TYPE|GETATTR_NLINK|GETATTR_SIZE|GETATTR_MODE| \ 435 GETATTR_UID|GETATTR_GID|GETATTR_TIME) 436 437 int 438 getattr(struct Xconn *xc, fileid_t fileid, struct vattr *va, unsigned int mask) 439 { 440 char *type; 441 long long atime_s; 442 long long atime_us; 443 long long ctime_s; 444 long long ctime_us; 445 long long mtime_s; 446 long long mtime_us; 447 long long btime_s; 448 long long btime_us; 449 uint64_t mode; 450 long long uid; 451 long long gid; 452 long long nlink; 453 long long rev; 454 struct fetchstatus s; 455 int error; 456 457 if (mask == 0) { 458 return 0; 459 } 460 /* 461 * unless explicitly requested, avoid fetching timestamps as they 462 * are a little more expensive than other simple attributes. 463 */ 464 if ((mask & GETATTR_TIME) != 0) { 465 static struct cmd *c; 466 static const Oid types[] = { 467 TEXTOID, 468 INT8OID, 469 INT8OID, 470 INT8OID, 471 INT8OID, 472 INT8OID, 473 INT8OID, 474 INT8OID, 475 INT8OID, 476 INT8OID, 477 INT8OID, 478 INT8OID, 479 INT8OID, 480 INT8OID, 481 }; 482 483 CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev, " 484 "extract(epoch from date_trunc('second', atime))::int8, " 485 "extract(microseconds from atime)::int8, " 486 "extract(epoch from date_trunc('second', ctime))::int8, " 487 "extract(microseconds from ctime)::int8, " 488 "extract(epoch from date_trunc('second', mtime))::int8, " 489 "extract(microseconds from mtime)::int8, " 490 "extract(epoch from date_trunc('second', btime))::int8, " 491 "extract(microseconds from btime)::int8 " 492 "FROM file " 493 "WHERE fileid = $1", INT8OID); 494 error = sendcmd(xc, c, fileid); 495 if (error != 0) { 496 return error; 497 } 498 fetchinit(&s, xc); 499 error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink, 500 &rev, 501 &atime_s, &atime_us, 502 &ctime_s, &ctime_us, 503 &mtime_s, &mtime_us, 504 &btime_s, &btime_us); 505 } else { 506 static struct cmd *c; 507 static const Oid types[] = { 508 TEXTOID, 509 INT8OID, 510 INT8OID, 511 INT8OID, 512 INT8OID, 513 INT8OID, 514 }; 515 516 CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev " 517 "FROM file " 518 "WHERE fileid = $1", INT8OID); 519 error = sendcmd(xc, c, fileid); 520 if (error != 0) { 521 return error; 522 } 523 fetchinit(&s, xc); 524 error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink, 525 &rev); 526 } 527 fetchdone(&s); 528 if (error != 0) { 529 return error; 530 } 531 memset(va, 0xaa, sizeof(*va)); /* fill with garbage for debug */ 532 va->va_type = tovtype(type); 533 free(type); 534 va->va_mode = mode; 535 va->va_uid = uid; 536 va->va_gid = gid; 537 if (nlink > 0 && va->va_type == VDIR) { 538 nlink++; /* "." */ 539 } 540 va->va_nlink = nlink; 541 va->va_fileid = fileid; 542 va->va_atime.tv_sec = atime_s; 543 va->va_atime.tv_nsec = atime_us * 1000; 544 va->va_ctime.tv_sec = ctime_s; 545 va->va_ctime.tv_nsec = ctime_us * 1000; 546 va->va_mtime.tv_sec = mtime_s; 547 va->va_mtime.tv_nsec = mtime_us * 1000; 548 va->va_birthtime.tv_sec = btime_s; 549 va->va_birthtime.tv_nsec = btime_us * 1000; 550 va->va_blocksize = LOBLKSIZE; 551 va->va_gen = 1; 552 va->va_filerev = rev; 553 if ((mask & GETATTR_SIZE) != 0) { 554 int size; 555 556 size = 0; 557 if (va->va_type == VREG || va->va_type == VLNK) { 558 error = getsize(xc, fileid, &size); 559 if (error != 0) { 560 return error; 561 } 562 } else if (va->va_type == VDIR) { 563 size = 100; /* XXX */ 564 } 565 va->va_size = size; 566 } 567 /* 568 * XXX va_bytes: likely wrong due to toast compression. 569 * there's no cheap way to get the compressed size of LO. 570 */ 571 va->va_bytes = va->va_size; 572 va->va_flags = 0; 573 return 0; 574 } 575 576 int 577 update_mctime(struct Xconn *xc, fileid_t fileid) 578 { 579 static struct cmd *c; 580 581 CREATECMD(c, 582 "UPDATE file " 583 "SET mtime = current_timestamp, ctime = current_timestamp, " 584 "rev = rev + 1 " 585 "WHERE fileid = $1", INT8OID); 586 return simplecmd(xc, c, fileid); 587 } 588 589 int 590 update_atime(struct Xconn *xc, fileid_t fileid) 591 { 592 static struct cmd *c; 593 594 CREATECMD(c, 595 "UPDATE file SET atime = current_timestamp WHERE fileid = $1", 596 INT8OID); 597 return simplecmd(xc, c, fileid); 598 } 599 600 int 601 update_mtime(struct Xconn *xc, fileid_t fileid) 602 { 603 static struct cmd *c; 604 605 CREATECMD(c, 606 "UPDATE file " 607 "SET mtime = current_timestamp, rev = rev + 1 " 608 "WHERE fileid = $1", INT8OID); 609 return simplecmd(xc, c, fileid); 610 } 611 612 int 613 update_ctime(struct Xconn *xc, fileid_t fileid) 614 { 615 static struct cmd *c; 616 617 CREATECMD(c, 618 "UPDATE file SET ctime = current_timestamp WHERE fileid = $1", 619 INT8OID); 620 return simplecmd(xc, c, fileid); 621 } 622 623 int 624 update_nlink(struct Xconn *xc, fileid_t fileid, int delta) 625 { 626 static struct cmd *c; 627 628 CREATECMD(c, 629 "UPDATE file " 630 "SET nlink = nlink + $1 " 631 "WHERE fileid = $2", 632 INT8OID, INT8OID); 633 return simplecmd(xc, c, (int64_t)delta, fileid); 634 } 635 636 int 637 lookupp(struct Xconn *xc, fileid_t fileid, fileid_t *parent) 638 { 639 static struct cmd *c; 640 static const Oid types[] = { INT8OID, }; 641 struct fetchstatus s; 642 int error; 643 644 CREATECMD(c, "SELECT parent_fileid FROM dirent " 645 "WHERE child_fileid = $1 LIMIT 1", INT8OID); 646 error = sendcmd(xc, c, fileid); 647 if (error != 0) { 648 return error; 649 } 650 fetchinit(&s, xc); 651 error = FETCHNEXT(&s, types, parent); 652 fetchdone(&s); 653 if (error != 0) { 654 return error; 655 } 656 return 0; 657 } 658 659 int 660 mkfile(struct Xconn *xc, enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, 661 fileid_t *idp) 662 { 663 static struct cmd *c; 664 const char *type; 665 int error; 666 667 type = fromvtype(vtype); 668 if (type == NULL) { 669 return EOPNOTSUPP; 670 } 671 CREATECMD(c, 672 "INSERT INTO file " 673 "(fileid, type, mode, uid, gid, nlink, rev, " 674 "atime, ctime, mtime, btime) " 675 "VALUES(nextval('fileid_seq'), $1::filetype, $2, $3, $4, 0, 0, " 676 "current_timestamp, " 677 "current_timestamp, " 678 "current_timestamp, " 679 "current_timestamp) " 680 "RETURNING fileid", TEXTOID, INT8OID, INT8OID, INT8OID); 681 error = sendcmd(xc, c, type, (uint64_t)mode, (uint64_t)uid, 682 (uint64_t)gid); 683 if (error != 0) { 684 return error; 685 } 686 return simplefetch(xc, INT8OID, idp); 687 } 688 689 int 690 linkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child) 691 { 692 static struct cmd *c; 693 int error; 694 695 CREATECMD(c, 696 "INSERT INTO dirent " 697 "(parent_fileid, name, child_fileid) " 698 "VALUES($1, $2, $3)", INT8OID, TEXTOID, INT8OID); 699 error = simplecmd(xc, c, parent, name, child); 700 if (error != 0) { 701 return error; 702 } 703 error = update_nlink(xc, child, 1); 704 if (error != 0) { 705 return error; 706 } 707 return update_mtime(xc, parent); 708 } 709 710 int 711 unlinkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child) 712 { 713 static struct cmd *c; 714 int error; 715 716 /* 717 * in addition to the primary key, we check child_fileid as well here 718 * to avoid removing an entry which was appeared after our VOP_LOOKUP. 719 */ 720 CREATECMD(c, 721 "DELETE FROM dirent " 722 "WHERE parent_fileid = $1 AND name = $2 AND child_fileid = $3", 723 INT8OID, TEXTOID, INT8OID); 724 error = simplecmd(xc, c, parent, name, child); 725 if (error != 0) { 726 return error; 727 } 728 error = update_nlink(xc, child, -1); 729 if (error != 0) { 730 return error; 731 } 732 error = update_mtime(xc, parent); 733 if (error != 0) { 734 return error; 735 } 736 return update_ctime(xc, child); 737 } 738 739 int 740 mklinkfile(struct Xconn *xc, fileid_t parent, const char *name, 741 enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *idp) 742 { 743 fileid_t fileid; 744 int error; 745 746 error = mkfile(xc, vtype, mode, uid, gid, &fileid); 747 if (error != 0) { 748 return error; 749 } 750 error = linkfile(xc, parent, name, fileid); 751 if (error != 0) { 752 return error; 753 } 754 if (idp != NULL) { 755 *idp = fileid; 756 } 757 return 0; 758 } 759 760 int 761 mklinkfile_lo(struct Xconn *xc, fileid_t parent_fileid, const char *name, 762 enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *fileidp, 763 int *loidp) 764 { 765 static struct cmd *c; 766 fileid_t new_fileid; 767 int loid; 768 int error; 769 770 error = mklinkfile(xc, parent_fileid, name, vtype, mode, uid, gid, 771 &new_fileid); 772 if (error != 0) { 773 return error; 774 } 775 CREATECMD(c, 776 "INSERT INTO datafork (fileid, loid) " 777 "VALUES($1, lo_creat(-1)) " 778 "RETURNING loid", INT8OID); 779 error = sendcmd(xc, c, new_fileid); 780 if (error != 0) { 781 return error; 782 } 783 error = simplefetch(xc, OIDOID, &loid); 784 if (error != 0) { 785 return error; 786 } 787 if (fileidp != NULL) { 788 *fileidp = new_fileid; 789 } 790 if (loidp != NULL) { 791 *loidp = loid; 792 } 793 return 0; 794 } 795 796 int 797 cleanupfile(struct Xconn *xc, fileid_t fileid) 798 { 799 static struct cmd *c; 800 char *type; 801 unsigned int vtype; 802 int error; 803 804 CREATECMD(c, "DELETE FROM file WHERE fileid = $1 AND nlink = 0 " 805 "RETURNING type::text", INT8OID); 806 error = sendcmd(xc, c, fileid); 807 if (error != 0) { 808 return error; 809 } 810 error = simplefetch(xc, TEXTOID, &type); 811 if (error == ENOENT) { 812 return 0; /* probably nlink > 0 */ 813 } 814 if (error != 0) { 815 return error; 816 } 817 vtype = tovtype(type); 818 free(type); 819 if (vtype == VREG || vtype == VLNK) { 820 static struct cmd *c_datafork; 821 int32_t ret; 822 823 CREATECMD(c_datafork, 824 "WITH loids AS (DELETE FROM datafork WHERE fileid = $1 " 825 "RETURNING loid) SELECT lo_unlink(loid) FROM loids", 826 INT8OID); 827 error = sendcmd(xc, c_datafork, fileid); 828 if (error != 0) { 829 return error; 830 } 831 error = simplefetch(xc, INT4OID, &ret); 832 if (error != 0) { 833 return error; 834 } 835 if (ret != 1) { 836 return EIO; /* lo_unlink failed */ 837 } 838 } 839 return 0; 840 } 841 842 /* 843 * check_path: do locking and check to prevent a rename from creating loop. 844 * 845 * lock the dirents between child_fileid and the root directory. 846 * if gate_fileid is appeared in the path, return EINVAL. 847 * caller should ensure that child_fileid is of VDIR beforehand. 848 * 849 * we uses FOR SHARE row level locks as poor man's predicate locks. 850 * 851 * the following is an example to show why we need to lock the path. 852 * 853 * consider: 854 * "mkdir -p /a/b/c/d/e/f && mkdir -p /1/2/3/4/5/6" 855 * and then 856 * thread 1 is doing "mv /a/b /1/2/3/4/5/6" 857 * thread 2 is doing "mv /1/2 /a/b/c/d/e/f" 858 * 859 * a possible consequence: 860 * thread 1: check_path -> success 861 * thread 2: check_path -> success 862 * thread 1: modify directories -> block on row-level lock 863 * thread 2: modify directories -> block on row-level lock 864 * -> deadlock detected 865 * -> rollback and retry 866 * 867 * another possible consequence: 868 * thread 1: check_path -> success 869 * thread 1: modify directory entries -> success 870 * thread 2: check_path -> block on row-level lock 871 * thread 1: commit 872 * thread 2: acquire the lock and notices the row is updated 873 * -> serialization error 874 * -> rollback and retry 875 * 876 * XXX it might be better to use real serializable transactions, 877 * which will be available for PostgreSQL 9.1 878 */ 879 880 int 881 check_path(struct Xconn *xc, fileid_t gate_fileid, fileid_t child_fileid) 882 { 883 static struct cmd *c; 884 fileid_t parent_fileid; 885 struct fetchstatus s; 886 int error; 887 888 CREATECMD(c, 889 "WITH RECURSIVE r AS " 890 "( " 891 "SELECT parent_fileid, cookie, child_fileid " 892 "FROM dirent " 893 "WHERE child_fileid = $1 " 894 "UNION ALL " 895 "SELECT d.parent_fileid, d.cookie, " 896 "d.child_fileid " 897 "FROM dirent AS d INNER JOIN r " 898 "ON d.child_fileid = r.parent_fileid " 899 ") " 900 "SELECT d.parent_fileid " 901 "FROM dirent d " 902 "JOIN r " 903 "ON d.cookie = r.cookie " 904 "FOR SHARE", INT8OID); 905 error = sendcmd(xc, c, child_fileid); 906 if (error != 0) { 907 return error; 908 } 909 fetchinit(&s, xc); 910 do { 911 static const Oid types[] = { INT8OID, }; 912 913 error = FETCHNEXT(&s, types, &parent_fileid); 914 if (error == ENOENT) { 915 fetchdone(&s); 916 return 0; 917 } 918 if (error != 0) { 919 fetchdone(&s); 920 return error; 921 } 922 } while (gate_fileid != parent_fileid); 923 fetchdone(&s); 924 return EINVAL; 925 } 926 927 int 928 isempty(struct Xconn *xc, fileid_t fileid, bool *emptyp) 929 { 930 int32_t dummy; 931 static struct cmd *c; 932 int error; 933 934 CREATECMD(c, 935 "SELECT 1 FROM dirent " 936 "WHERE parent_fileid = $1 LIMIT 1", INT8OID); 937 error = sendcmd(xc, c, fileid); 938 if (error != 0) { 939 return error; 940 } 941 error = simplefetch(xc, INT4OID, &dummy); 942 assert(error != 0 || dummy == 1); 943 if (error == ENOENT) { 944 *emptyp = true; 945 error = 0; 946 } else { 947 *emptyp = false; 948 } 949 return error; 950 } 951