xref: /openbsd-src/usr.bin/rsync/downloader.c (revision 4b70baf6e17fc8b27fc1f7fa7929335753fa94c3)
1 /*	$Id: downloader.c,v 1.19 2019/04/02 11:05:55 deraadt Exp $ */
2 /*
3  * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
4  *
5  * Permission to use, copy, modify, and distribute this software for any
6  * purpose with or without fee is hereby granted, provided that the above
7  * copyright notice and this permission notice appear in all copies.
8  *
9  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16  */
17 #include <sys/mman.h>
18 #include <sys/stat.h>
19 
20 #include <assert.h>
21 #include <errno.h>
22 #include <fcntl.h>
23 #include <inttypes.h>
24 #include <math.h>
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <time.h>
29 #include <unistd.h>
30 
31 #include <openssl/md4.h>
32 
33 #include "extern.h"
34 
/*
 * Capacity, in bytes, of the pre-write buffer that output is staged in
 * before being written to the temporary file (1 MB).
 * Setting this to zero disables the pre-write buffer altogether.
 * (The buffer is a minor optimisation and has little effect on
 * performance.)
 */
#define	OBUF_SIZE	(1 << 20)
41 
/*
 * Phases a single file passes through while it is being downloaded.
 */
enum	downloadst {
	/* Idle: the next file index (or phase end) is read from the sender. */
	DOWNLOAD_READ_NEXT = 0,
	/* Origin file was opened (or is absent): map it, make the temporary. */
	DOWNLOAD_READ_LOCAL = 1,
	/* Temporary file is open: consume data and tokens from the sender. */
	DOWNLOAD_READ_REMOTE = 2
};
47 
48 /*
49  * Like struct upload, but used to keep track of what we're downloading.
50  * This also is managed by the receiver process.
51  */
52 struct	download {
53 	enum downloadst	    state; /* state of affairs */
54 	size_t		    idx; /* index of current file */
55 	struct blkset	    blk; /* its blocks */
56 	void		   *map; /* mmap of current file */
57 	size_t		    mapsz; /* length of mapsz */
58 	int		    ofd; /* open origin file */
59 	int		    fd; /* open output file */
60 	char		   *fname; /* output filename */
61 	MD4_CTX		    ctx; /* current hashing context */
62 	off_t		    downloaded; /* total downloaded */
63 	off_t		    total; /* total in file */
64 	const struct flist *fl; /* file list */
65 	size_t		    flsz; /* size of file list */
66 	int		    rootfd; /* destination directory */
67 	int		    fdin; /* read descriptor from sender */
68 	char		   *obuf; /* pre-write buffer */
69 	size_t		    obufsz; /* current size of obuf */
70 	size_t		    obufmax; /* max size we'll wbuffer */
71 };
72 
73 
74 /*
75  * Simply log the filename.
76  */
77 static void
78 log_file(struct sess *sess,
79 	const struct download *dl, const struct flist *f)
80 {
81 	float		 frac, tot = dl->total;
82 	int		 prec = 0;
83 	const char	*unit = "B";
84 
85 	if (sess->opts->server)
86 		return;
87 
88 	frac = (dl->total == 0) ? 100.0 :
89 		100.0 * dl->downloaded / dl->total;
90 
91 	if (dl->total > 1024 * 1024 * 1024) {
92 		tot = dl->total / (1024. * 1024. * 1024.);
93 		prec = 3;
94 		unit = "GB";
95 	} else if (dl->total > 1024 * 1024) {
96 		tot = dl->total / (1024. * 1024.);
97 		prec = 2;
98 		unit = "MB";
99 	} else if (dl->total > 1024) {
100 		tot = dl->total / 1024.;
101 		prec = 1;
102 		unit = "KB";
103 	}
104 
105 	LOG1(sess, "%s (%.*f %s, %.1f%% downloaded)",
106 	    f->path, prec, tot, unit, frac);
107 }
108 
109 /*
110  * Reinitialise a download context w/o overwriting the persistent parts
111  * of the structure (like p->fl or p->flsz) for index "idx".
112  * The MD4 context is pre-seeded.
113  */
114 static void
115 download_reinit(struct sess *sess, struct download *p, size_t idx)
116 {
117 	int32_t seed = htole32(sess->seed);
118 
119 	assert(p->state == DOWNLOAD_READ_NEXT);
120 
121 	p->idx = idx;
122 	memset(&p->blk, 0, sizeof(struct blkset));
123 	p->map = MAP_FAILED;
124 	p->mapsz = 0;
125 	p->ofd = -1;
126 	p->fd = -1;
127 	p->fname = NULL;
128 	MD4_Init(&p->ctx);
129 	p->downloaded = p->total = 0;
130 	/* Don't touch p->fl. */
131 	/* Don't touch p->flsz. */
132 	/* Don't touch p->rootfd. */
133 	/* Don't touch p->fdin. */
134 	MD4_Update(&p->ctx, &seed, sizeof(int32_t));
135 }
136 
137 /*
138  * Free a download context.
139  * If "cleanup" is non-zero, we also try to clean up the temporary file,
140  * assuming that it has been opened in p->fd.
141  */
142 static void
143 download_cleanup(struct download *p, int cleanup)
144 {
145 
146 	if (p->map != MAP_FAILED) {
147 		assert(p->mapsz);
148 		munmap(p->map, p->mapsz);
149 		p->map = MAP_FAILED;
150 		p->mapsz = 0;
151 	}
152 	if (p->ofd != -1) {
153 		close(p->ofd);
154 		p->ofd = -1;
155 	}
156 	if (p->fd != -1) {
157 		close(p->fd);
158 		if (cleanup && p->fname != NULL)
159 			unlinkat(p->rootfd, p->fname, 0);
160 		p->fd = -1;
161 	}
162 	free(p->fname);
163 	p->fname = NULL;
164 	p->state = DOWNLOAD_READ_NEXT;
165 }
166 
167 /*
168  * Initial allocation of the download object using the file list "fl" of
169  * size "flsz", the destination "rootfd", and the sender read "fdin".
170  * Returns NULL on allocation failure.
171  * On success, download_free() must be called with the pointer.
172  */
173 struct download *
174 download_alloc(struct sess *sess, int fdin,
175 	const struct flist *fl, size_t flsz, int rootfd)
176 {
177 	struct download	*p;
178 
179 	if ((p = malloc(sizeof(struct download))) == NULL) {
180 		ERR(sess, "malloc");
181 		return NULL;
182 	}
183 
184 	p->state = DOWNLOAD_READ_NEXT;
185 	p->fl = fl;
186 	p->flsz = flsz;
187 	p->rootfd = rootfd;
188 	p->fdin = fdin;
189 	download_reinit(sess, p, 0);
190 	p->obufsz = 0;
191 	p->obuf = NULL;
192 	p->obufmax = OBUF_SIZE;
193 	if (p->obufmax && (p->obuf = malloc(p->obufmax)) == NULL) {
194 		ERR(sess, "malloc");
195 		free(p);
196 		return NULL;
197 	}
198 	return p;
199 }
200 
201 /*
202  * Perform all cleanups (including removing stray files) and free.
203  * Passing a NULL to this function is ok.
204  */
205 void
206 download_free(struct download *p)
207 {
208 
209 	if (p == NULL)
210 		return;
211 	download_cleanup(p, 1);
212 	free(p->obuf);
213 	free(p);
214 }
215 
216 /*
217  * Optimisation: instead of dumping directly into the output file, keep
218  * a buffer and write as much as we can into the buffer.
219  * That way, we can avoid calling write() too much, and instead call it
220  * with big buffers.
221  * To flush the buffer w/o changing it, pass 0 as "sz".
222  * Returns zero on failure, non-zero on success.
223  */
224 static int
225 buf_copy(struct sess *sess,
226 	const char *buf, size_t sz, struct download *p)
227 {
228 	size_t	 rem, tocopy;
229 	ssize_t	 ssz;
230 
231 	assert(p->obufsz <= p->obufmax);
232 
233 	/*
234 	 * Copy as much as we can.
235 	 * If we've copied everything, exit.
236 	 * If we have no pre-write buffer (obufmax of zero), this never
237 	 * gets called, so we never buffer anything.
238 	 */
239 
240 	if (sz && p->obufsz < p->obufmax) {
241 		assert(p->obuf != NULL);
242 		rem = p->obufmax - p->obufsz;
243 		assert(rem > 0);
244 		tocopy = rem < sz ? rem : sz;
245 		memcpy(p->obuf + p->obufsz, buf, tocopy);
246 		sz -= tocopy;
247 		buf += tocopy;
248 		p->obufsz += tocopy;
249 		assert(p->obufsz <= p->obufmax);
250 		if (sz == 0)
251 			return 1;
252 	}
253 
254 	/* Drain the main buffer. */
255 
256 	if (p->obufsz) {
257 		assert(p->obufmax);
258 		assert(p->obufsz <= p->obufmax);
259 		assert(p->obuf != NULL);
260 		if ((ssz = write(p->fd, p->obuf, p->obufsz)) < 0) {
261 			ERR(sess, "%s: write", p->fname);
262 			return 0;
263 		} else if ((size_t)ssz != p->obufsz) {
264 			ERRX(sess, "%s: short write", p->fname);
265 			return 0;
266 		}
267 		p->obufsz = 0;
268 	}
269 
270 	/*
271 	 * Now drain anything left.
272 	 * If we have no pre-write buffer, this is it.
273 	 */
274 
275 	if (sz) {
276 		if ((ssz = write(p->fd, buf, sz)) < 0) {
277 			ERR(sess, "%s: write", p->fname);
278 			return 0;
279 		} else if ((size_t)ssz != sz) {
280 			ERRX(sess, "%s: short write", p->fname);
281 			return 0;
282 		}
283 	}
284 	return 1;
285 }
286 
287 /*
288  * The downloader waits on a file the sender is going to give us, opens
289  * and mmaps the existing file, opens a temporary file, dumps the file
290  * (or metadata) into the temporary file, then renames.
291  * This happens in several possible phases to avoid blocking.
292  * Returns <0 on failure, 0 on no more data (end of phase), >0 on
293  * success (more data to be read from the sender).
294  */
295 int
296 rsync_downloader(struct download *p, struct sess *sess, int *ofd)
297 {
298 	int		 c;
299 	int32_t		 idx, rawtok;
300 	const struct flist *f;
301 	size_t		 sz, tok;
302 	struct stat	 st;
303 	char		*buf = NULL;
304 	unsigned char	 ourmd[MD4_DIGEST_LENGTH],
305 			 md[MD4_DIGEST_LENGTH];
306 
307 	/*
308 	 * If we don't have a download already in session, then the next
309 	 * one is coming in.
310 	 * Read either the stop (phase) signal from the sender or block
311 	 * metadata, in which case we open our file and wait for data.
312 	 */
313 
314 	if (p->state == DOWNLOAD_READ_NEXT) {
315 		if (!io_read_int(sess, p->fdin, &idx)) {
316 			ERRX1(sess, "io_read_int");
317 			return -1;
318 		} else if (idx >= 0 && (size_t)idx >= p->flsz) {
319 			ERRX(sess, "index out of bounds");
320 			return -1;
321 		} else if (idx < 0) {
322 			LOG3(sess, "downloader: phase complete");
323 			return 0;
324 		}
325 
326 		/* Short-circuit: dry_run mode does nothing. */
327 
328 		if (sess->opts->dry_run)
329 			return 1;
330 
331 		/*
332 		 * Now get our block information.
333 		 * This is all we'll need to reconstruct the file from
334 		 * the map, as block sizes are regular.
335 		 */
336 
337 		download_reinit(sess, p, idx);
338 		if (!blk_send_ack(sess, p->fdin, &p->blk)) {
339 			ERRX1(sess, "blk_send_ack");
340 			goto out;
341 		}
342 
343 		/*
344 		 * Next, we want to open the existing file for using as
345 		 * block input.
346 		 * We do this in a non-blocking way, so if the open
347 		 * succeeds, then we'll go reentrant til the file is
348 		 * readable and we can mmap() it.
349 		 * Set the file descriptor that we want to wait for.
350 		 */
351 
352 		p->state = DOWNLOAD_READ_LOCAL;
353 		f = &p->fl[idx];
354 		p->ofd = openat(p->rootfd, f->path, O_RDONLY | O_NONBLOCK, 0);
355 
356 		if (p->ofd == -1 && errno != ENOENT) {
357 			ERR(sess, "%s: openat", f->path);
358 			goto out;
359 		} else if (p->ofd != -1) {
360 			*ofd = p->ofd;
361 			return 1;
362 		}
363 
364 		/* Fall-through: there's no file. */
365 	}
366 
367 	/*
368 	 * At this point, the server is sending us data and we want to
369 	 * hoover it up as quickly as possible or we'll deadlock.
370 	 * We want to be pulling off of f->fdin as quickly as possible,
371 	 * so perform as much buffering as we can.
372 	 */
373 
374 	f = &p->fl[p->idx];
375 
376 	/*
377 	 * Next in sequence: we have an open download session but
378 	 * haven't created our temporary file.
379 	 * This means that we've already opened (or tried to open) the
380 	 * original file in a nonblocking way, and we can map it.
381 	 */
382 
383 	if (p->state == DOWNLOAD_READ_LOCAL) {
384 		assert(p->fname == NULL);
385 
386 		/*
387 		 * Try to fstat() the file descriptor if valid and make
388 		 * sure that we're still a regular file.
389 		 * Then, if it has non-zero size, mmap() it for hashing.
390 		 */
391 
392 		if (p->ofd != -1 &&
393 		    fstat(p->ofd, &st) == -1) {
394 			ERR(sess, "%s: fstat", f->path);
395 			goto out;
396 		} else if (p->ofd != -1 && !S_ISREG(st.st_mode)) {
397 			WARNX(sess, "%s: not regular", f->path);
398 			goto out;
399 		}
400 
401 		if (p->ofd != -1 && st.st_size > 0) {
402 			p->mapsz = st.st_size;
403 			p->map = mmap(NULL, p->mapsz,
404 				PROT_READ, MAP_SHARED, p->ofd, 0);
405 			if (p->map == MAP_FAILED) {
406 				ERR(sess, "%s: mmap", f->path);
407 				goto out;
408 			}
409 		}
410 
411 		/* Success either way: we don't need this. */
412 
413 		*ofd = -1;
414 
415 		/* Create the temporary file. */
416 
417 		if (mktemplate(sess, &p->fname,
418 		    f->path, sess->opts->recursive) == -1) {
419 			ERRX1(sess, "mktemplate");
420 			goto out;
421 		}
422 
423 		if ((p->fd = mkstempat(p->rootfd, p->fname)) == -1) {
424 			ERR(sess, "mkstempat");
425 			goto out;
426 		}
427 
428 		/*
429 		 * FIXME: we can technically wait until the temporary
430 		 * file is writable, but since it's guaranteed to be
431 		 * empty, I don't think this is a terribly expensive
432 		 * operation as it doesn't involve reading the file into
433 		 * memory beforehand.
434 		 */
435 
436 		LOG3(sess, "%s: temporary: %s", f->path, p->fname);
437 		p->state = DOWNLOAD_READ_REMOTE;
438 		return 1;
439 	}
440 
441 	/*
442 	 * This matches the sequence in blk_flush().
443 	 * If we've gotten here, then we have a possibly-open map file
444 	 * (not for new files) and our temporary file is writable.
445 	 * We read the size/token, then optionally the data.
446 	 * The size >0 for reading data, 0 for no more data, and <0 for
447 	 * a token indicator.
448 	 */
449 
450 again:
451 	assert(p->state == DOWNLOAD_READ_REMOTE);
452 	assert(p->fname != NULL);
453 	assert(p->fd != -1);
454 	assert(p->fdin != -1);
455 
456 	if (!io_read_int(sess, p->fdin, &rawtok)) {
457 		ERRX1(sess, "io_read_int");
458 		goto out;
459 	}
460 
461 	if (rawtok > 0) {
462 		sz = rawtok;
463 		if ((buf = malloc(sz)) == NULL) {
464 			ERR(sess, "realloc");
465 			goto out;
466 		}
467 		if (!io_read_buf(sess, p->fdin, buf, sz)) {
468 			ERRX1(sess, "io_read_int");
469 			goto out;
470 		} else if (!buf_copy(sess, buf, sz, p)) {
471 			ERRX1(sess, "buf_copy");
472 			goto out;
473 		}
474 		p->total += sz;
475 		p->downloaded += sz;
476 		LOG4(sess, "%s: received %zu B block", p->fname, sz);
477 		MD4_Update(&p->ctx, buf, sz);
478 		free(buf);
479 
480 		/* Fast-track more reads as they arrive. */
481 
482 		if ((c = io_read_check(sess, p->fdin)) < 0) {
483 			ERRX1(sess, "io_read_check");
484 			goto out;
485 		} else if (c > 0)
486 			goto again;
487 
488 		return 1;
489 	} else if (rawtok < 0) {
490 		tok = -rawtok - 1;
491 		if (tok >= p->blk.blksz) {
492 			ERRX(sess,
493 			    "%s: token not in block set: %zu (have %zu blocks)",
494 			    p->fname, tok, p->blk.blksz);
495 			goto out;
496 		}
497 		sz = tok == p->blk.blksz - 1 ? p->blk.rem : p->blk.len;
498 		assert(sz);
499 		assert(p->map != MAP_FAILED);
500 		buf = p->map + (tok * p->blk.len);
501 
502 		/*
503 		 * Now we read from our block.
504 		 * We should only be at this point if we have a
505 		 * block to read from, i.e., if we were able to
506 		 * map our origin file and create a block
507 		 * profile from it.
508 		 */
509 
510 		assert(p->map != MAP_FAILED);
511 		if (!buf_copy(sess, buf, sz, p)) {
512 			ERRX1(sess, "buf_copy");
513 			goto out;
514 		}
515 		p->total += sz;
516 		LOG4(sess, "%s: copied %zu B", p->fname, sz);
517 		MD4_Update(&p->ctx, buf, sz);
518 
519 		/* Fast-track more reads as they arrive. */
520 
521 		if ((c = io_read_check(sess, p->fdin)) < 0) {
522 			ERRX1(sess, "io_read_check");
523 			goto out;
524 		} else if (c > 0)
525 			goto again;
526 
527 		return 1;
528 	}
529 
530 	if (!buf_copy(sess, NULL, 0, p)) {
531 		ERRX1(sess, "buf_copy");
532 		goto out;
533 	}
534 
535 	assert(rawtok == 0);
536 	assert(p->obufsz == 0);
537 
538 	/*
539 	 * Make sure our resulting MD4 hashes match.
540 	 * FIXME: if the MD4 hashes don't match, then our file has
541 	 * changed out from under us.
542 	 * This should require us to re-run the sequence in another
543 	 * phase.
544 	 */
545 
546 	MD4_Final(ourmd, &p->ctx);
547 
548 	if (!io_read_buf(sess, p->fdin, md, MD4_DIGEST_LENGTH)) {
549 		ERRX1(sess, "io_read_buf");
550 		goto out;
551 	} else if (memcmp(md, ourmd, MD4_DIGEST_LENGTH)) {
552 		ERRX(sess, "%s: hash does not match", p->fname);
553 		goto out;
554 	}
555 
556 	/* Adjust our file metadata (uid, mode, etc.). */
557 
558 	if (!rsync_set_metadata(sess, 1, p->fd, f, p->fname)) {
559 		ERRX1(sess, "rsync_set_metadata");
560 		goto out;
561 	}
562 
563 	/* Finally, rename the temporary to the real file. */
564 
565 	if (renameat(p->rootfd, p->fname, p->rootfd, f->path) == -1) {
566 		ERR(sess, "%s: renameat: %s", p->fname, f->path);
567 		goto out;
568 	}
569 
570 	log_file(sess, p, f);
571 	download_cleanup(p, 0);
572 	return 1;
573 out:
574 	download_cleanup(p, 1);
575 	return -1;
576 }
577