1 /* $NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $ */
2
3 /*-
4 * Copyright (c)2010,2011 YAMAMOTO Takashi,
5 * All rights reserved.
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 * 2. Redistributions in binary form must reproduce the above copyright
13 * notice, this list of conditions and the following disclaimer in the
14 * documentation and/or other materials provided with the distribution.
15 *
16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26 * SUCH DAMAGE.
27 */
28
29 /*
30 * a file system server which stores the data in a PostgreSQL database.
31 */
32
/*
 * we use large objects to store file contents.  there are a few XXXs wrt them.
 *
 * - large objects don't obey the normal transaction semantics.
 *
 * - we use the large object server-side functions directly (instead of via
 *   the libpq large object api) because:
 *	- we want to use asynchronous (in the sense of PQsendFoo) operations,
 *	  which are not available with the libpq large object api.
 *	- with the libpq large object api, there's no way to know the details
 *	  of an error because PGresult is freed in the library without saving
 *	  PG_DIAG_SQLSTATE etc.
 */
46
47 #include <sys/cdefs.h>
48 #ifndef lint
49 __RCSID("$NetBSD: pgfs_subs.c,v 1.5 2012/04/11 14:28:18 yamt Exp $");
50 #endif /* not lint */
51
52 #include <assert.h>
53 #include <err.h>
54 #include <errno.h>
55 #include <puffs.h>
56 #include <inttypes.h>
57 #include <stdarg.h>
58 #include <stdbool.h>
59 #include <stdio.h>
60 #include <stdlib.h>
61 #include <time.h>
62 #include <util.h>
63
64 #include <libpq-fe.h>
65 #include <libpq/libpq-fs.h> /* INV_* */
66
67 #include "pgfs.h"
68 #include "pgfs_db.h"
69 #include "pgfs_debug.h"
70 #include "pgfs_waitq.h"
71 #include "pgfs_subs.h"
72
73 const char * const vtype_table[] = {
74 [VREG] = "regular",
75 [VDIR] = "directory",
76 [VLNK] = "link",
77 };
78
79 static unsigned int
tovtype(const char * type)80 tovtype(const char *type)
81 {
82 unsigned int i;
83
84 for (i = 0; i < __arraycount(vtype_table); i++) {
85 if (vtype_table[i] == NULL) {
86 continue;
87 }
88 if (!strcmp(type, vtype_table[i])) {
89 return i;
90 }
91 }
92 assert(0);
93 return 0;
94 }
95
96 static const char *
fromvtype(enum vtype vtype)97 fromvtype(enum vtype vtype)
98 {
99
100 if (vtype < __arraycount(vtype_table)) {
101 assert(vtype_table[vtype] != NULL);
102 return vtype_table[vtype];
103 }
104 return NULL;
105 }
106
/*
 * fileid_lock stuff below is to keep the ordering of operations for a file.
 * it is a workaround for the lack of operation barriers in the puffs
 * protocol.
 *
 * currently we do this locking only for SETATTR, GETATTR, and WRITE as
 * they are known to be reorder-unsafe.  they are sensitive to the file
 * attributes, mainly the file size.  note that, as the kernel issues async
 * SETATTR/WRITE requests, the vnode lock doesn't prevent GETATTR from
 * seeing stale attributes.
 *
 * we are relying on waiton/wakeup being a FIFO.
 */
120
121 struct fileid_lock_handle {
122 TAILQ_ENTRY(fileid_lock_handle) list;
123 fileid_t fileid;
124 struct puffs_cc *owner; /* diagnostic only */
125 struct waitq waitq;
126 };
127
128 TAILQ_HEAD(, fileid_lock_handle) fileid_lock_list =
129 TAILQ_HEAD_INITIALIZER(fileid_lock_list);
130 struct waitq fileid_lock_waitq = TAILQ_HEAD_INITIALIZER(fileid_lock_waitq);
131
132 /*
133 * fileid_lock: serialize requests for the fileid.
134 *
135 * this function should be the first yieldable point in a puffs callback.
136 */
137
138 struct fileid_lock_handle *
fileid_lock(fileid_t fileid,struct puffs_cc * cc)139 fileid_lock(fileid_t fileid, struct puffs_cc *cc)
140 {
141 struct fileid_lock_handle *lock;
142
143 TAILQ_FOREACH(lock, &fileid_lock_list, list) {
144 if (lock->fileid == fileid) {
145 DPRINTF("fileid wait %" PRIu64 " cc %p\n", fileid, cc);
146 assert(lock->owner != cc);
147 waiton(&lock->waitq, cc); /* enter FIFO */
148 assert(lock->owner == cc);
149 return lock;
150 }
151 }
152 lock = emalloc(sizeof(*lock));
153 lock->fileid = fileid;
154 lock->owner = cc;
155 DPRINTF("fileid lock %" PRIu64 " cc %p\n", lock->fileid, cc);
156 waitq_init(&lock->waitq);
157 TAILQ_INSERT_HEAD(&fileid_lock_list, lock, list);
158 return lock;
159 }
160
161 void
fileid_unlock(struct fileid_lock_handle * lock)162 fileid_unlock(struct fileid_lock_handle *lock)
163 {
164
165 DPRINTF("fileid unlock %" PRIu64 "\n", lock->fileid);
166 assert(lock != NULL);
167 assert(lock->owner != NULL);
168 /*
169 * perform direct-handoff to the first waiter.
170 *
171 * a handoff is essential to keep the order of requests.
172 */
173 lock->owner = wakeup_one(&lock->waitq);
174 if (lock->owner != NULL) {
175 return;
176 }
177 /*
178 * no one is waiting this fileid.
179 */
180 TAILQ_REMOVE(&fileid_lock_list, lock, list);
181 free(lock);
182 }
183
/*
 * timespec_to_pgtimestamp: create a text representation of timestamp which
 * can be recognized by the database server.
 *
 * the result has the form "YYYYMMDDTHHMMSS.uuuuuu", where the broken-down
 * time comes from gmtime_r (UTC) and uuuuuu is tv_nsec truncated to
 * microseconds.  no time zone designator is appended.
 * NOTE(review): that presumably relies on the session time zone being UTC
 * - confirm.
 *
 * returns 0 on success, or the errno left by gmtime_r on failure.
 *
 * it's caller's responsibility to free(3) the result.
 */

int
timespec_to_pgtimestamp(const struct timespec *tv, char **resultp)
{
	/*
	 * XXX is there any smarter way?
	 */
	char buf1[1024];
	char buf2[1024];
	struct tm tm_store;
	struct tm *tm;

	tm = gmtime_r(&tv->tv_sec, &tm_store);
	if (tm == NULL) {
		assert(errno != 0);
		return errno;
	}
	strftime(buf1, sizeof(buf1), "%Y%m%dT%H%M%S", tm);
	/*
	 * the fraction must be zero-padded to 6 digits.  otherwise,
	 * eg. 5 microseconds would be written as ".5", which the server
	 * reads as 500 milliseconds.
	 */
	snprintf(buf2, sizeof(buf2), "%s.%06ju", buf1,
	    (uintmax_t)tv->tv_nsec / 1000);
	*resultp = estrdup(buf2);
	return 0;
}
213
214 int
my_lo_truncate(struct Xconn * xc,int32_t fd,int32_t size)215 my_lo_truncate(struct Xconn *xc, int32_t fd, int32_t size)
216 {
217 static struct cmd *c;
218 int32_t ret;
219 int error;
220
221 CREATECMD(c, "SELECT lo_truncate($1, $2)", INT4OID, INT4OID);
222 error = sendcmd(xc, c, fd, size);
223 if (error != 0) {
224 return error;
225 }
226 error = simplefetch(xc, INT4OID, &ret);
227 if (error != 0) {
228 if (error == EEXIST) {
229 /*
230 * probably the insertion of the new-sized page
231 * caused a duplicated key error. retry.
232 */
233 DPRINTF("map EEXIST to EAGAIN\n");
234 error = EAGAIN;
235 }
236 return error;
237 }
238 assert(ret == 0);
239 return 0;
240 }
241
242 int
my_lo_lseek(struct Xconn * xc,int32_t fd,int32_t offset,int32_t whence,int32_t * retp)243 my_lo_lseek(struct Xconn *xc, int32_t fd, int32_t offset, int32_t whence,
244 int32_t *retp)
245 {
246 static struct cmd *c;
247 int32_t ret;
248 int error;
249
250 CREATECMD(c, "SELECT lo_lseek($1, $2, $3)", INT4OID, INT4OID, INT4OID);
251 error = sendcmd(xc, c, fd, offset, whence);
252 if (error != 0) {
253 return error;
254 }
255 error = simplefetch(xc, INT4OID, &ret);
256 if (error != 0) {
257 return error;
258 }
259 if (retp != NULL) {
260 *retp = ret;
261 }
262 return 0;
263 }
264
265 int
my_lo_read(struct Xconn * xc,int32_t fd,void * buf,size_t size,size_t * resultsizep)266 my_lo_read(struct Xconn *xc, int32_t fd, void *buf, size_t size,
267 size_t *resultsizep)
268 {
269 static struct cmd *c;
270 size_t resultsize;
271 int error;
272
273 CREATECMD(c, "SELECT loread($1, $2)", INT4OID, INT4OID);
274 error = sendcmdx(xc, 1, c, fd, (int32_t)size);
275 if (error != 0) {
276 return error;
277 }
278 error = simplefetch(xc, BYTEA, buf, &resultsize);
279 if (error != 0) {
280 return error;
281 }
282 *resultsizep = resultsize;
283 if (size != resultsize) {
284 DPRINTF("shortread? %zu != %zu\n", size, resultsize);
285 }
286 return 0;
287 }
288
289 int
my_lo_write(struct Xconn * xc,int32_t fd,const void * buf,size_t size,size_t * resultsizep)290 my_lo_write(struct Xconn *xc, int32_t fd, const void *buf, size_t size,
291 size_t *resultsizep)
292 {
293 static struct cmd *c;
294 int32_t resultsize;
295 int error;
296
297 CREATECMD(c, "SELECT lowrite($1, $2)", INT4OID, BYTEA);
298 error = sendcmd(xc, c, fd, buf, (int32_t)size);
299 if (error != 0) {
300 return error;
301 }
302 error = simplefetch(xc, INT4OID, &resultsize);
303 if (error != 0) {
304 if (error == EEXIST) {
305 /*
306 * probably the insertion of the new data page
307 * caused a duplicated key error. retry.
308 */
309 DPRINTF("map EEXIST to EAGAIN\n");
310 error = EAGAIN;
311 }
312 return error;
313 }
314 *resultsizep = resultsize;
315 if (size != (size_t)resultsize) {
316 DPRINTF("shortwrite? %zu != %zu\n", size, (size_t)resultsize);
317 }
318 return 0;
319 }
320
321 int
my_lo_open(struct Xconn * xc,Oid loid,int32_t mode,int32_t * fdp)322 my_lo_open(struct Xconn *xc, Oid loid, int32_t mode, int32_t *fdp)
323 {
324 static struct cmd *c;
325 int error;
326
327 CREATECMD(c, "SELECT lo_open($1, $2)", OIDOID, INT4OID);
328 error = sendcmd(xc, c, loid, mode);
329 if (error != 0) {
330 return error;
331 }
332 return simplefetch(xc, INT4OID, fdp);
333 }
334
/*
 * my_lo_close: close the large object descriptor fd.
 *
 * as currently compiled, this does nothing and always returns 0.
 * the disabled branch calls the server-side lo_close.
 */
int
my_lo_close(struct Xconn *xc, int32_t fd)
{
#if 1
	/*
	 * do nothing.
	 *
	 * LO handles are automatically closed at the end of transactions.
	 * our transactions are small enough.
	 */
	return 0;
#else
	static struct cmd *c;
	int32_t ret;
	int error;

	CREATECMD(c, "SELECT lo_close($1)", INT4OID);
	error = sendcmd(xc, c, fd);
	if (error == 0) {
		error = simplefetch(xc, INT4OID, &ret);
	}
	if (error != 0) {
		return error;
	}
	assert(ret == 0);
	return 0;
#endif
}
363
364 static int
lo_lookup_by_fileid(struct Xconn * xc,fileid_t fileid,Oid * idp)365 lo_lookup_by_fileid(struct Xconn *xc, fileid_t fileid, Oid *idp)
366 {
367 static struct cmd *c;
368 static const Oid types[] = { OIDOID, };
369 struct fetchstatus s;
370 int error;
371
372 CREATECMD(c, "SELECT loid FROM datafork WHERE fileid = $1", INT8OID);
373 error = sendcmd(xc, c, fileid);
374 if (error != 0) {
375 return error;
376 }
377 fetchinit(&s, xc);
378 error = FETCHNEXT(&s, types, idp);
379 fetchdone(&s);
380 DPRINTF("error %d\n", error);
381 return error;
382 }
383
384 int
lo_open_by_fileid(struct Xconn * xc,fileid_t fileid,int mode,int * fdp)385 lo_open_by_fileid(struct Xconn *xc, fileid_t fileid, int mode, int *fdp)
386 {
387 Oid loid;
388 int fd;
389 int error;
390
391 error = lo_lookup_by_fileid(xc, fileid, &loid);
392 if (error != 0) {
393 return error;
394 }
395 error = my_lo_open(xc, loid, mode, &fd);
396 if (error != 0) {
397 return error;
398 }
399 *fdp = fd;
400 return 0;
401 }
402
403 static int
getsize(struct Xconn * xc,fileid_t fileid,int * resultp)404 getsize(struct Xconn *xc, fileid_t fileid, int *resultp)
405 {
406 int32_t size;
407 int fd;
408 int error;
409
410 error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
411 if (error != 0) {
412 return error;
413 }
414 error = my_lo_lseek(xc, fd, 0, SEEK_END, &size);
415 if (error != 0) {
416 return error;
417 }
418 error = my_lo_close(xc, fd);
419 if (error != 0) {
420 return error;
421 }
422 *resultp = size;
423 return 0;
424 }
425
/*
 * GETATTR_*: flags for the mask argument of getattr.
 * GETATTR_ALL is the bitwise OR of all of them.
 */
#define	GETATTR_TYPE	0x00000001
#define	GETATTR_NLINK	0x00000002
#define	GETATTR_SIZE	0x00000004
#define	GETATTR_MODE	0x00000008
#define	GETATTR_UID	0x00000010
#define	GETATTR_GID	0x00000020
#define	GETATTR_TIME	0x00000040
#define	GETATTR_ALL	\
	(GETATTR_TYPE | GETATTR_NLINK | GETATTR_SIZE | GETATTR_MODE | \
	GETATTR_UID | GETATTR_GID | GETATTR_TIME)
436
437 int
getattr(struct Xconn * xc,fileid_t fileid,struct vattr * va,unsigned int mask)438 getattr(struct Xconn *xc, fileid_t fileid, struct vattr *va, unsigned int mask)
439 {
440 char *type;
441 long long atime_s;
442 long long atime_us;
443 long long ctime_s;
444 long long ctime_us;
445 long long mtime_s;
446 long long mtime_us;
447 long long btime_s;
448 long long btime_us;
449 uint64_t mode;
450 long long uid;
451 long long gid;
452 long long nlink;
453 long long rev;
454 struct fetchstatus s;
455 int error;
456
457 if (mask == 0) {
458 return 0;
459 }
460 /*
461 * unless explicitly requested, avoid fetching timestamps as they
462 * are a little more expensive than other simple attributes.
463 */
464 if ((mask & GETATTR_TIME) != 0) {
465 static struct cmd *c;
466 static const Oid types[] = {
467 TEXTOID,
468 INT8OID,
469 INT8OID,
470 INT8OID,
471 INT8OID,
472 INT8OID,
473 INT8OID,
474 INT8OID,
475 INT8OID,
476 INT8OID,
477 INT8OID,
478 INT8OID,
479 INT8OID,
480 INT8OID,
481 };
482
483 CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev, "
484 "extract(epoch from date_trunc('second', atime))::int8, "
485 "extract(microseconds from atime)::int8, "
486 "extract(epoch from date_trunc('second', ctime))::int8, "
487 "extract(microseconds from ctime)::int8, "
488 "extract(epoch from date_trunc('second', mtime))::int8, "
489 "extract(microseconds from mtime)::int8, "
490 "extract(epoch from date_trunc('second', btime))::int8, "
491 "extract(microseconds from btime)::int8 "
492 "FROM file "
493 "WHERE fileid = $1", INT8OID);
494 error = sendcmd(xc, c, fileid);
495 if (error != 0) {
496 return error;
497 }
498 fetchinit(&s, xc);
499 error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
500 &rev,
501 &atime_s, &atime_us,
502 &ctime_s, &ctime_us,
503 &mtime_s, &mtime_us,
504 &btime_s, &btime_us);
505 } else {
506 static struct cmd *c;
507 static const Oid types[] = {
508 TEXTOID,
509 INT8OID,
510 INT8OID,
511 INT8OID,
512 INT8OID,
513 INT8OID,
514 };
515
516 CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev "
517 "FROM file "
518 "WHERE fileid = $1", INT8OID);
519 error = sendcmd(xc, c, fileid);
520 if (error != 0) {
521 return error;
522 }
523 fetchinit(&s, xc);
524 error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
525 &rev);
526 }
527 fetchdone(&s);
528 if (error != 0) {
529 return error;
530 }
531 memset(va, 0xaa, sizeof(*va)); /* fill with garbage for debug */
532 va->va_type = tovtype(type);
533 free(type);
534 va->va_mode = mode;
535 va->va_uid = uid;
536 va->va_gid = gid;
537 if (nlink > 0 && va->va_type == VDIR) {
538 nlink++; /* "." */
539 }
540 va->va_nlink = nlink;
541 va->va_fileid = fileid;
542 va->va_atime.tv_sec = atime_s;
543 va->va_atime.tv_nsec = atime_us * 1000;
544 va->va_ctime.tv_sec = ctime_s;
545 va->va_ctime.tv_nsec = ctime_us * 1000;
546 va->va_mtime.tv_sec = mtime_s;
547 va->va_mtime.tv_nsec = mtime_us * 1000;
548 va->va_birthtime.tv_sec = btime_s;
549 va->va_birthtime.tv_nsec = btime_us * 1000;
550 va->va_blocksize = LOBLKSIZE;
551 va->va_gen = 1;
552 va->va_filerev = rev;
553 if ((mask & GETATTR_SIZE) != 0) {
554 int size;
555
556 size = 0;
557 if (va->va_type == VREG || va->va_type == VLNK) {
558 error = getsize(xc, fileid, &size);
559 if (error != 0) {
560 return error;
561 }
562 } else if (va->va_type == VDIR) {
563 size = 100; /* XXX */
564 }
565 va->va_size = size;
566 }
567 /*
568 * XXX va_bytes: likely wrong due to toast compression.
569 * there's no cheap way to get the compressed size of LO.
570 */
571 va->va_bytes = va->va_size;
572 va->va_flags = 0;
573 return 0;
574 }
575
576 int
update_mctime(struct Xconn * xc,fileid_t fileid)577 update_mctime(struct Xconn *xc, fileid_t fileid)
578 {
579 static struct cmd *c;
580
581 CREATECMD(c,
582 "UPDATE file "
583 "SET mtime = current_timestamp, ctime = current_timestamp, "
584 "rev = rev + 1 "
585 "WHERE fileid = $1", INT8OID);
586 return simplecmd(xc, c, fileid);
587 }
588
589 int
update_atime(struct Xconn * xc,fileid_t fileid)590 update_atime(struct Xconn *xc, fileid_t fileid)
591 {
592 static struct cmd *c;
593
594 CREATECMD(c,
595 "UPDATE file SET atime = current_timestamp WHERE fileid = $1",
596 INT8OID);
597 return simplecmd(xc, c, fileid);
598 }
599
600 int
update_mtime(struct Xconn * xc,fileid_t fileid)601 update_mtime(struct Xconn *xc, fileid_t fileid)
602 {
603 static struct cmd *c;
604
605 CREATECMD(c,
606 "UPDATE file "
607 "SET mtime = current_timestamp, rev = rev + 1 "
608 "WHERE fileid = $1", INT8OID);
609 return simplecmd(xc, c, fileid);
610 }
611
612 int
update_ctime(struct Xconn * xc,fileid_t fileid)613 update_ctime(struct Xconn *xc, fileid_t fileid)
614 {
615 static struct cmd *c;
616
617 CREATECMD(c,
618 "UPDATE file SET ctime = current_timestamp WHERE fileid = $1",
619 INT8OID);
620 return simplecmd(xc, c, fileid);
621 }
622
623 int
update_nlink(struct Xconn * xc,fileid_t fileid,int delta)624 update_nlink(struct Xconn *xc, fileid_t fileid, int delta)
625 {
626 static struct cmd *c;
627
628 CREATECMD(c,
629 "UPDATE file "
630 "SET nlink = nlink + $1 "
631 "WHERE fileid = $2",
632 INT8OID, INT8OID);
633 return simplecmd(xc, c, (int64_t)delta, fileid);
634 }
635
636 int
lookupp(struct Xconn * xc,fileid_t fileid,fileid_t * parent)637 lookupp(struct Xconn *xc, fileid_t fileid, fileid_t *parent)
638 {
639 static struct cmd *c;
640 static const Oid types[] = { INT8OID, };
641 struct fetchstatus s;
642 int error;
643
644 CREATECMD(c, "SELECT parent_fileid FROM dirent "
645 "WHERE child_fileid = $1 LIMIT 1", INT8OID);
646 error = sendcmd(xc, c, fileid);
647 if (error != 0) {
648 return error;
649 }
650 fetchinit(&s, xc);
651 error = FETCHNEXT(&s, types, parent);
652 fetchdone(&s);
653 if (error != 0) {
654 return error;
655 }
656 return 0;
657 }
658
659 int
mkfile(struct Xconn * xc,enum vtype vtype,mode_t mode,uid_t uid,gid_t gid,fileid_t * idp)660 mkfile(struct Xconn *xc, enum vtype vtype, mode_t mode, uid_t uid, gid_t gid,
661 fileid_t *idp)
662 {
663 static struct cmd *c;
664 const char *type;
665 int error;
666
667 type = fromvtype(vtype);
668 if (type == NULL) {
669 return EOPNOTSUPP;
670 }
671 CREATECMD(c,
672 "INSERT INTO file "
673 "(fileid, type, mode, uid, gid, nlink, rev, "
674 "atime, ctime, mtime, btime) "
675 "VALUES(nextval('fileid_seq'), $1::filetype, $2, $3, $4, 0, 0, "
676 "current_timestamp, "
677 "current_timestamp, "
678 "current_timestamp, "
679 "current_timestamp) "
680 "RETURNING fileid", TEXTOID, INT8OID, INT8OID, INT8OID);
681 error = sendcmd(xc, c, type, (uint64_t)mode, (uint64_t)uid,
682 (uint64_t)gid);
683 if (error != 0) {
684 return error;
685 }
686 return simplefetch(xc, INT8OID, idp);
687 }
688
689 int
linkfile(struct Xconn * xc,fileid_t parent,const char * name,fileid_t child)690 linkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
691 {
692 static struct cmd *c;
693 int error;
694
695 CREATECMD(c,
696 "INSERT INTO dirent "
697 "(parent_fileid, name, child_fileid) "
698 "VALUES($1, $2, $3)", INT8OID, TEXTOID, INT8OID);
699 error = simplecmd(xc, c, parent, name, child);
700 if (error != 0) {
701 return error;
702 }
703 error = update_nlink(xc, child, 1);
704 if (error != 0) {
705 return error;
706 }
707 return update_mtime(xc, parent);
708 }
709
710 int
unlinkfile(struct Xconn * xc,fileid_t parent,const char * name,fileid_t child)711 unlinkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
712 {
713 static struct cmd *c;
714 int error;
715
716 /*
717 * in addition to the primary key, we check child_fileid as well here
718 * to avoid removing an entry which was appeared after our VOP_LOOKUP.
719 */
720 CREATECMD(c,
721 "DELETE FROM dirent "
722 "WHERE parent_fileid = $1 AND name = $2 AND child_fileid = $3",
723 INT8OID, TEXTOID, INT8OID);
724 error = simplecmd(xc, c, parent, name, child);
725 if (error != 0) {
726 return error;
727 }
728 error = update_nlink(xc, child, -1);
729 if (error != 0) {
730 return error;
731 }
732 error = update_mtime(xc, parent);
733 if (error != 0) {
734 return error;
735 }
736 return update_ctime(xc, child);
737 }
738
739 int
mklinkfile(struct Xconn * xc,fileid_t parent,const char * name,enum vtype vtype,mode_t mode,uid_t uid,gid_t gid,fileid_t * idp)740 mklinkfile(struct Xconn *xc, fileid_t parent, const char *name,
741 enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *idp)
742 {
743 fileid_t fileid;
744 int error;
745
746 error = mkfile(xc, vtype, mode, uid, gid, &fileid);
747 if (error != 0) {
748 return error;
749 }
750 error = linkfile(xc, parent, name, fileid);
751 if (error != 0) {
752 return error;
753 }
754 if (idp != NULL) {
755 *idp = fileid;
756 }
757 return 0;
758 }
759
760 int
mklinkfile_lo(struct Xconn * xc,fileid_t parent_fileid,const char * name,enum vtype vtype,mode_t mode,uid_t uid,gid_t gid,fileid_t * fileidp,int * loidp)761 mklinkfile_lo(struct Xconn *xc, fileid_t parent_fileid, const char *name,
762 enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *fileidp,
763 int *loidp)
764 {
765 static struct cmd *c;
766 fileid_t new_fileid;
767 int loid;
768 int error;
769
770 error = mklinkfile(xc, parent_fileid, name, vtype, mode, uid, gid,
771 &new_fileid);
772 if (error != 0) {
773 return error;
774 }
775 CREATECMD(c,
776 "INSERT INTO datafork (fileid, loid) "
777 "VALUES($1, lo_creat(-1)) "
778 "RETURNING loid", INT8OID);
779 error = sendcmd(xc, c, new_fileid);
780 if (error != 0) {
781 return error;
782 }
783 error = simplefetch(xc, OIDOID, &loid);
784 if (error != 0) {
785 return error;
786 }
787 if (fileidp != NULL) {
788 *fileidp = new_fileid;
789 }
790 if (loidp != NULL) {
791 *loidp = loid;
792 }
793 return 0;
794 }
795
796 int
cleanupfile(struct Xconn * xc,fileid_t fileid)797 cleanupfile(struct Xconn *xc, fileid_t fileid)
798 {
799 static struct cmd *c;
800 char *type;
801 unsigned int vtype;
802 int error;
803
804 CREATECMD(c, "DELETE FROM file WHERE fileid = $1 AND nlink = 0 "
805 "RETURNING type::text", INT8OID);
806 error = sendcmd(xc, c, fileid);
807 if (error != 0) {
808 return error;
809 }
810 error = simplefetch(xc, TEXTOID, &type);
811 if (error == ENOENT) {
812 return 0; /* probably nlink > 0 */
813 }
814 if (error != 0) {
815 return error;
816 }
817 vtype = tovtype(type);
818 free(type);
819 if (vtype == VREG || vtype == VLNK) {
820 static struct cmd *c_datafork;
821 int32_t ret;
822
823 CREATECMD(c_datafork,
824 "WITH loids AS (DELETE FROM datafork WHERE fileid = $1 "
825 "RETURNING loid) SELECT lo_unlink(loid) FROM loids",
826 INT8OID);
827 error = sendcmd(xc, c_datafork, fileid);
828 if (error != 0) {
829 return error;
830 }
831 error = simplefetch(xc, INT4OID, &ret);
832 if (error != 0) {
833 return error;
834 }
835 if (ret != 1) {
836 return EIO; /* lo_unlink failed */
837 }
838 }
839 return 0;
840 }
841
842 /*
843 * check_path: do locking and check to prevent a rename from creating loop.
844 *
845 * lock the dirents between child_fileid and the root directory.
846 * if gate_fileid is appeared in the path, return EINVAL.
847 * caller should ensure that child_fileid is of VDIR beforehand.
848 *
849 * we uses FOR SHARE row level locks as poor man's predicate locks.
850 *
851 * the following is an example to show why we need to lock the path.
852 *
853 * consider:
854 * "mkdir -p /a/b/c/d/e/f && mkdir -p /1/2/3/4/5/6"
855 * and then
856 * thread 1 is doing "mv /a/b /1/2/3/4/5/6"
857 * thread 2 is doing "mv /1/2 /a/b/c/d/e/f"
858 *
859 * a possible consequence:
860 * thread 1: check_path -> success
861 * thread 2: check_path -> success
862 * thread 1: modify directories -> block on row-level lock
863 * thread 2: modify directories -> block on row-level lock
864 * -> deadlock detected
865 * -> rollback and retry
866 *
867 * another possible consequence:
868 * thread 1: check_path -> success
869 * thread 1: modify directory entries -> success
870 * thread 2: check_path -> block on row-level lock
871 * thread 1: commit
872 * thread 2: acquire the lock and notices the row is updated
873 * -> serialization error
874 * -> rollback and retry
875 *
876 * XXX it might be better to use real serializable transactions,
877 * which will be available for PostgreSQL 9.1
878 */
879
880 int
check_path(struct Xconn * xc,fileid_t gate_fileid,fileid_t child_fileid)881 check_path(struct Xconn *xc, fileid_t gate_fileid, fileid_t child_fileid)
882 {
883 static struct cmd *c;
884 fileid_t parent_fileid;
885 struct fetchstatus s;
886 int error;
887
888 CREATECMD(c,
889 "WITH RECURSIVE r AS "
890 "( "
891 "SELECT parent_fileid, cookie, child_fileid "
892 "FROM dirent "
893 "WHERE child_fileid = $1 "
894 "UNION ALL "
895 "SELECT d.parent_fileid, d.cookie, "
896 "d.child_fileid "
897 "FROM dirent AS d INNER JOIN r "
898 "ON d.child_fileid = r.parent_fileid "
899 ") "
900 "SELECT d.parent_fileid "
901 "FROM dirent d "
902 "JOIN r "
903 "ON d.cookie = r.cookie "
904 "FOR SHARE", INT8OID);
905 error = sendcmd(xc, c, child_fileid);
906 if (error != 0) {
907 return error;
908 }
909 fetchinit(&s, xc);
910 do {
911 static const Oid types[] = { INT8OID, };
912
913 error = FETCHNEXT(&s, types, &parent_fileid);
914 if (error == ENOENT) {
915 fetchdone(&s);
916 return 0;
917 }
918 if (error != 0) {
919 fetchdone(&s);
920 return error;
921 }
922 } while (gate_fileid != parent_fileid);
923 fetchdone(&s);
924 return EINVAL;
925 }
926
927 int
isempty(struct Xconn * xc,fileid_t fileid,bool * emptyp)928 isempty(struct Xconn *xc, fileid_t fileid, bool *emptyp)
929 {
930 int32_t dummy;
931 static struct cmd *c;
932 int error;
933
934 CREATECMD(c,
935 "SELECT 1 FROM dirent "
936 "WHERE parent_fileid = $1 LIMIT 1", INT8OID);
937 error = sendcmd(xc, c, fileid);
938 if (error != 0) {
939 return error;
940 }
941 error = simplefetch(xc, INT4OID, &dummy);
942 assert(error != 0 || dummy == 1);
943 if (error == ENOENT) {
944 *emptyp = true;
945 error = 0;
946 } else {
947 *emptyp = false;
948 }
949 return error;
950 }
951