1 /* $Id: downloader.c,v 1.19 2019/04/02 11:05:55 deraadt Exp $ */ 2 /* 3 * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv> 4 * 5 * Permission to use, copy, modify, and distribute this software for any 6 * purpose with or without fee is hereby granted, provided that the above 7 * copyright notice and this permission notice appear in all copies. 8 * 9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 */ 17 #include <sys/mman.h> 18 #include <sys/stat.h> 19 20 #include <assert.h> 21 #include <errno.h> 22 #include <fcntl.h> 23 #include <inttypes.h> 24 #include <math.h> 25 #include <stdio.h> 26 #include <stdlib.h> 27 #include <string.h> 28 #include <time.h> 29 #include <unistd.h> 30 31 #include <openssl/md4.h> 32 33 #include "extern.h" 34 35 /* 36 * A small optimisation: have a 1 MB pre-write buffer. 37 * Disable the pre-write buffer by having this be zero. 38 * (It doesn't affect performance much.) 39 */ 40 #define OBUF_SIZE (1024 * 1024) 41 42 enum downloadst { 43 DOWNLOAD_READ_NEXT = 0, 44 DOWNLOAD_READ_LOCAL, 45 DOWNLOAD_READ_REMOTE 46 }; 47 48 /* 49 * Like struct upload, but used to keep track of what we're downloading. 50 * This also is managed by the receiver process. 51 */ 52 struct download { 53 enum downloadst state; /* state of affairs */ 54 size_t idx; /* index of current file */ 55 struct blkset blk; /* its blocks */ 56 void *map; /* mmap of current file */ 57 size_t mapsz; /* length of mapsz */ 58 int ofd; /* open origin file */ 59 int fd; /* open output file */ 60 char *fname; /* output filename */ 61 MD4_CTX ctx; /* current hashing context */ 62 off_t downloaded; /* total downloaded */ 63 off_t total; /* total in file */ 64 const struct flist *fl; /* file list */ 65 size_t flsz; /* size of file list */ 66 int rootfd; /* destination directory */ 67 int fdin; /* read descriptor from sender */ 68 char *obuf; /* pre-write buffer */ 69 size_t obufsz; /* current size of obuf */ 70 size_t obufmax; /* max size we'll wbuffer */ 71 }; 72 73 74 /* 75 * Simply log the filename. 76 */ 77 static void 78 log_file(struct sess *sess, 79 const struct download *dl, const struct flist *f) 80 { 81 float frac, tot = dl->total; 82 int prec = 0; 83 const char *unit = "B"; 84 85 if (sess->opts->server) 86 return; 87 88 frac = (dl->total == 0) ? 100.0 : 89 100.0 * dl->downloaded / dl->total; 90 91 if (dl->total > 1024 * 1024 * 1024) { 92 tot = dl->total / (1024. * 1024. * 1024.); 93 prec = 3; 94 unit = "GB"; 95 } else if (dl->total > 1024 * 1024) { 96 tot = dl->total / (1024. * 1024.); 97 prec = 2; 98 unit = "MB"; 99 } else if (dl->total > 1024) { 100 tot = dl->total / 1024.; 101 prec = 1; 102 unit = "KB"; 103 } 104 105 LOG1(sess, "%s (%.*f %s, %.1f%% downloaded)", 106 f->path, prec, tot, unit, frac); 107 } 108 109 /* 110 * Reinitialise a download context w/o overwriting the persistent parts 111 * of the structure (like p->fl or p->flsz) for index "idx". 112 * The MD4 context is pre-seeded. 113 */ 114 static void 115 download_reinit(struct sess *sess, struct download *p, size_t idx) 116 { 117 int32_t seed = htole32(sess->seed); 118 119 assert(p->state == DOWNLOAD_READ_NEXT); 120 121 p->idx = idx; 122 memset(&p->blk, 0, sizeof(struct blkset)); 123 p->map = MAP_FAILED; 124 p->mapsz = 0; 125 p->ofd = -1; 126 p->fd = -1; 127 p->fname = NULL; 128 MD4_Init(&p->ctx); 129 p->downloaded = p->total = 0; 130 /* Don't touch p->fl. */ 131 /* Don't touch p->flsz. */ 132 /* Don't touch p->rootfd. */ 133 /* Don't touch p->fdin. */ 134 MD4_Update(&p->ctx, &seed, sizeof(int32_t)); 135 } 136 137 /* 138 * Free a download context. 139 * If "cleanup" is non-zero, we also try to clean up the temporary file, 140 * assuming that it has been opened in p->fd. 141 */ 142 static void 143 download_cleanup(struct download *p, int cleanup) 144 { 145 146 if (p->map != MAP_FAILED) { 147 assert(p->mapsz); 148 munmap(p->map, p->mapsz); 149 p->map = MAP_FAILED; 150 p->mapsz = 0; 151 } 152 if (p->ofd != -1) { 153 close(p->ofd); 154 p->ofd = -1; 155 } 156 if (p->fd != -1) { 157 close(p->fd); 158 if (cleanup && p->fname != NULL) 159 unlinkat(p->rootfd, p->fname, 0); 160 p->fd = -1; 161 } 162 free(p->fname); 163 p->fname = NULL; 164 p->state = DOWNLOAD_READ_NEXT; 165 } 166 167 /* 168 * Initial allocation of the download object using the file list "fl" of 169 * size "flsz", the destination "rootfd", and the sender read "fdin". 170 * Returns NULL on allocation failure. 171 * On success, download_free() must be called with the pointer. 172 */ 173 struct download * 174 download_alloc(struct sess *sess, int fdin, 175 const struct flist *fl, size_t flsz, int rootfd) 176 { 177 struct download *p; 178 179 if ((p = malloc(sizeof(struct download))) == NULL) { 180 ERR(sess, "malloc"); 181 return NULL; 182 } 183 184 p->state = DOWNLOAD_READ_NEXT; 185 p->fl = fl; 186 p->flsz = flsz; 187 p->rootfd = rootfd; 188 p->fdin = fdin; 189 download_reinit(sess, p, 0); 190 p->obufsz = 0; 191 p->obuf = NULL; 192 p->obufmax = OBUF_SIZE; 193 if (p->obufmax && (p->obuf = malloc(p->obufmax)) == NULL) { 194 ERR(sess, "malloc"); 195 free(p); 196 return NULL; 197 } 198 return p; 199 } 200 201 /* 202 * Perform all cleanups (including removing stray files) and free. 203 * Passing a NULL to this function is ok. 204 */ 205 void 206 download_free(struct download *p) 207 { 208 209 if (p == NULL) 210 return; 211 download_cleanup(p, 1); 212 free(p->obuf); 213 free(p); 214 } 215 216 /* 217 * Optimisation: instead of dumping directly into the output file, keep 218 * a buffer and write as much as we can into the buffer. 219 * That way, we can avoid calling write() too much, and instead call it 220 * with big buffers. 221 * To flush the buffer w/o changing it, pass 0 as "sz". 222 * Returns zero on failure, non-zero on success. 223 */ 224 static int 225 buf_copy(struct sess *sess, 226 const char *buf, size_t sz, struct download *p) 227 { 228 size_t rem, tocopy; 229 ssize_t ssz; 230 231 assert(p->obufsz <= p->obufmax); 232 233 /* 234 * Copy as much as we can. 235 * If we've copied everything, exit. 236 * If we have no pre-write buffer (obufmax of zero), this never 237 * gets called, so we never buffer anything. 238 */ 239 240 if (sz && p->obufsz < p->obufmax) { 241 assert(p->obuf != NULL); 242 rem = p->obufmax - p->obufsz; 243 assert(rem > 0); 244 tocopy = rem < sz ? rem : sz; 245 memcpy(p->obuf + p->obufsz, buf, tocopy); 246 sz -= tocopy; 247 buf += tocopy; 248 p->obufsz += tocopy; 249 assert(p->obufsz <= p->obufmax); 250 if (sz == 0) 251 return 1; 252 } 253 254 /* Drain the main buffer. */ 255 256 if (p->obufsz) { 257 assert(p->obufmax); 258 assert(p->obufsz <= p->obufmax); 259 assert(p->obuf != NULL); 260 if ((ssz = write(p->fd, p->obuf, p->obufsz)) < 0) { 261 ERR(sess, "%s: write", p->fname); 262 return 0; 263 } else if ((size_t)ssz != p->obufsz) { 264 ERRX(sess, "%s: short write", p->fname); 265 return 0; 266 } 267 p->obufsz = 0; 268 } 269 270 /* 271 * Now drain anything left. 272 * If we have no pre-write buffer, this is it. 273 */ 274 275 if (sz) { 276 if ((ssz = write(p->fd, buf, sz)) < 0) { 277 ERR(sess, "%s: write", p->fname); 278 return 0; 279 } else if ((size_t)ssz != sz) { 280 ERRX(sess, "%s: short write", p->fname); 281 return 0; 282 } 283 } 284 return 1; 285 } 286 287 /* 288 * The downloader waits on a file the sender is going to give us, opens 289 * and mmaps the existing file, opens a temporary file, dumps the file 290 * (or metadata) into the temporary file, then renames. 291 * This happens in several possible phases to avoid blocking. 292 * Returns <0 on failure, 0 on no more data (end of phase), >0 on 293 * success (more data to be read from the sender). 294 */ 295 int 296 rsync_downloader(struct download *p, struct sess *sess, int *ofd) 297 { 298 int c; 299 int32_t idx, rawtok; 300 const struct flist *f; 301 size_t sz, tok; 302 struct stat st; 303 char *buf = NULL; 304 unsigned char ourmd[MD4_DIGEST_LENGTH], 305 md[MD4_DIGEST_LENGTH]; 306 307 /* 308 * If we don't have a download already in session, then the next 309 * one is coming in. 310 * Read either the stop (phase) signal from the sender or block 311 * metadata, in which case we open our file and wait for data. 312 */ 313 314 if (p->state == DOWNLOAD_READ_NEXT) { 315 if (!io_read_int(sess, p->fdin, &idx)) { 316 ERRX1(sess, "io_read_int"); 317 return -1; 318 } else if (idx >= 0 && (size_t)idx >= p->flsz) { 319 ERRX(sess, "index out of bounds"); 320 return -1; 321 } else if (idx < 0) { 322 LOG3(sess, "downloader: phase complete"); 323 return 0; 324 } 325 326 /* Short-circuit: dry_run mode does nothing. */ 327 328 if (sess->opts->dry_run) 329 return 1; 330 331 /* 332 * Now get our block information. 333 * This is all we'll need to reconstruct the file from 334 * the map, as block sizes are regular. 335 */ 336 337 download_reinit(sess, p, idx); 338 if (!blk_send_ack(sess, p->fdin, &p->blk)) { 339 ERRX1(sess, "blk_send_ack"); 340 goto out; 341 } 342 343 /* 344 * Next, we want to open the existing file for using as 345 * block input. 346 * We do this in a non-blocking way, so if the open 347 * succeeds, then we'll go reentrant til the file is 348 * readable and we can mmap() it. 349 * Set the file descriptor that we want to wait for. 350 */ 351 352 p->state = DOWNLOAD_READ_LOCAL; 353 f = &p->fl[idx]; 354 p->ofd = openat(p->rootfd, f->path, O_RDONLY | O_NONBLOCK, 0); 355 356 if (p->ofd == -1 && errno != ENOENT) { 357 ERR(sess, "%s: openat", f->path); 358 goto out; 359 } else if (p->ofd != -1) { 360 *ofd = p->ofd; 361 return 1; 362 } 363 364 /* Fall-through: there's no file. */ 365 } 366 367 /* 368 * At this point, the server is sending us data and we want to 369 * hoover it up as quickly as possible or we'll deadlock. 370 * We want to be pulling off of f->fdin as quickly as possible, 371 * so perform as much buffering as we can. 372 */ 373 374 f = &p->fl[p->idx]; 375 376 /* 377 * Next in sequence: we have an open download session but 378 * haven't created our temporary file. 379 * This means that we've already opened (or tried to open) the 380 * original file in a nonblocking way, and we can map it. 381 */ 382 383 if (p->state == DOWNLOAD_READ_LOCAL) { 384 assert(p->fname == NULL); 385 386 /* 387 * Try to fstat() the file descriptor if valid and make 388 * sure that we're still a regular file. 389 * Then, if it has non-zero size, mmap() it for hashing. 390 */ 391 392 if (p->ofd != -1 && 393 fstat(p->ofd, &st) == -1) { 394 ERR(sess, "%s: fstat", f->path); 395 goto out; 396 } else if (p->ofd != -1 && !S_ISREG(st.st_mode)) { 397 WARNX(sess, "%s: not regular", f->path); 398 goto out; 399 } 400 401 if (p->ofd != -1 && st.st_size > 0) { 402 p->mapsz = st.st_size; 403 p->map = mmap(NULL, p->mapsz, 404 PROT_READ, MAP_SHARED, p->ofd, 0); 405 if (p->map == MAP_FAILED) { 406 ERR(sess, "%s: mmap", f->path); 407 goto out; 408 } 409 } 410 411 /* Success either way: we don't need this. */ 412 413 *ofd = -1; 414 415 /* Create the temporary file. */ 416 417 if (mktemplate(sess, &p->fname, 418 f->path, sess->opts->recursive) == -1) { 419 ERRX1(sess, "mktemplate"); 420 goto out; 421 } 422 423 if ((p->fd = mkstempat(p->rootfd, p->fname)) == -1) { 424 ERR(sess, "mkstempat"); 425 goto out; 426 } 427 428 /* 429 * FIXME: we can technically wait until the temporary 430 * file is writable, but since it's guaranteed to be 431 * empty, I don't think this is a terribly expensive 432 * operation as it doesn't involve reading the file into 433 * memory beforehand. 434 */ 435 436 LOG3(sess, "%s: temporary: %s", f->path, p->fname); 437 p->state = DOWNLOAD_READ_REMOTE; 438 return 1; 439 } 440 441 /* 442 * This matches the sequence in blk_flush(). 443 * If we've gotten here, then we have a possibly-open map file 444 * (not for new files) and our temporary file is writable. 445 * We read the size/token, then optionally the data. 446 * The size >0 for reading data, 0 for no more data, and <0 for 447 * a token indicator. 448 */ 449 450 again: 451 assert(p->state == DOWNLOAD_READ_REMOTE); 452 assert(p->fname != NULL); 453 assert(p->fd != -1); 454 assert(p->fdin != -1); 455 456 if (!io_read_int(sess, p->fdin, &rawtok)) { 457 ERRX1(sess, "io_read_int"); 458 goto out; 459 } 460 461 if (rawtok > 0) { 462 sz = rawtok; 463 if ((buf = malloc(sz)) == NULL) { 464 ERR(sess, "realloc"); 465 goto out; 466 } 467 if (!io_read_buf(sess, p->fdin, buf, sz)) { 468 ERRX1(sess, "io_read_int"); 469 goto out; 470 } else if (!buf_copy(sess, buf, sz, p)) { 471 ERRX1(sess, "buf_copy"); 472 goto out; 473 } 474 p->total += sz; 475 p->downloaded += sz; 476 LOG4(sess, "%s: received %zu B block", p->fname, sz); 477 MD4_Update(&p->ctx, buf, sz); 478 free(buf); 479 480 /* Fast-track more reads as they arrive. */ 481 482 if ((c = io_read_check(sess, p->fdin)) < 0) { 483 ERRX1(sess, "io_read_check"); 484 goto out; 485 } else if (c > 0) 486 goto again; 487 488 return 1; 489 } else if (rawtok < 0) { 490 tok = -rawtok - 1; 491 if (tok >= p->blk.blksz) { 492 ERRX(sess, 493 "%s: token not in block set: %zu (have %zu blocks)", 494 p->fname, tok, p->blk.blksz); 495 goto out; 496 } 497 sz = tok == p->blk.blksz - 1 ? p->blk.rem : p->blk.len; 498 assert(sz); 499 assert(p->map != MAP_FAILED); 500 buf = p->map + (tok * p->blk.len); 501 502 /* 503 * Now we read from our block. 504 * We should only be at this point if we have a 505 * block to read from, i.e., if we were able to 506 * map our origin file and create a block 507 * profile from it. 508 */ 509 510 assert(p->map != MAP_FAILED); 511 if (!buf_copy(sess, buf, sz, p)) { 512 ERRX1(sess, "buf_copy"); 513 goto out; 514 } 515 p->total += sz; 516 LOG4(sess, "%s: copied %zu B", p->fname, sz); 517 MD4_Update(&p->ctx, buf, sz); 518 519 /* Fast-track more reads as they arrive. */ 520 521 if ((c = io_read_check(sess, p->fdin)) < 0) { 522 ERRX1(sess, "io_read_check"); 523 goto out; 524 } else if (c > 0) 525 goto again; 526 527 return 1; 528 } 529 530 if (!buf_copy(sess, NULL, 0, p)) { 531 ERRX1(sess, "buf_copy"); 532 goto out; 533 } 534 535 assert(rawtok == 0); 536 assert(p->obufsz == 0); 537 538 /* 539 * Make sure our resulting MD4 hashes match. 540 * FIXME: if the MD4 hashes don't match, then our file has 541 * changed out from under us. 542 * This should require us to re-run the sequence in another 543 * phase. 544 */ 545 546 MD4_Final(ourmd, &p->ctx); 547 548 if (!io_read_buf(sess, p->fdin, md, MD4_DIGEST_LENGTH)) { 549 ERRX1(sess, "io_read_buf"); 550 goto out; 551 } else if (memcmp(md, ourmd, MD4_DIGEST_LENGTH)) { 552 ERRX(sess, "%s: hash does not match", p->fname); 553 goto out; 554 } 555 556 /* Adjust our file metadata (uid, mode, etc.). */ 557 558 if (!rsync_set_metadata(sess, 1, p->fd, f, p->fname)) { 559 ERRX1(sess, "rsync_set_metadata"); 560 goto out; 561 } 562 563 /* Finally, rename the temporary to the real file. */ 564 565 if (renameat(p->rootfd, p->fname, p->rootfd, f->path) == -1) { 566 ERR(sess, "%s: renameat: %s", p->fname, f->path); 567 goto out; 568 } 569 570 log_file(sess, p, f); 571 download_cleanup(p, 0); 572 return 1; 573 out: 574 download_cleanup(p, 1); 575 return -1; 576 } 577