xref: /openbsd-src/usr.bin/rsync/uploader.c (revision 097a140d792de8b2bbe59ad827d39eabf9b4280a)
1 /*	$Id: uploader.c,v 1.24 2021/03/22 11:20:04 claudio Exp $ */
2 /*
3  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
4  * Copyright (c) 2019 Florian Obser <florian@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 #include <sys/mman.h>
19 #include <sys/stat.h>
20 
21 #include <assert.h>
22 #include <errno.h>
23 #include <fcntl.h>
24 #include <inttypes.h>
25 #include <math.h>
26 #include <stdio.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <time.h>
30 #include <unistd.h>
31 
32 #include "extern.h"
33 
34 enum	uploadst {
35 	UPLOAD_FIND_NEXT = 0, /* find next to upload to sender */
36 	UPLOAD_WRITE_LOCAL, /* wait to write to sender */
37 	UPLOAD_READ_LOCAL, /* wait to read from local file */
38 	UPLOAD_FINISHED /* nothing more to do in phase */
39 };
40 
41 /*
42  * Used to keep track of data flowing from the receiver to the sender.
43  * This is managed by the receiver process.
44  */
45 struct	upload {
46 	enum uploadst	    state;
47 	char		   *buf; /* if not NULL, pending upload */
48 	size_t		    bufsz; /* size of buf */
49 	size_t		    bufmax; /* maximum size of buf */
50 	size_t		    bufpos; /* position in buf */
51 	size_t		    idx; /* current transfer index */
52 	mode_t		    oumask; /* umask for creating files */
53 	char		   *root; /* destination directory path */
54 	int		    rootfd; /* destination directory */
55 	size_t		    csumlen; /* checksum length */
56 	int		    fdout; /* write descriptor to sender */
57 	const struct flist *fl; /* file list */
58 	size_t		    flsz; /* size of file list */
59 	int		   *newdir; /* non-zero if mkdir'd */
60 };
61 
62 /*
63  * Log a directory by emitting the file and a trailing slash, just to
64  * show the operator that we're a directory.
65  */
66 static void
67 log_dir(struct sess *sess, const struct flist *f)
68 {
69 	size_t	 sz;
70 
71 	if (sess->opts->server)
72 		return;
73 	sz = strlen(f->path);
74 	assert(sz > 0);
75 	LOG1("%s%s", f->path, (f->path[sz - 1] == '/') ? "" : "/");
76 }
77 
78 /*
79  * Log a link by emitting the file and the target, just to show the
80  * operator that we're a link.
81  */
82 static void
83 log_link(struct sess *sess, const struct flist *f)
84 {
85 
86 	if (!sess->opts->server)
87 		LOG1("%s -> %s", f->path, f->link);
88 }
89 
90 /*
91  * Simply log the filename.
92  */
93 static void
94 log_file(struct sess *sess, const struct flist *f)
95 {
96 
97 	if (!sess->opts->server)
98 		LOG1("%s", f->path);
99 }
100 
101 /*
102  * Prepare the overall block set's metadata.
103  * We always have at least one block.
104  * The block size is an important part of the algorithm.
105  * I use the same heuristic as the reference rsync, but implemented in a
106  * bit more of a straightforward way.
107  * In general, the individual block length is the rounded square root of
108  * the total file size.
109  * The minimum block length is 700.
110  */
111 static void
112 init_blkset(struct blkset *p, off_t sz)
113 {
114 	double	 v;
115 
116 	if (sz >= (BLOCK_SIZE_MIN * BLOCK_SIZE_MIN)) {
117 		/* Simple rounded-up integer square root. */
118 
119 		v = sqrt(sz);
120 		p->len = ceil(v);
121 
122 		/*
123 		 * Always be a multiple of eight.
124 		 * There's no reason to do this, but rsync does.
125 		 */
126 
127 		if ((p->len % 8) > 0)
128 			p->len += 8 - (p->len % 8);
129 	} else
130 		p->len = BLOCK_SIZE_MIN;
131 
132 	p->size = sz;
133 	if ((p->blksz = sz / p->len) == 0)
134 		p->rem = sz;
135 	else
136 		p->rem = sz % p->len;
137 
138 	/* If we have a remainder, then we need an extra block. */
139 
140 	if (p->rem)
141 		p->blksz++;
142 }
143 
144 /*
145  * For each block, prepare the block's metadata.
146  * We use the mapped "map" file to set our checksums.
147  */
148 static void
149 init_blk(struct blk *p, const struct blkset *set, off_t offs,
150 	size_t idx, const void *map, const struct sess *sess)
151 {
152 
153 	p->idx = idx;
154 	/* Block length inherits for all but the last. */
155 	p->len = idx < set->blksz - 1 ? set->len : set->rem;
156 	p->offs = offs;
157 
158 	p->chksum_short = hash_fast(map, p->len);
159 	hash_slow(map, p->len, p->chksum_long, sess);
160 }
161 
162 /*
163  * Handle a symbolic link.
164  * If we encounter directories existing in the symbolic link's place,
165  * then try to unlink the directory.
166  * Otherwise, simply overwrite with the symbolic link by renaming.
167  * Return <0 on failure 0 on success.
168  */
169 static int
170 pre_link(struct upload *p, struct sess *sess)
171 {
172 	struct stat		 st;
173 	const struct flist	*f;
174 	int			 rc, newlink = 0, updatelink = 0;
175 	char			*b, *temp = NULL;
176 
177 	f = &p->fl[p->idx];
178 	assert(S_ISLNK(f->st.mode));
179 
180 	if (!sess->opts->preserve_links) {
181 		WARNX("%s: ignoring symlink", f->path);
182 		return 0;
183 	} else if (sess->opts->dry_run) {
184 		log_link(sess, f);
185 		return 0;
186 	}
187 
188 	/*
189 	 * See if the symlink already exists.
190 	 * If it's a directory, then try to unlink the directory prior
191 	 * to overwriting with a symbolic link.
192 	 * If it's a non-directory, we just overwrite it.
193 	 */
194 
195 	assert(p->rootfd != -1);
196 	rc = fstatat(p->rootfd, f->path, &st, AT_SYMLINK_NOFOLLOW);
197 	if (rc != -1 && !S_ISLNK(st.st_mode)) {
198 		if (S_ISDIR(st.st_mode) &&
199 		    unlinkat(p->rootfd, f->path, AT_REMOVEDIR) == -1) {
200 			ERR("%s: unlinkat", f->path);
201 			return -1;
202 		}
203 		rc = -1;
204 	} else if (rc == -1 && errno != ENOENT) {
205 		ERR("%s: fstatat", f->path);
206 		return -1;
207 	}
208 
209 	/*
210 	 * If the symbolic link already exists, then make sure that it
211 	 * points to the correct place.
212 	 */
213 
214 	if (rc != -1) {
215 		b = symlinkat_read(p->rootfd, f->path);
216 		if (b == NULL) {
217 			ERRX1("symlinkat_read");
218 			return -1;
219 		}
220 		if (strcmp(f->link, b)) {
221 			free(b);
222 			b = NULL;
223 			LOG3("%s: updating symlink: %s", f->path, f->link);
224 			updatelink = 1;
225 		}
226 		free(b);
227 		b = NULL;
228 	}
229 
230 	/*
231 	 * Create the temporary file as a symbolic link, then rename the
232 	 * temporary file as the real one, overwriting anything there.
233 	 */
234 
235 	if (rc == -1 || updatelink) {
236 		LOG3("%s: creating symlink: %s", f->path, f->link);
237 		if (mktemplate(&temp, f->path, sess->opts->recursive) == -1) {
238 			ERRX1("mktemplate");
239 			return -1;
240 		}
241 		if (mkstemplinkat(f->link, p->rootfd, temp) == NULL) {
242 			ERR("mkstemplinkat");
243 			free(temp);
244 			return -1;
245 		}
246 		newlink = 1;
247 	}
248 
249 	rsync_set_metadata_at(sess, newlink,
250 		p->rootfd, f, newlink ? temp : f->path);
251 
252 	if (newlink) {
253 		if (renameat(p->rootfd, temp, p->rootfd, f->path) == -1) {
254 			ERR("%s: renameat %s", temp, f->path);
255 			(void)unlinkat(p->rootfd, temp, 0);
256 			free(temp);
257 			return -1;
258 		}
259 		free(temp);
260 	}
261 
262 	log_link(sess, f);
263 	return 0;
264 }
265 
266 /*
267  * See pre_link(), but for devices.
268  * FIXME: this is very similar to the other pre_xxx() functions.
269  * Return <0 on failure 0 on success.
270  */
271 static int
272 pre_dev(struct upload *p, struct sess *sess)
273 {
274 	struct stat		 st;
275 	const struct flist	*f;
276 	int			 rc, newdev = 0, updatedev = 0;
277 	char			*temp = NULL;
278 
279 	f = &p->fl[p->idx];
280 	assert(S_ISBLK(f->st.mode) || S_ISCHR(f->st.mode));
281 
282 	if (!sess->opts->devices || getuid() != 0) {
283 		WARNX("skipping non-regular file %s", f->path);
284 		return 0;
285 	} else if (sess->opts->dry_run) {
286 		log_file(sess, f);
287 		return 0;
288 	}
289 
290 	/*
291 	 * See if the dev already exists.
292 	 * If a non-device exists in its place, we'll replace that.
293 	 * If it replaces a directory, remove the directory first.
294 	 */
295 
296 	assert(p->rootfd != -1);
297 	rc = fstatat(p->rootfd, f->path, &st, AT_SYMLINK_NOFOLLOW);
298 
299 	if (rc != -1 && !(S_ISBLK(st.st_mode) || S_ISCHR(st.st_mode))) {
300 		if (S_ISDIR(st.st_mode) &&
301 		    unlinkat(p->rootfd, f->path, AT_REMOVEDIR) == -1) {
302 			ERR("%s: unlinkat", f->path);
303 			return -1;
304 		}
305 		rc = -1;
306 	} else if (rc == -1 && errno != ENOENT) {
307 		ERR("%s: fstatat", f->path);
308 		return -1;
309 	}
310 
311 	/* Make sure existing device is of the correct type. */
312 
313 	if (rc != -1) {
314 		if ((f->st.mode & (S_IFCHR|S_IFBLK)) !=
315 		    (st.st_mode & (S_IFCHR|S_IFBLK)) ||
316 		    f->st.rdev != st.st_rdev) {
317 			LOG3("%s: updating device", f->path);
318 			updatedev = 1;
319 		}
320 	}
321 
322 	if (rc == -1 || updatedev) {
323 		newdev = 1;
324 		if (mktemplate(&temp, f->path, sess->opts->recursive) == -1) {
325 			ERRX1("mktemplate");
326 			return -1;
327 		}
328 		if (mkstempnodat(p->rootfd, temp,
329 		    f->st.mode & (S_IFCHR|S_IFBLK), f->st.rdev) == NULL) {
330 			ERR("mkstempnodat");
331 			free(temp);
332 			return -1;
333 		}
334 	}
335 
336 	rsync_set_metadata_at(sess, newdev,
337 	    p->rootfd, f, newdev ? temp : f->path);
338 
339 	if (newdev) {
340 		if (renameat(p->rootfd, temp, p->rootfd, f->path) == -1) {
341 			ERR("%s: renameat %s", temp, f->path);
342 			(void)unlinkat(p->rootfd, temp, 0);
343 			free(temp);
344 			return -1;
345 		}
346 		free(temp);
347 	}
348 
349 	log_file(sess, f);
350 	return 0;
351 }
352 
353 /*
354  * See pre_link(), but for FIFOs.
355  * FIXME: this is very similar to the other pre_xxx() functions.
356  * Return <0 on failure 0 on success.
357  */
358 static int
359 pre_fifo(struct upload *p, struct sess *sess)
360 {
361 	struct stat		 st;
362 	const struct flist	*f;
363 	int			 rc, newfifo = 0;
364 	char			*temp = NULL;
365 
366 	f = &p->fl[p->idx];
367 	assert(S_ISFIFO(f->st.mode));
368 
369 	if (!sess->opts->specials) {
370 		WARNX("skipping non-regular file %s", f->path);
371 		return 0;
372 	} else if (sess->opts->dry_run) {
373 		log_file(sess, f);
374 		return 0;
375 	}
376 
377 	/*
378 	 * See if the fifo already exists.
379 	 * If it exists as a non-FIFO, unlink it (if a directory) then
380 	 * mark it from replacement.
381 	 */
382 
383 	assert(p->rootfd != -1);
384 	rc = fstatat(p->rootfd, f->path, &st, AT_SYMLINK_NOFOLLOW);
385 
386 	if (rc != -1 && !S_ISFIFO(st.st_mode)) {
387 		if (S_ISDIR(st.st_mode) &&
388 		    unlinkat(p->rootfd, f->path, AT_REMOVEDIR) == -1) {
389 			ERR("%s: unlinkat", f->path);
390 			return -1;
391 		}
392 		rc = -1;
393 	} else if (rc == -1 && errno != ENOENT) {
394 		ERR("%s: fstatat", f->path);
395 		return -1;
396 	}
397 
398 	if (rc == -1) {
399 		newfifo = 1;
400 		if (mktemplate(&temp, f->path, sess->opts->recursive) == -1) {
401 			ERRX1("mktemplate");
402 			return -1;
403 		}
404 		if (mkstempfifoat(p->rootfd, temp) == NULL) {
405 			ERR("mkstempfifoat");
406 			free(temp);
407 			return -1;
408 		}
409 	}
410 
411 	rsync_set_metadata_at(sess, newfifo,
412 		p->rootfd, f, newfifo ? temp : f->path);
413 
414 	if (newfifo) {
415 		if (renameat(p->rootfd, temp, p->rootfd, f->path) == -1) {
416 			ERR("%s: renameat %s", temp, f->path);
417 			(void)unlinkat(p->rootfd, temp, 0);
418 			free(temp);
419 			return -1;
420 		}
421 		free(temp);
422 	}
423 
424 	log_file(sess, f);
425 	return 0;
426 }
427 
428 /*
429  * See pre_link(), but for socket files.
430  * FIXME: this is very similar to the other pre_xxx() functions.
431  * Return <0 on failure 0 on success.
432  */
433 static int
434 pre_sock(struct upload *p, struct sess *sess)
435 {
436 	struct stat		 st;
437 	const struct flist	*f;
438 	int			 rc, newsock = 0;
439 	char			*temp = NULL;
440 
441 	f = &p->fl[p->idx];
442 	assert(S_ISSOCK(f->st.mode));
443 
444 	if (!sess->opts->specials) {
445 		WARNX("skipping non-regular file %s", f->path);
446 		return 0;
447 	} else if (sess->opts->dry_run) {
448 		log_file(sess, f);
449 		return 0;
450 	}
451 
452 	/*
453 	 * See if the fifo already exists.
454 	 * If it exists as a non-FIFO, unlink it (if a directory) then
455 	 * mark it from replacement.
456 	 */
457 
458 	assert(p->rootfd != -1);
459 	rc = fstatat(p->rootfd, f->path, &st, AT_SYMLINK_NOFOLLOW);
460 
461 	if (rc != -1 && !S_ISSOCK(st.st_mode)) {
462 		if (S_ISDIR(st.st_mode) &&
463 		    unlinkat(p->rootfd, f->path, AT_REMOVEDIR) == -1) {
464 			ERR("%s: unlinkat", f->path);
465 			return -1;
466 		}
467 		rc = -1;
468 	} else if (rc == -1 && errno != ENOENT) {
469 		ERR("%s: fstatat", f->path);
470 		return -1;
471 	}
472 
473 	if (rc == -1) {
474 		newsock = 1;
475 		if (mktemplate(&temp, f->path, sess->opts->recursive) == -1) {
476 			ERRX1("mktemplate");
477 			return -1;
478 		}
479 		if (mkstempsock(p->root, temp) == NULL) {
480 			ERR("mkstempsock");
481 			free(temp);
482 			return -1;
483 		}
484 	}
485 
486 	rsync_set_metadata_at(sess, newsock,
487 		p->rootfd, f, newsock ? temp : f->path);
488 
489 	if (newsock) {
490 		if (renameat(p->rootfd, temp, p->rootfd, f->path) == -1) {
491 			ERR("%s: renameat %s", temp, f->path);
492 			(void)unlinkat(p->rootfd, temp, 0);
493 			free(temp);
494 			return -1;
495 		}
496 		free(temp);
497 	}
498 
499 	log_file(sess, f);
500 	return 0;
501 }
502 
503 /*
504  * If not found, create the destination directory in prefix order.
505  * Create directories using the existing umask.
506  * Return <0 on failure 0 on success.
507  */
508 static int
509 pre_dir(const struct upload *p, struct sess *sess)
510 {
511 	struct stat	 st;
512 	int		 rc;
513 	const struct flist *f;
514 
515 	f = &p->fl[p->idx];
516 	assert(S_ISDIR(f->st.mode));
517 
518 	if (!sess->opts->recursive) {
519 		WARNX("%s: ignoring directory", f->path);
520 		return 0;
521 	} else if (sess->opts->dry_run) {
522 		log_dir(sess, f);
523 		return 0;
524 	}
525 
526 	assert(p->rootfd != -1);
527 	rc = fstatat(p->rootfd, f->path, &st, AT_SYMLINK_NOFOLLOW);
528 
529 	if (rc == -1 && errno != ENOENT) {
530 		ERR("%s: fstatat", f->path);
531 		return -1;
532 	} else if (rc != -1 && !S_ISDIR(st.st_mode)) {
533 		ERRX("%s: not a directory", f->path);
534 		return -1;
535 	} else if (rc != -1) {
536 		/*
537 		 * FIXME: we should fchmod the permissions here as well,
538 		 * as we may locally have shut down writing into the
539 		 * directory and that doesn't work.
540 		 */
541 		LOG3("%s: updating directory", f->path);
542 		return 0;
543 	}
544 
545 	/*
546 	 * We want to make the directory with default permissions (using
547 	 * our old umask, which we've since unset), then adjust
548 	 * permissions (assuming preserve_perms or new) afterward in
549 	 * case it's u-w or something.
550 	 */
551 
552 	LOG3("%s: creating directory", f->path);
553 	if (mkdirat(p->rootfd, f->path, 0777 & ~p->oumask) == -1) {
554 		ERR("%s: mkdirat", f->path);
555 		return -1;
556 	}
557 
558 	p->newdir[p->idx] = 1;
559 	log_dir(sess, f);
560 	return 0;
561 }
562 
563 /*
564  * Process the directory time and mode for "idx" in the file list.
565  * Returns zero on failure, non-zero on success.
566  */
567 static int
568 post_dir(struct sess *sess, const struct upload *u, size_t idx)
569 {
570 	struct timespec	 tv[2];
571 	int		 rc;
572 	struct stat	 st;
573 	const struct flist *f;
574 
575 	f = &u->fl[idx];
576 	assert(S_ISDIR(f->st.mode));
577 
578 	/* We already warned about the directory in pre_process_dir(). */
579 
580 	if (!sess->opts->recursive)
581 		return 1;
582 	else if (sess->opts->dry_run)
583 		return 1;
584 
585 	if (fstatat(u->rootfd, f->path, &st, AT_SYMLINK_NOFOLLOW) == -1) {
586 		ERR("%s: fstatat", f->path);
587 		return 0;
588 	} else if (!S_ISDIR(st.st_mode)) {
589 		WARNX("%s: not a directory", f->path);
590 		return 0;
591 	}
592 
593 	/*
594 	 * Update the modification time if we're a new directory *or* if
595 	 * we're preserving times and the time has changed.
596 	 * FIXME: run rsync_set_metadata()?
597 	 */
598 
599 	if (u->newdir[idx] ||
600 	    (sess->opts->preserve_times &&
601 	     st.st_mtime != f->st.mtime)) {
602 		tv[0].tv_sec = time(NULL);
603 		tv[0].tv_nsec = 0;
604 		tv[1].tv_sec = f->st.mtime;
605 		tv[1].tv_nsec = 0;
606 		rc = utimensat(u->rootfd, f->path, tv, 0);
607 		if (rc == -1) {
608 			ERR("%s: utimensat", f->path);
609 			return 0;
610 		}
611 		LOG4("%s: updated date", f->path);
612 	}
613 
614 	/*
615 	 * Update the mode if we're a new directory *or* if we're
616 	 * preserving modes and it has changed.
617 	 */
618 
619 	if (u->newdir[idx] ||
620 	    (sess->opts->preserve_perms && st.st_mode != f->st.mode)) {
621 		rc = fchmodat(u->rootfd, f->path, f->st.mode, 0);
622 		if (rc == -1) {
623 			ERR("%s: fchmodat", f->path);
624 			return 0;
625 		}
626 		LOG4("%s: updated mode", f->path);
627 	}
628 
629 	return 1;
630 }
631 
632 /*
633  * Try to open the file at the current index.
634  * If the file does not exist, returns with success.
635  * Return <0 on failure, 0 on success w/nothing to be done, >0 on
636  * success and the file needs attention.
637  */
638 static int
639 pre_file(const struct upload *p, int *filefd, struct sess *sess)
640 {
641 	const struct flist *f;
642 
643 	f = &p->fl[p->idx];
644 	assert(S_ISREG(f->st.mode));
645 
646 	if (sess->opts->dry_run) {
647 		log_file(sess, f);
648 		if (!io_write_int(sess, p->fdout, p->idx)) {
649 			ERRX1("io_write_int");
650 			return -1;
651 		}
652 		return 0;
653 	}
654 
655 	/*
656 	 * For non dry-run cases, we'll write the acknowledgement later
657 	 * in the rsync_uploader() function because we need to wait for
658 	 * the open() call to complete.
659 	 * If the call to openat() fails with ENOENT, there's a
660 	 * fast-path between here and the write function, so we won't do
661 	 * any blocking between now and then.
662 	 */
663 
664 	*filefd = openat(p->rootfd, f->path,
665 		O_RDONLY | O_NOFOLLOW | O_NONBLOCK, 0);
666 	if (*filefd != -1 || errno == ENOENT)
667 		return 1;
668 	ERR("%s: openat", f->path);
669 	return -1;
670 }
671 
672 /*
673  * Allocate an uploader object in the correct state to start.
674  * Returns NULL on failure or the pointer otherwise.
675  * On success, upload_free() must be called with the allocated pointer.
676  */
677 struct upload *
678 upload_alloc(const char *root, int rootfd, int fdout,
679 	size_t clen, const struct flist *fl, size_t flsz, mode_t msk)
680 {
681 	struct upload	*p;
682 
683 	if ((p = calloc(1, sizeof(struct upload))) == NULL) {
684 		ERR("calloc");
685 		return NULL;
686 	}
687 
688 	p->state = UPLOAD_FIND_NEXT;
689 	p->oumask = msk;
690 	p->root = strdup(root);
691 	if (p->root == NULL) {
692 		ERR("strdup");
693 		free(p);
694 		return NULL;
695 	}
696 	p->rootfd = rootfd;
697 	p->csumlen = clen;
698 	p->fdout = fdout;
699 	p->fl = fl;
700 	p->flsz = flsz;
701 	p->newdir = calloc(flsz, sizeof(int));
702 	if (p->newdir == NULL) {
703 		ERR("calloc");
704 		free(p->root);
705 		free(p);
706 		return NULL;
707 	}
708 	return p;
709 }
710 
711 /*
712  * Perform all cleanups and free.
713  * Passing a NULL to this function is ok.
714  */
715 void
716 upload_free(struct upload *p)
717 {
718 
719 	if (p == NULL)
720 		return;
721 	free(p->root);
722 	free(p->newdir);
723 	free(p->buf);
724 	free(p);
725 }
726 
727 /*
728  * Iterates through all available files and conditionally gets the file
729  * ready for processing to check whether it's up to date.
730  * If not up to date or empty, sends file information to the sender.
731  * If returns 0, we've processed all files there are to process.
732  * If returns >0, we're waiting for POLLIN or POLLOUT data.
733  * Otherwise returns <0, which is an error.
734  */
735 int
736 rsync_uploader(struct upload *u, int *fileinfd,
737 	struct sess *sess, int *fileoutfd)
738 {
739 	struct blkset	    blk;
740 	struct stat	    st;
741 	void		   *mbuf, *bufp;
742 	ssize_t		    msz;
743 	size_t		    i, pos, sz;
744 	off_t		    offs;
745 	int		    c;
746 	const struct flist *f;
747 
748 	/* This should never get called. */
749 
750 	assert(u->state != UPLOAD_FINISHED);
751 
752 	/*
753 	 * If we have an upload in progress, then keep writing until the
754 	 * buffer has been fully written.
755 	 * We must only have the output file descriptor working and also
756 	 * have a valid buffer to write.
757 	 */
758 
759 	if (u->state == UPLOAD_WRITE_LOCAL) {
760 		assert(u->buf != NULL);
761 		assert(*fileoutfd != -1);
762 		assert(*fileinfd == -1);
763 
764 		/*
765 		 * Unfortunately, we need to chunk these: if we're
766 		 * the server side of things, then we're multiplexing
767 		 * output and need to wrap this in chunks.
768 		 * This is a major deficiency of rsync.
769 		 * FIXME: add a "fast-path" mode that simply dumps out
770 		 * the buffer non-blocking if we're not mplexing.
771 		 */
772 
773 		if (u->bufpos < u->bufsz) {
774 			sz = MAX_CHUNK < (u->bufsz - u->bufpos) ?
775 				MAX_CHUNK : (u->bufsz - u->bufpos);
776 			c = io_write_buf(sess, u->fdout,
777 				u->buf + u->bufpos, sz);
778 			if (c == 0) {
779 				ERRX1("io_write_nonblocking");
780 				return -1;
781 			}
782 			u->bufpos += sz;
783 			if (u->bufpos < u->bufsz)
784 				return 1;
785 		}
786 
787 		/*
788 		 * Let the UPLOAD_FIND_NEXT state handle things if we
789 		 * finish, as we'll need to write a POLLOUT message and
790 		 * not have a writable descriptor yet.
791 		 */
792 
793 		u->state = UPLOAD_FIND_NEXT;
794 		u->idx++;
795 		return 1;
796 	}
797 
798 	/*
799 	 * If we invoke the uploader without a file currently open, then
800 	 * we iterate through til the next available regular file and
801 	 * start the opening process.
802 	 * This means we must have the output file descriptor working.
803 	 */
804 
805 	if (u->state == UPLOAD_FIND_NEXT) {
806 		assert(*fileinfd == -1);
807 		assert(*fileoutfd != -1);
808 
809 		for ( ; u->idx < u->flsz; u->idx++) {
810 			if (S_ISDIR(u->fl[u->idx].st.mode))
811 				c = pre_dir(u, sess);
812 			else if (S_ISLNK(u->fl[u->idx].st.mode))
813 				c = pre_link(u, sess);
814 			else if (S_ISREG(u->fl[u->idx].st.mode))
815 				c = pre_file(u, fileinfd, sess);
816 			else if (S_ISBLK(u->fl[u->idx].st.mode) ||
817 			    S_ISCHR(u->fl[u->idx].st.mode))
818 				c = pre_dev(u, sess);
819 			else if (S_ISFIFO(u->fl[u->idx].st.mode))
820 				c = pre_fifo(u, sess);
821 			else if (S_ISSOCK(u->fl[u->idx].st.mode))
822 				c = pre_sock(u, sess);
823 			else
824 				c = 0;
825 
826 			if (c < 0)
827 				return -1;
828 			else if (c > 0)
829 				break;
830 		}
831 
832 		/*
833 		 * Whether we've finished writing files or not, we
834 		 * disable polling on the output channel.
835 		 */
836 
837 		*fileoutfd = -1;
838 		if (u->idx == u->flsz) {
839 			assert(*fileinfd == -1);
840 			if (!io_write_int(sess, u->fdout, -1)) {
841 				ERRX1("io_write_int");
842 				return -1;
843 			}
844 			u->state = UPLOAD_FINISHED;
845 			LOG4("uploader: finished");
846 			return 0;
847 		}
848 
849 		/* Go back to the event loop, if necessary. */
850 
851 		u->state = (*fileinfd == -1) ?
852 			UPLOAD_WRITE_LOCAL : UPLOAD_READ_LOCAL;
853 		if (u->state == UPLOAD_READ_LOCAL)
854 			return 1;
855 	}
856 
857 	/*
858 	 * If an input file is open, stat it and see if it's already up
859 	 * to date, in which case close it and go to the next one.
860 	 * Either way, we don't have a write channel open.
861 	 */
862 
863 	if (u->state == UPLOAD_READ_LOCAL) {
864 		assert(*fileinfd != -1);
865 		assert(*fileoutfd == -1);
866 		f = &u->fl[u->idx];
867 
868 		if (fstat(*fileinfd, &st) == -1) {
869 			ERR("%s: fstat", f->path);
870 			close(*fileinfd);
871 			*fileinfd = -1;
872 			return -1;
873 		} else if (!S_ISREG(st.st_mode)) {
874 			ERRX("%s: not regular", f->path);
875 			close(*fileinfd);
876 			*fileinfd = -1;
877 			return -1;
878 		}
879 
880 		if (st.st_size == f->st.size &&
881 		    st.st_mtime == f->st.mtime) {
882 			LOG3("%s: skipping: up to date", f->path);
883 			if (!rsync_set_metadata
884 			    (sess, 0, *fileinfd, f, f->path)) {
885 				ERRX1("rsync_set_metadata");
886 				close(*fileinfd);
887 				*fileinfd = -1;
888 				return -1;
889 			}
890 			close(*fileinfd);
891 			*fileinfd = -1;
892 			*fileoutfd = u->fdout;
893 			u->state = UPLOAD_FIND_NEXT;
894 			u->idx++;
895 			return 1;
896 		}
897 
898 		/* Fallthrough... */
899 
900 		u->state = UPLOAD_WRITE_LOCAL;
901 	}
902 
903 	/* Initialies our blocks. */
904 
905 	assert(u->state == UPLOAD_WRITE_LOCAL);
906 	memset(&blk, 0, sizeof(struct blkset));
907 	blk.csum = u->csumlen;
908 
909 	if (*fileinfd != -1 && st.st_size > 0) {
910 		init_blkset(&blk, st.st_size);
911 		assert(blk.blksz);
912 
913 		blk.blks = calloc(blk.blksz, sizeof(struct blk));
914 		if (blk.blks == NULL) {
915 			ERR("calloc");
916 			close(*fileinfd);
917 			*fileinfd = -1;
918 			return -1;
919 		}
920 
921 		if ((mbuf = calloc(1, blk.len)) == NULL) {
922 			ERR("calloc");
923 			close(*fileinfd);
924 			*fileinfd = -1;
925 			return -1;
926 		}
927 
928 		offs = 0;
929 		i = 0;
930 		do {
931 			msz = pread(*fileinfd, mbuf, blk.len, offs);
932 			if (msz < 0) {
933 				ERR("pread");
934 				close(*fileinfd);
935 				*fileinfd = -1;
936 				return -1;
937 			}
938 			if ((size_t)msz != blk.len && (size_t)msz != blk.rem) {
939 				/* short read, try again */
940 				continue;
941 			}
942 			init_blk(&blk.blks[i], &blk, offs, i, mbuf, sess);
943 			offs += blk.len;
944 			LOG3(
945 			    "i=%ld, offs=%lld, msz=%ld, blk.len=%lu, blk.rem=%lu",
946 			    i, offs, msz, blk.len, blk.rem);
947 			i++;
948 		} while (i < blk.blksz);
949 
950 		close(*fileinfd);
951 		*fileinfd = -1;
952 		LOG3("%s: mapped %jd B with %zu blocks",
953 		    u->fl[u->idx].path, (intmax_t)blk.size,
954 		    blk.blksz);
955 	} else {
956 		if (*fileinfd != -1) {
957 			close(*fileinfd);
958 			*fileinfd = -1;
959 		}
960 		blk.len = MAX_CHUNK; /* Doesn't matter. */
961 		LOG3("%s: not mapped", u->fl[u->idx].path);
962 	}
963 
964 	assert(*fileinfd == -1);
965 
966 	/* Make sure the block metadata buffer is big enough. */
967 
968 	u->bufsz =
969 	     sizeof(int32_t) + /* identifier */
970 	     sizeof(int32_t) + /* block count */
971 	     sizeof(int32_t) + /* block length */
972 	     sizeof(int32_t) + /* checksum length */
973 	     sizeof(int32_t) + /* block remainder */
974 	     blk.blksz *
975 	     (sizeof(int32_t) + /* short checksum */
976 	      blk.csum); /* long checksum */
977 
978 	if (u->bufsz > u->bufmax) {
979 		if ((bufp = realloc(u->buf, u->bufsz)) == NULL) {
980 			ERR("realloc");
981 			return -1;
982 		}
983 		u->buf = bufp;
984 		u->bufmax = u->bufsz;
985 	}
986 
987 	u->bufpos = pos = 0;
988 	io_buffer_int(u->buf, &pos, u->bufsz, u->idx);
989 	io_buffer_int(u->buf, &pos, u->bufsz, blk.blksz);
990 	io_buffer_int(u->buf, &pos, u->bufsz, blk.len);
991 	io_buffer_int(u->buf, &pos, u->bufsz, blk.csum);
992 	io_buffer_int(u->buf, &pos, u->bufsz, blk.rem);
993 	for (i = 0; i < blk.blksz; i++) {
994 		io_buffer_int(u->buf, &pos, u->bufsz,
995 			blk.blks[i].chksum_short);
996 		io_buffer_buf(u->buf, &pos, u->bufsz,
997 			blk.blks[i].chksum_long, blk.csum);
998 	}
999 	assert(pos == u->bufsz);
1000 
1001 	/* Reenable the output poller and clean up. */
1002 
1003 	*fileoutfd = u->fdout;
1004 	free(blk.blks);
1005 	return 1;
1006 }
1007 
1008 /*
1009  * Fix up the directory permissions and times post-order.
1010  * We can't fix up directory permissions in place because the server may
1011  * want us to have overly-tight permissions---say, those that don't
1012  * allow writing into the directory.
1013  * We also need to do our directory times post-order because making
1014  * files within the directory will change modification times.
1015  * Returns zero on failure, non-zero on success.
1016  */
1017 int
1018 rsync_uploader_tail(struct upload *u, struct sess *sess)
1019 {
1020 	size_t	 i;
1021 
1022 
1023 	if (!sess->opts->preserve_times &&
1024 	    !sess->opts->preserve_perms)
1025 		return 1;
1026 
1027 	LOG2("fixing up directory times and permissions");
1028 
1029 	for (i = 0; i < u->flsz; i++)
1030 		if (S_ISDIR(u->fl[i].st.mode))
1031 			if (!post_dir(sess, u, i))
1032 				return 0;
1033 
1034 	return 1;
1035 }
1036