xref: /netbsd-src/share/examples/puffs/pgfs/pgfs_subs.c (revision b757af438b42b93f8c6571f026d8b8ef3eaf5fc9)
1 /*	$NetBSD: pgfs_subs.c,v 1.3 2011/10/13 14:40:06 yamt Exp $	*/
2 
3 /*-
4  * Copyright (c)2010,2011 YAMAMOTO Takashi,
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
17  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
20  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
22  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
23  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
24  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
25  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
26  * SUCH DAMAGE.
27  */
28 
29 /*
30  * a file system server which stores the data in a PostgreSQL database.
31  */
32 
33 /*
34  * we use large objects to store file contents.  there are a few XXXs wrt it.
35  *
36  * - large objects don't obey the normal transaction semantics.
37  *
38  * - we use large object server-side functions directly (instead of via the
39  *   libpq large object api) because:
40  *	- we want to use asynchronous (in the sense of PQsendFoo) operations
41  *	  which is not available with the libpq large object api.
42  *	- with the libpq large object api, there's no way to know details of
43  *	  an error because PGresult is freed in the library without saving
44  *	  PG_DIAG_SQLSTATE etc.
45  */
46 
47 #include <sys/cdefs.h>
48 #ifndef lint
49 __RCSID("$NetBSD: pgfs_subs.c,v 1.3 2011/10/13 14:40:06 yamt Exp $");
50 #endif /* not lint */
51 
52 #include <assert.h>
53 #include <err.h>
54 #include <errno.h>
55 #include <puffs.h>
56 #include <inttypes.h>
57 #include <stdarg.h>
58 #include <stdbool.h>
59 #include <stdio.h>
60 #include <stdlib.h>
61 #include <time.h>
62 #include <util.h>
63 
64 #include <libpq-fe.h>
65 #include <libpq/libpq-fs.h>	/* INV_* */
66 
67 #include "pgfs.h"
68 #include "pgfs_db.h"
69 #include "pgfs_debug.h"
70 #include "pgfs_waitq.h"
71 #include "pgfs_subs.h"
72 
73 const char * const vtype_table[] = {
74 	[VREG] = "regular",
75 	[VDIR] = "directory",
76 	[VLNK] = "link",
77 };
78 
79 static unsigned int
80 tovtype(const char *type)
81 {
82 	unsigned int i;
83 
84 	for (i = 0; i < __arraycount(vtype_table); i++) {
85 		if (vtype_table[i] == NULL) {
86 			continue;
87 		}
88 		if (!strcmp(type, vtype_table[i])) {
89 			return i;
90 		}
91 	}
92 	assert(0);
93 	return 0;
94 }
95 
96 static const char *
97 fromvtype(enum vtype vtype)
98 {
99 
100 	if (vtype < __arraycount(vtype_table)) {
101 		assert(vtype_table[vtype] != NULL);
102 		return vtype_table[vtype];
103 	}
104 	return NULL;
105 }
106 
107 /*
108  * fileid_lock stuff below is to keep ordering of operations for a file.
109  * it is a workaround for the lack of operation barriers in the puffs
110  * protocol.
111  *
112  * currently we do this locking only for SETATTR, GETATTR, and WRITE as
113  * they are known to be reorder-unsafe.  they are sensitive to the file
114  * attributes, mainly the file size.  note that as the kernel issues async
115  * SETATTR/WRITE requests, vnode lock doesn't prevent GETATTR from seeing
116  * the stale attributes.
117  *
118  * we are relying on waiton/wakeup being a FIFO.
119  */
120 
121 struct fileid_lock_handle {
122 	TAILQ_ENTRY(fileid_lock_handle) list;
123 	fileid_t fileid;
124 	struct puffs_cc *owner;	/* diagnostic only */
125 	struct waitq waitq;
126 };
127 
128 TAILQ_HEAD(, fileid_lock_handle) fileid_lock_list =
129     TAILQ_HEAD_INITIALIZER(fileid_lock_list);
130 struct waitq fileid_lock_waitq = TAILQ_HEAD_INITIALIZER(fileid_lock_waitq);
131 
132 /*
133  * fileid_lock: serialize requests for the fileid.
134  *
135  * this function should be the first yieldable point in a puffs callback.
136  */
137 
138 struct fileid_lock_handle *
139 fileid_lock(fileid_t fileid, struct puffs_cc *cc)
140 {
141 	struct fileid_lock_handle *lock;
142 
143 	TAILQ_FOREACH(lock, &fileid_lock_list, list) {
144 		if (lock->fileid == fileid) {
145 			DPRINTF("fileid wait %" PRIu64 " cc %p\n", fileid, cc);
146 			assert(lock->owner != cc);
147 			waiton(&lock->waitq, cc);	/* enter FIFO */
148 			assert(lock->owner == cc);
149 			return lock;
150 		}
151 	}
152 	lock = emalloc(sizeof(*lock));
153 	lock->fileid = fileid;
154 	lock->owner = cc;
155 	DPRINTF("fileid lock %" PRIu64 " cc %p\n", lock->fileid, cc);
156 	waitq_init(&lock->waitq);
157 	TAILQ_INSERT_HEAD(&fileid_lock_list, lock, list);
158 	return lock;
159 }
160 
161 void
162 fileid_unlock(struct fileid_lock_handle *lock)
163 {
164 
165 	DPRINTF("fileid unlock %" PRIu64 "\n", lock->fileid);
166 	assert(lock != NULL);
167 	assert(lock->owner != NULL);
168 	/*
169 	 * perform direct-handoff to the first waiter.
170 	 *
171 	 * a handoff is essential to keep the order of requests.
172 	 */
173 	lock->owner = wakeup_one(&lock->waitq);
174 	if (lock->owner != NULL) {
175 		return;
176 	}
177 	/*
178 	 * no one is waiting this fileid.
179 	 */
180 	TAILQ_REMOVE(&fileid_lock_list, lock, list);
181 	free(lock);
182 }
183 
/*
 * timespec_to_pgtimestamp: create a text representation of a timestamp
 * which can be recognized by the database server.
 *
 * it's the caller's responsibility to free(3) the result.
 */

190 
int
timespec_to_pgtimestamp(const struct timespec *tv, char **resultp)
{
	/*
	 * XXX is there any smarter way?
	 */
	char buf1[1024];
	char buf2[1024];
	struct tm tm_store;
	struct tm *tm;

	/*
	 * the broken-down time is in UTC.  note that no time zone
	 * designator is put in the result.
	 */
	tm = gmtime_r(&tv->tv_sec, &tm_store);
	if (tm == NULL) {
		assert(errno != 0);
		return errno;
	}
	strftime(buf1, sizeof(buf1), "%Y%m%dT%H%M%S", tm);
	/*
	 * the fraction is in microseconds and has to be zero-padded to
	 * 6 digits.  otherwise, eg. 5 microseconds would be printed as
	 * ".5", which means a half second.
	 */
	snprintf(buf2, sizeof(buf2), "%s.%06ju", buf1,
	    (uintmax_t)tv->tv_nsec / 1000);
	*resultp = estrdup(buf2);
	return 0;
}
213 
214 int
215 my_lo_truncate(struct Xconn *xc, int32_t fd, int32_t size)
216 {
217 	static struct cmd *c;
218 	int32_t ret;
219 	int error;
220 
221 	CREATECMD(c, "SELECT lo_truncate($1, $2)", INT4OID, INT4OID);
222 	error = sendcmd(xc, c, fd, size);
223 	if (error != 0) {
224 		return error;
225 	}
226 	error = simplefetch(xc, INT4OID, &ret);
227 	if (error != 0) {
228 		if (error == EEXIST) {
229 			/*
230 			 * probably the insertion of the new-sized page
231 			 * caused a duplicated key error.  retry.
232 			 */
233 			DPRINTF("map EEXIST to EAGAIN\n");
234 			error = EAGAIN;
235 		}
236 		return error;
237 	}
238 	assert(ret == 0);
239 	return 0;
240 }
241 
242 int
243 my_lo_lseek(struct Xconn *xc, int32_t fd, int32_t offset, int32_t whence,
244     int32_t *retp)
245 {
246 	static struct cmd *c;
247 	int32_t ret;
248 	int error;
249 
250 	CREATECMD(c, "SELECT lo_lseek($1, $2, $3)", INT4OID, INT4OID, INT4OID);
251 	error = sendcmd(xc, c, fd, offset, whence);
252 	if (error != 0) {
253 		return error;
254 	}
255 	error = simplefetch(xc, INT4OID, &ret);
256 	if (error != 0) {
257 		return error;
258 	}
259 	if (retp != NULL) {
260 		*retp = ret;
261 	}
262 	return 0;
263 }
264 
265 int
266 my_lo_read(struct Xconn *xc, int32_t fd, void *buf, size_t size,
267     size_t *resultsizep)
268 {
269 	static struct cmd *c;
270 	size_t resultsize;
271 	int error;
272 
273 	CREATECMD(c, "SELECT loread($1, $2)", INT4OID, INT4OID);
274 	error = sendcmdx(xc, 1, c, fd, (int32_t)size);
275 	if (error != 0) {
276 		return error;
277 	}
278 	error = simplefetch(xc, BYTEA, buf, &resultsize);
279 	if (error != 0) {
280 		return error;
281 	}
282 	*resultsizep = resultsize;
283 	if (size != resultsize) {
284 		DPRINTF("shortread? %zu != %zu\n", size, resultsize);
285 	}
286 	return 0;
287 }
288 
289 int
290 my_lo_write(struct Xconn *xc, int32_t fd, const void *buf, size_t size,
291     size_t *resultsizep)
292 {
293 	static struct cmd *c;
294 	int32_t resultsize;
295 	int error;
296 
297 	CREATECMD(c, "SELECT lowrite($1, $2)", INT4OID, BYTEA);
298 	error = sendcmd(xc, c, fd, buf, (int32_t)size);
299 	if (error != 0) {
300 		return error;
301 	}
302 	error = simplefetch(xc, INT4OID, &resultsize);
303 	if (error != 0) {
304 		if (error == EEXIST) {
305 			/*
306 			 * probably the insertion of the new data page
307 			 * caused a duplicated key error.  retry.
308 			 */
309 			DPRINTF("map EEXIST to EAGAIN\n");
310 			error = EAGAIN;
311 		}
312 		return error;
313 	}
314 	*resultsizep = resultsize;
315 	if (size != (size_t)resultsize) {
316 		DPRINTF("shortwrite? %zu != %zu\n", size, (size_t)resultsize);
317 	}
318 	return 0;
319 }
320 
321 int
322 my_lo_open(struct Xconn *xc, Oid loid, int32_t mode, int32_t *fdp)
323 {
324 	static struct cmd *c;
325 	int error;
326 
327 	CREATECMD(c, "SELECT lo_open($1, $2)", OIDOID, INT4OID);
328 	error = sendcmd(xc, c, loid, mode);
329 	if (error != 0) {
330 		return error;
331 	}
332 	return simplefetch(xc, INT4OID, fdp);
333 }
334 
335 int
336 my_lo_close(struct Xconn *xc, int32_t fd)
337 {
338 	static struct cmd *c;
339 	int32_t ret;
340 	int error;
341 
342 	CREATECMD(c, "SELECT lo_close($1)", INT4OID);
343 	error = sendcmd(xc, c, fd);
344 	if (error != 0) {
345 		return error;
346 	}
347 	error = simplefetch(xc, INT4OID, &ret);
348 	if (error != 0) {
349 		return error;
350 	}
351 	assert(ret == 0);
352 	return 0;
353 }
354 
355 static int
356 lo_lookup_by_fileid(struct Xconn *xc, fileid_t fileid, Oid *idp)
357 {
358 	static struct cmd *c;
359 	static const Oid types[] = { OIDOID, };
360 	struct fetchstatus s;
361 	int error;
362 
363 	CREATECMD(c, "SELECT loid FROM datafork WHERE fileid = $1", INT8OID);
364 	error = sendcmd(xc, c, fileid);
365 	if (error != 0) {
366 		return error;
367 	}
368 	fetchinit(&s, xc);
369 	error = FETCHNEXT(&s, types, idp);
370 	fetchdone(&s);
371 	DPRINTF("error %d\n", error);
372 	return error;
373 }
374 
375 int
376 lo_open_by_fileid(struct Xconn *xc, fileid_t fileid, int mode, int *fdp)
377 {
378 	Oid loid;
379 	int fd;
380 	int error;
381 
382 	error = lo_lookup_by_fileid(xc, fileid, &loid);
383 	if (error != 0) {
384 		return error;
385 	}
386 	error = my_lo_open(xc, loid, mode, &fd);
387 	if (error != 0) {
388 		return error;
389 	}
390 	*fdp = fd;
391 	return 0;
392 }
393 
394 static int
395 getsize(struct Xconn *xc, fileid_t fileid, int *resultp)
396 {
397 	int32_t size;
398 	int fd;
399 	int error;
400 
401 	error = lo_open_by_fileid(xc, fileid, INV_READ, &fd);
402 	if (error != 0) {
403 		return error;
404 	}
405 	error = my_lo_lseek(xc, fd, 0, SEEK_END, &size);
406 	if (error != 0) {
407 		return error;
408 	}
409 	error = my_lo_close(xc, fd);
410 	if (error != 0) {
411 		return error;
412 	}
413 	*resultp = size;
414 	return 0;
415 }
416 
/*
 * GETATTR_*: the bits of the "mask" argument of getattr.
 */
#define	GETATTR_TYPE	0x01
#define	GETATTR_NLINK	0x02
#define	GETATTR_SIZE	0x04
#define	GETATTR_MODE	0x08
#define	GETATTR_UID	0x10
#define	GETATTR_GID	0x20
#define	GETATTR_TIME	0x40
#define	GETATTR_ALL	\
	(GETATTR_TYPE|GETATTR_NLINK|GETATTR_SIZE|GETATTR_MODE| \
	GETATTR_UID|GETATTR_GID|GETATTR_TIME)
427 
428 int
429 getattr(struct Xconn *xc, fileid_t fileid, struct vattr *va, unsigned int mask)
430 {
431 	char *type;
432 	long long atime_s;
433 	long long atime_us;
434 	long long ctime_s;
435 	long long ctime_us;
436 	long long mtime_s;
437 	long long mtime_us;
438 	long long btime_s;
439 	long long btime_us;
440 	uint64_t mode;
441 	long long uid;
442 	long long gid;
443 	long long nlink;
444 	long long rev;
445 	struct fetchstatus s;
446 	int error;
447 
448 	if (mask == 0) {
449 		return 0;
450 	}
451 	/*
452 	 * unless explicitly requested, avoid fetching timestamps as they
453 	 * are a little more expensive than other simple attributes.
454 	 */
455 	if ((mask & GETATTR_TIME) != 0) {
456 		static struct cmd *c;
457 		static const Oid types[] = {
458 			TEXTOID,
459 			INT8OID,
460 			INT8OID,
461 			INT8OID,
462 			INT8OID,
463 			INT8OID,
464 			INT8OID,
465 			INT8OID,
466 			INT8OID,
467 			INT8OID,
468 			INT8OID,
469 			INT8OID,
470 			INT8OID,
471 			INT8OID,
472 		};
473 
474 		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev, "
475 		    "extract(epoch from date_trunc('second', atime))::int8, "
476 		    "extract(microseconds from atime)::int8, "
477 		    "extract(epoch from date_trunc('second', ctime))::int8, "
478 		    "extract(microseconds from ctime)::int8, "
479 		    "extract(epoch from date_trunc('second', mtime))::int8, "
480 		    "extract(microseconds from mtime)::int8, "
481 		    "extract(epoch from date_trunc('second', btime))::int8, "
482 		    "extract(microseconds from btime)::int8 "
483 		    "FROM file "
484 		    "WHERE fileid = $1", INT8OID);
485 		error = sendcmd(xc, c, fileid);
486 		if (error != 0) {
487 			return error;
488 		}
489 		fetchinit(&s, xc);
490 		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
491 		    &rev,
492 		    &atime_s, &atime_us,
493 		    &ctime_s, &ctime_us,
494 		    &mtime_s, &mtime_us,
495 		    &btime_s, &btime_us);
496 	} else {
497 		static struct cmd *c;
498 		static const Oid types[] = {
499 			TEXTOID,
500 			INT8OID,
501 			INT8OID,
502 			INT8OID,
503 			INT8OID,
504 			INT8OID,
505 		};
506 
507 		CREATECMD(c, "SELECT type::text, mode, uid, gid, nlink, rev "
508 		    "FROM file "
509 		    "WHERE fileid = $1", INT8OID);
510 		error = sendcmd(xc, c, fileid);
511 		if (error != 0) {
512 			return error;
513 		}
514 		fetchinit(&s, xc);
515 		error = FETCHNEXT(&s, types, &type, &mode, &uid, &gid, &nlink,
516 		    &rev);
517 	}
518 	fetchdone(&s);
519 	if (error != 0) {
520 		return error;
521 	}
522 	memset(va, 0xaa, sizeof(*va)); /* fill with garbage for debug */
523 	va->va_type = tovtype(type);
524 	free(type);
525 	va->va_mode = mode;
526 	va->va_uid = uid;
527 	va->va_gid = gid;
528 	if (nlink > 0 && va->va_type == VDIR) {
529 		nlink++; /* "." */
530 	}
531 	va->va_nlink = nlink;
532 	va->va_fileid = fileid;
533 	va->va_atime.tv_sec = atime_s;
534 	va->va_atime.tv_nsec = atime_us * 1000;
535 	va->va_ctime.tv_sec = ctime_s;
536 	va->va_ctime.tv_nsec = ctime_us * 1000;
537 	va->va_mtime.tv_sec = mtime_s;
538 	va->va_mtime.tv_nsec = mtime_us * 1000;
539 	va->va_birthtime.tv_sec = btime_s;
540 	va->va_birthtime.tv_nsec = btime_us * 1000;
541 	va->va_blocksize = LOBLKSIZE;
542 	va->va_gen = 1;
543 	va->va_filerev = rev;
544 	if ((mask & GETATTR_SIZE) != 0) {
545 		int size;
546 
547 		size = 0;
548 		if (va->va_type == VREG || va->va_type == VLNK) {
549 			error = getsize(xc, fileid, &size);
550 			if (error != 0) {
551 				return error;
552 			}
553 		} else if (va->va_type == VDIR) {
554 			size = 100; /* XXX */
555 		}
556 		va->va_size = size;
557 	}
558 	/*
559 	 * XXX va_bytes: likely wrong due to toast compression.
560 	 * there's no cheap way to get the compressed size of LO.
561 	 */
562 	va->va_bytes = va->va_size;
563 	va->va_flags = 0;
564 	return 0;
565 }
566 
567 int
568 update_mctime(struct Xconn *xc, fileid_t fileid)
569 {
570 	static struct cmd *c;
571 
572 	CREATECMD(c,
573 	    "UPDATE file "
574 	    "SET mtime = current_timestamp, ctime = current_timestamp, "
575 		"rev = rev + 1 "
576 	    "WHERE fileid = $1", INT8OID);
577 	return simplecmd(xc, c, fileid);
578 }
579 
580 int
581 update_atime(struct Xconn *xc, fileid_t fileid)
582 {
583 	static struct cmd *c;
584 
585 	CREATECMD(c,
586 	    "UPDATE file SET atime = current_timestamp WHERE fileid = $1",
587 	    INT8OID);
588 	return simplecmd(xc, c, fileid);
589 }
590 
591 int
592 update_mtime(struct Xconn *xc, fileid_t fileid)
593 {
594 	static struct cmd *c;
595 
596 	CREATECMD(c,
597 	    "UPDATE file "
598 	    "SET mtime = current_timestamp, rev = rev + 1 "
599 	    "WHERE fileid = $1", INT8OID);
600 	return simplecmd(xc, c, fileid);
601 }
602 
603 int
604 update_ctime(struct Xconn *xc, fileid_t fileid)
605 {
606 	static struct cmd *c;
607 
608 	CREATECMD(c,
609 	    "UPDATE file SET ctime = current_timestamp WHERE fileid = $1",
610 	    INT8OID);
611 	return simplecmd(xc, c, fileid);
612 }
613 
614 int
615 update_nlink(struct Xconn *xc, fileid_t fileid, int delta)
616 {
617 	static struct cmd *c;
618 
619 	CREATECMD(c,
620 	    "UPDATE file "
621 	    "SET nlink = nlink + $1 "
622 	    "WHERE fileid = $2",
623 	    INT8OID, INT8OID);
624 	return simplecmd(xc, c, (int64_t)delta, fileid);
625 }
626 
627 int
628 lookupp(struct Xconn *xc, fileid_t fileid, fileid_t *parent)
629 {
630 	static struct cmd *c;
631 	static const Oid types[] = { INT8OID, };
632 	struct fetchstatus s;
633 	int error;
634 
635 	CREATECMD(c, "SELECT parent_fileid FROM dirent "
636 		"WHERE child_fileid = $1 LIMIT 1", INT8OID);
637 	error = sendcmd(xc, c, fileid);
638 	if (error != 0) {
639 		return error;
640 	}
641 	fetchinit(&s, xc);
642 	error = FETCHNEXT(&s, types, parent);
643 	fetchdone(&s);
644 	if (error != 0) {
645 		return error;
646 	}
647 	return 0;
648 }
649 
650 int
651 mkfile(struct Xconn *xc, enum vtype vtype, mode_t mode, uid_t uid, gid_t gid,
652     fileid_t *idp)
653 {
654 	static struct cmd *c;
655 	const char *type;
656 	int error;
657 
658 	type = fromvtype(vtype);
659 	if (type == NULL) {
660 		return EOPNOTSUPP;
661 	}
662 	CREATECMD(c,
663 		"INSERT INTO file "
664 		"(fileid, type, mode, uid, gid, nlink, rev, "
665 		"atime, ctime, mtime, btime) "
666 		"VALUES(nextval('fileid_seq'), $1::filetype, $2, $3, $4, 0, 0, "
667 		"current_timestamp, "
668 		"current_timestamp, "
669 		"current_timestamp, "
670 		"current_timestamp) "
671 		"RETURNING fileid", TEXTOID, INT8OID, INT8OID, INT8OID);
672 	error = sendcmd(xc, c, type, (uint64_t)mode, (uint64_t)uid,
673 	    (uint64_t)gid);
674 	if (error != 0) {
675 		return error;
676 	}
677 	return simplefetch(xc, INT8OID, idp);
678 }
679 
680 int
681 linkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
682 {
683 	static struct cmd *c;
684 	int error;
685 
686 	CREATECMD(c,
687 		"INSERT INTO dirent "
688 		"(parent_fileid, name, child_fileid) "
689 		"VALUES($1, $2, $3)", INT8OID, TEXTOID, INT8OID);
690 	error = simplecmd(xc, c, parent, name, child);
691 	if (error != 0) {
692 		return error;
693 	}
694 	error = update_nlink(xc, child, 1);
695 	if (error != 0) {
696 		return error;
697 	}
698 	return update_mtime(xc, parent);
699 }
700 
701 int
702 unlinkfile(struct Xconn *xc, fileid_t parent, const char *name, fileid_t child)
703 {
704 	static struct cmd *c;
705 	int error;
706 
707 	/*
708 	 * in addition to the primary key, we check child_fileid as well here
709 	 * to avoid removing an entry which was appeared after our VOP_LOOKUP.
710 	 */
711 	CREATECMD(c,
712 		"DELETE FROM dirent "
713 		"WHERE parent_fileid = $1 AND name = $2 AND child_fileid = $3",
714 		INT8OID, TEXTOID, INT8OID);
715 	error = simplecmd(xc, c, parent, name, child);
716 	if (error != 0) {
717 		return error;
718 	}
719 	error = update_nlink(xc, child, -1);
720 	if (error != 0) {
721 		return error;
722 	}
723 	error = update_mtime(xc, parent);
724 	if (error != 0) {
725 		return error;
726 	}
727 	return update_ctime(xc, child);
728 }
729 
730 int
731 mklinkfile(struct Xconn *xc, fileid_t parent, const char *name,
732     enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *idp)
733 {
734 	fileid_t fileid;
735 	int error;
736 
737 	error = mkfile(xc, vtype, mode, uid, gid, &fileid);
738 	if (error != 0) {
739 		return error;
740 	}
741 	error = linkfile(xc, parent, name, fileid);
742 	if (error != 0) {
743 		return error;
744 	}
745 	if (idp != NULL) {
746 		*idp = fileid;
747 	}
748 	return 0;
749 }
750 
751 int
752 mklinkfile_lo(struct Xconn *xc, fileid_t parent_fileid, const char *name,
753     enum vtype vtype, mode_t mode, uid_t uid, gid_t gid, fileid_t *fileidp,
754     int *loidp)
755 {
756 	static struct cmd *c;
757 	fileid_t new_fileid;
758 	int loid;
759 	int error;
760 
761 	error = mklinkfile(xc, parent_fileid, name, vtype, mode, uid, gid,
762 	    &new_fileid);
763 	if (error != 0) {
764 		return error;
765 	}
766 	CREATECMD(c,
767 		"INSERT INTO datafork (fileid, loid) "
768 		"VALUES($1, lo_creat(-1)) "
769 		"RETURNING loid", INT8OID);
770 	error = sendcmd(xc, c, new_fileid);
771 	if (error != 0) {
772 		return error;
773 	}
774 	error = simplefetch(xc, OIDOID, &loid);
775 	if (error != 0) {
776 		return error;
777 	}
778 	if (fileidp != NULL) {
779 		*fileidp = new_fileid;
780 	}
781 	if (loidp != NULL) {
782 		*loidp = loid;
783 	}
784 	return 0;
785 }
786 
787 int
788 cleanupfile(struct Xconn *xc, fileid_t fileid, struct vattr *va)
789 {
790 	static struct cmd *c;
791 
792 	/*
793 	 * XXX what to do when the filesystem is shared?
794 	 */
795 
796 	if (va->va_type == VREG || va->va_type == VLNK) {
797 		static struct cmd *c_datafork;
798 		int32_t ret;
799 		int error;
800 
801 		CREATECMD(c_datafork,
802 			"WITH loids AS (DELETE FROM datafork WHERE fileid = $1 "
803 			"RETURNING loid) SELECT lo_unlink(loid) FROM loids",
804 			INT8OID);
805 		error = sendcmd(xc, c_datafork, fileid);
806 		if (error != 0) {
807 			return error;
808 		}
809 		error = simplefetch(xc, INT4OID, &ret);
810 		if (error != 0) {
811 			return error;
812 		}
813 		if (ret != 1) {
814 			return EIO; /* lo_unlink failed */
815 		}
816 	}
817 	CREATECMD(c, "DELETE FROM file WHERE fileid = $1", INT8OID);
818 	return simplecmd(xc, c, fileid);
819 }
820 
821 /*
822  * check_path: do locking and check to prevent a rename from creating loop.
823  *
 * lock the dirents between child_fileid and the root directory.
 * if gate_fileid appears in the path, return EINVAL.
 * the caller should ensure that child_fileid is a VDIR beforehand.
 *
 * we use FOR SHARE row level locks as a poor man's predicate locks.
829  *
830  * the following is an example to show why we need to lock the path.
831  *
832  * consider:
833  * "mkdir -p /a/b/c/d/e/f && mkdir -p /1/2/3/4/5/6"
834  * and then
835  * thread 1 is doing "mv /a/b /1/2/3/4/5/6"
836  * thread 2 is doing "mv /1/2 /a/b/c/d/e/f"
837  *
838  * a possible consequence:
839  *	thread 1: check_path -> success
840  *	thread 2: check_path -> success
841  *	thread 1: modify directories -> block on row-level lock
842  *	thread 2: modify directories -> block on row-level lock
843  *			-> deadlock detected
844  *			-> rollback and retry
845  *
846  * another possible consequence:
847  *	thread 1: check_path -> success
848  *	thread 1: modify directory entries -> success
849  *	thread 2: check_path -> block on row-level lock
850  *	thread 1: commit
851  *	thread 2: acquire the lock and notices the row is updated
852  *			-> serialization error
853  *			-> rollback and retry
854  *
855  * XXX it might be better to use real serializable transactions,
856  * which will be available for PostgreSQL 9.1
857  */
858 
859 int
860 check_path(struct Xconn *xc, fileid_t gate_fileid, fileid_t child_fileid)
861 {
862 	static struct cmd *c;
863 	fileid_t parent_fileid;
864 	struct fetchstatus s;
865 	int error;
866 
867 	CREATECMD(c,
868 		"WITH RECURSIVE r AS "
869 		"( "
870 				"SELECT parent_fileid, cookie, child_fileid "
871 				"FROM dirent "
872 				"WHERE child_fileid = $1 "
873 			"UNION ALL "
874 				"SELECT d.parent_fileid, d.cookie, "
875 				"d.child_fileid "
876 				"FROM dirent AS d INNER JOIN r "
877 				"ON d.child_fileid = r.parent_fileid "
878 		") "
879 		"SELECT d.parent_fileid "
880 		"FROM dirent d "
881 		"JOIN r "
882 		"ON d.cookie = r.cookie "
883 		"FOR SHARE", INT8OID);
884 	error = sendcmd(xc, c, child_fileid);
885 	if (error != 0) {
886 		return error;
887 	}
888 	fetchinit(&s, xc);
889 	do {
890 		static const Oid types[] = { INT8OID, };
891 
892 		error = FETCHNEXT(&s, types, &parent_fileid);
893 		if (error == ENOENT) {
894 			fetchdone(&s);
895 			return 0;
896 		}
897 		if (error != 0) {
898 			fetchdone(&s);
899 			return error;
900 		}
901 	} while (gate_fileid != parent_fileid);
902 	fetchdone(&s);
903 	return EINVAL;
904 }
905 
906 int
907 isempty(struct Xconn *xc, fileid_t fileid, bool *emptyp)
908 {
909 	int32_t dummy;
910 	static struct cmd *c;
911 	int error;
912 
913 	CREATECMD(c,
914 		"SELECT 1 FROM dirent "
915 		"WHERE parent_fileid = $1 LIMIT 1", INT8OID);
916 	error = sendcmd(xc, c, fileid);
917 	if (error != 0) {
918 		return error;
919 	}
920 	error = simplefetch(xc, INT4OID, &dummy);
921 	assert(error != 0 || dummy == 1);
922 	if (error == ENOENT) {
923 		*emptyp = true;
924 		error = 0;
925 	} else {
926 		*emptyp = false;
927 	}
928 	return error;
929 }
930