1 /* $Id: downloader.c,v 1.8 2019/02/13 05:41:35 tb Exp $ */ 2 /* 3 * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv> 4 * 5 * Permission to use, copy, modify, and distribute this software for any 6 * purpose with or without fee is hereby granted, provided that the above 7 * copyright notice and this permission notice appear in all copies. 8 * 9 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 10 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 11 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 12 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 13 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 14 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 15 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 16 */ 17 #include <sys/mman.h> 18 #include <sys/stat.h> 19 20 #include <assert.h> 21 #include <errno.h> 22 #include <fcntl.h> 23 #include <inttypes.h> 24 #include <math.h> 25 #include <poll.h> 26 #include <stdio.h> 27 #include <stdlib.h> 28 #include <string.h> 29 #include <time.h> 30 #include <unistd.h> 31 32 #include <openssl/md4.h> 33 34 #include "extern.h" 35 36 /* 37 * A small optimisation: have a 1 MB pre-write buffer. 38 * Disable the pre-write buffer by having this be zero. 39 * (It doesn't affect performance much.) 40 */ 41 #define OBUF_SIZE (1024 * 1024) 42 43 enum downloadst { 44 DOWNLOAD_READ_NEXT = 0, 45 DOWNLOAD_READ_LOCAL, 46 DOWNLOAD_READ_REMOTE 47 }; 48 49 /* 50 * Like struct upload, but used to keep track of what we're downloading. 51 * This also is managed by the receiver process. 52 */ 53 struct download { 54 enum downloadst state; /* state of affairs */ 55 size_t idx; /* index of current file */ 56 struct blkset blk; /* its blocks */ 57 void *map; /* mmap of current file */ 58 size_t mapsz; /* length of mapsz */ 59 int ofd; /* open origin file */ 60 int fd; /* open output file */ 61 char *fname; /* output filename */ 62 MD4_CTX ctx; /* current hashing context */ 63 off_t downloaded; /* total downloaded */ 64 off_t total; /* total in file */ 65 const struct flist *fl; /* file list */ 66 size_t flsz; /* size of file list */ 67 int rootfd; /* destination directory */ 68 int fdin; /* read descriptor from sender */ 69 char *obuf; /* pre-write buffer */ 70 size_t obufsz; /* current size of obuf */ 71 size_t obufmax; /* max size we'll wbuffer */ 72 }; 73 74 75 /* 76 * Simply log the filename. 77 */ 78 static void 79 log_file(struct sess *sess, 80 const struct download *dl, const struct flist *f) 81 { 82 float frac, tot = dl->total; 83 int prec = 0; 84 const char *unit = "B"; 85 86 if (sess->opts->server) 87 return; 88 89 frac = 0 == dl->total ? 100.0 : 90 100.0 * dl->downloaded / dl->total; 91 92 if (dl->total > 1024 * 1024 * 1024) { 93 tot = dl->total / (1024. * 1024. * 1024.); 94 prec = 3; 95 unit = "GB"; 96 } else if (dl->total > 1024 * 1024) { 97 tot = dl->total / (1024. * 1024.); 98 prec = 2; 99 unit = "MB"; 100 } else if (dl->total > 1024) { 101 tot = dl->total / 1024.; 102 prec = 1; 103 unit = "KB"; 104 } 105 106 LOG1(sess, "%s (%.*f %s, %.1f%% downloaded)", 107 f->path, prec, tot, unit, frac); 108 } 109 110 /* 111 * Reinitialise a download context w/o overwriting the persistent parts 112 * of the structure (like p->fl or p->flsz) for index "idx". 113 * The MD4 context is pre-seeded. 114 */ 115 static void 116 download_reinit(struct sess *sess, struct download *p, size_t idx) 117 { 118 int32_t seed = htole32(sess->seed); 119 120 assert(p->state == DOWNLOAD_READ_NEXT); 121 122 p->idx = idx; 123 memset(&p->blk, 0, sizeof(struct blkset)); 124 p->map = MAP_FAILED; 125 p->mapsz = 0; 126 p->ofd = -1; 127 p->fd = -1; 128 p->fname = NULL; 129 MD4_Init(&p->ctx); 130 p->downloaded = p->total = 0; 131 /* Don't touch p->fl. */ 132 /* Don't touch p->flsz. */ 133 /* Don't touch p->rootfd. */ 134 /* Don't touch p->fdin. */ 135 MD4_Update(&p->ctx, &seed, sizeof(int32_t)); 136 } 137 138 /* 139 * Free a download context. 140 * If "cleanup" is non-zero, we also try to clean up the temporary file, 141 * assuming that it has been opened in p->fd. 142 */ 143 static void 144 download_cleanup(struct download *p, int cleanup) 145 { 146 147 if (p->map != MAP_FAILED) { 148 assert(p->mapsz); 149 munmap(p->map, p->mapsz); 150 p->map = MAP_FAILED; 151 p->mapsz = 0; 152 } 153 if (p->ofd != -1) { 154 close(p->ofd); 155 p->ofd = -1; 156 } 157 if (p->fd != -1) { 158 close(p->fd); 159 if (cleanup && p->fname != NULL) 160 unlinkat(p->rootfd, p->fname, 0); 161 p->fd = -1; 162 } 163 free(p->fname); 164 p->fname = NULL; 165 p->state = DOWNLOAD_READ_NEXT; 166 } 167 168 /* 169 * Initial allocation of the download object using the file list "fl" of 170 * size "flsz", the destination "rootfd", and the sender read "fdin". 171 * Returns NULL on allocation failure. 172 * On success, download_free() must be called with the pointer. 173 */ 174 struct download * 175 download_alloc(struct sess *sess, int fdin, 176 const struct flist *fl, size_t flsz, int rootfd) 177 { 178 struct download *p; 179 180 if ((p = malloc(sizeof(struct download))) == NULL) { 181 ERR(sess, "malloc"); 182 return NULL; 183 } 184 185 p->state = DOWNLOAD_READ_NEXT; 186 p->fl = fl; 187 p->flsz = flsz; 188 p->rootfd = rootfd; 189 p->fdin = fdin; 190 download_reinit(sess, p, 0); 191 p->obufsz = 0; 192 p->obuf = NULL; 193 p->obufmax = OBUF_SIZE; 194 if (p->obufmax && (p->obuf = malloc(p->obufmax)) == NULL) { 195 ERR(sess, "malloc"); 196 free(p); 197 return NULL; 198 } 199 return p; 200 } 201 202 /* 203 * Perform all cleanups (including removing stray files) and free. 204 * Passing a NULL to this function is ok. 205 */ 206 void 207 download_free(struct download *p) 208 { 209 210 if (p == NULL) 211 return; 212 download_cleanup(p, 1); 213 free(p->obuf); 214 free(p); 215 } 216 217 /* 218 * Optimisation: instead of dumping directly into the output file, keep 219 * a buffer and write as much as we can into the buffer. 220 * That way, we can avoid calling write() too much, and instead call it 221 * with big buffers. 222 * To flush the buffer w/o changing it, pass 0 as "sz". 223 * Returns zero on failure, non-zero on success. 224 */ 225 static int 226 buf_copy(struct sess *sess, 227 const char *buf, size_t sz, struct download *p) 228 { 229 size_t rem, tocopy; 230 ssize_t ssz; 231 232 assert(p->obufsz <= p->obufmax); 233 234 /* 235 * Copy as much as we can. 236 * If we've copied everything, exit. 237 * If we have no pre-write buffer (obufmax of zero), this never 238 * gets called, so we never buffer anything. 239 */ 240 241 if (sz && p->obufsz < p->obufmax) { 242 assert(p->obuf != NULL); 243 rem = p->obufmax - p->obufsz; 244 assert(rem > 0); 245 tocopy = rem < sz ? rem : sz; 246 memcpy(p->obuf + p->obufsz, buf, tocopy); 247 sz -= tocopy; 248 buf += tocopy; 249 p->obufsz += tocopy; 250 assert(p->obufsz <= p->obufmax); 251 if (sz == 0) 252 return 1; 253 } 254 255 /* Drain the main buffer. */ 256 257 if (p->obufsz) { 258 assert(p->obufmax); 259 assert(p->obufsz <= p->obufmax); 260 assert(p->obuf != NULL); 261 if ((ssz = write(p->fd, p->obuf, p->obufsz)) < 0) { 262 ERR(sess, "%s: write", p->fname); 263 return 0; 264 } else if ((size_t)ssz != p->obufsz) { 265 ERRX(sess, "%s: short write", p->fname); 266 return 0; 267 } 268 p->obufsz = 0; 269 } 270 271 /* 272 * Now drain anything left. 273 * If we have no pre-write buffer, this is it. 274 */ 275 276 if (sz) { 277 if ((ssz = write(p->fd, buf, sz)) < 0) { 278 ERR(sess, "%s: write", p->fname); 279 return 0; 280 } else if ((size_t)ssz != sz) { 281 ERRX(sess, "%s: short write", p->fname); 282 return 0; 283 } 284 } 285 return 1; 286 } 287 288 /* 289 * The downloader waits on a file the sender is going to give us, opens 290 * and mmaps the existing file, opens a temporary file, dumps the file 291 * (or metadata) into the temporary file, then renames. 292 * This happens in several possible phases to avoid blocking. 293 * Returns <0 on failure, 0 on no more data (end of phase), >0 on 294 * success (more data to be read from the sender). 295 */ 296 int 297 rsync_downloader(struct download *p, struct sess *sess, int *ofd) 298 { 299 int32_t idx, rawtok; 300 uint32_t hash; 301 const struct flist *f; 302 size_t sz, dirlen, tok; 303 const char *cp; 304 mode_t perm; 305 struct stat st; 306 char *buf = NULL; 307 unsigned char ourmd[MD4_DIGEST_LENGTH], 308 md[MD4_DIGEST_LENGTH]; 309 struct timespec tv[2]; 310 311 /* 312 * If we don't have a download already in session, then the next 313 * one is coming in. 314 * Read either the stop (phase) signal from the sender or block 315 * metadata, in which case we open our file and wait for data. 316 */ 317 318 if (p->state == DOWNLOAD_READ_NEXT) { 319 if (!io_read_int(sess, p->fdin, &idx)) { 320 ERRX1(sess, "io_read_int"); 321 return -1; 322 } else if (idx >= 0 && (size_t)idx >= p->flsz) { 323 ERRX(sess, "index out of bounds"); 324 return -1; 325 } else if (idx < 0) { 326 LOG3(sess, "downloader: phase complete"); 327 return 0; 328 } 329 330 /* Short-circuit: dry_run mode does nothing. */ 331 332 if (sess->opts->dry_run) 333 return 1; 334 335 /* 336 * Now get our block information. 337 * This is all we'll need to reconstruct the file from 338 * the map, as block sizes are regular. 339 */ 340 341 download_reinit(sess, p, idx); 342 if (!blk_send_ack(sess, p->fdin, &p->blk)) { 343 ERRX1(sess, "blk_send_ack"); 344 goto out; 345 } 346 347 /* 348 * Next, we want to open the existing file for using as 349 * block input. 350 * We do this in a non-blocking way, so if the open 351 * succeeds, then we'll go reentrant til the file is 352 * readable and we can mmap() it. 353 * Set the file descriptor that we want to wait for. 354 */ 355 356 p->state = DOWNLOAD_READ_LOCAL; 357 f = &p->fl[idx]; 358 p->ofd = openat(p->rootfd, f->path, O_RDONLY | O_NONBLOCK, 0); 359 360 if (p->ofd == -1 && errno != ENOENT) { 361 ERR(sess, "%s: openat", f->path); 362 goto out; 363 } else if (p->ofd != -1) { 364 *ofd = p->ofd; 365 return 1; 366 } 367 368 /* Fall-through: there's no file. */ 369 } 370 371 /* 372 * At this point, the server is sending us data and we want to 373 * hoover it up as quickly as possible or we'll deadlock. 374 * We want to be pulling off of f->fdin as quickly as possible, 375 * so perform as much buffering as we can. 376 */ 377 378 f = &p->fl[p->idx]; 379 380 /* 381 * Next in sequence: we have an open download session but 382 * haven't created our temporary file. 383 * This means that we've already opened (or tried to open) the 384 * original file in a nonblocking way, and we can map it. 385 */ 386 387 if (p->state == DOWNLOAD_READ_LOCAL) { 388 assert(p->fname == NULL); 389 390 /* 391 * Try to fstat() the file descriptor if valid and make 392 * sure that we're still a regular file. 393 * Then, if it has non-zero size, mmap() it for hashing. 394 */ 395 396 if (p->ofd != -1 && 397 fstat(p->ofd, &st) == -1) { 398 ERR(sess, "%s: fstat", f->path); 399 goto out; 400 } else if (p->ofd != -1 && !S_ISREG(st.st_mode)) { 401 WARNX(sess, "%s: not regular", f->path); 402 goto out; 403 } 404 405 if (p->ofd != -1 && st.st_size > 0) { 406 p->mapsz = st.st_size; 407 p->map = mmap(NULL, p->mapsz, 408 PROT_READ, MAP_SHARED, p->ofd, 0); 409 if (p->map == MAP_FAILED) { 410 ERR(sess, "%s: mmap", f->path); 411 goto out; 412 } 413 } 414 415 /* Success either way: we don't need this. */ 416 417 *ofd = -1; 418 419 /* 420 * Create the temporary file. 421 * Use a simple scheme of path/.FILE.RANDOM, where we 422 * fill in RANDOM with an arc4random number. 423 * The tricky part is getting into the directory if 424 * we're in recursive mode. 425 */ 426 427 hash = arc4random(); 428 if (sess->opts->recursive && 429 NULL != (cp = strrchr(f->path, '/'))) { 430 dirlen = cp - f->path; 431 if (asprintf(&p->fname, "%.*s/.%s.%" PRIu32, 432 (int)dirlen, f->path, 433 f->path + dirlen + 1, hash) < 0) 434 p->fname = NULL; 435 } else { 436 if (asprintf(&p->fname, ".%s.%" PRIu32, 437 f->path, hash) < 0) 438 p->fname = NULL; 439 } 440 if (p->fname == NULL) { 441 ERR(sess, "asprintf"); 442 goto out; 443 } 444 445 /* 446 * Inherit permissions from the source file if we're new 447 * or specifically told with -p. 448 */ 449 450 if (!sess->opts->preserve_perms) 451 perm = -1 == p->ofd ? f->st.mode : st.st_mode; 452 else 453 perm = f->st.mode; 454 455 p->fd = openat(p->rootfd, p->fname, 456 O_APPEND|O_WRONLY|O_CREAT|O_EXCL, perm); 457 458 if (p->fd == -1) { 459 ERR(sess, "%s: openat", p->fname); 460 goto out; 461 } 462 463 /* 464 * FIXME: we can technically wait until the temporary 465 * file is writable, but since it's guaranteed to be 466 * empty, I don't think this is a terribly expensive 467 * operation as it doesn't involve reading the file into 468 * memory beforehand. 469 */ 470 471 LOG3(sess, "%s: temporary: %s", f->path, p->fname); 472 p->state = DOWNLOAD_READ_REMOTE; 473 return 1; 474 } 475 476 /* 477 * This matches the sequence in blk_flush(). 478 * If we've gotten here, then we have a possibly-open map file 479 * (not for new files) and our temporary file is writable. 480 * We read the size/token, then optionally the data. 481 * The size >0 for reading data, 0 for no more data, and <0 for 482 * a token indicator. 483 */ 484 485 assert(p->state == DOWNLOAD_READ_REMOTE); 486 assert(p->fname != NULL); 487 assert(p->fd != -1); 488 assert(p->fdin != -1); 489 490 if (!io_read_int(sess, p->fdin, &rawtok)) { 491 ERRX1(sess, "io_read_int"); 492 goto out; 493 } 494 495 if (rawtok > 0) { 496 sz = rawtok; 497 if ((buf = malloc(sz)) == NULL) { 498 ERR(sess, "realloc"); 499 goto out; 500 } 501 if (!io_read_buf(sess, p->fdin, buf, sz)) { 502 ERRX1(sess, "io_read_int"); 503 goto out; 504 } else if (!buf_copy(sess, buf, sz, p)) { 505 ERRX1(sess, "buf_copy"); 506 goto out; 507 } 508 p->total += sz; 509 p->downloaded += sz; 510 LOG4(sess, "%s: received %zu B block", p->fname, sz); 511 MD4_Update(&p->ctx, buf, sz); 512 free(buf); 513 return 1; 514 } else if (rawtok < 0) { 515 tok = -rawtok - 1; 516 if (tok >= p->blk.blksz) { 517 ERRX(sess, "%s: token not in block " 518 "set: %zu (have %zu blocks)", 519 p->fname, tok, p->blk.blksz); 520 goto out; 521 } 522 sz = tok == p->blk.blksz - 1 ? p->blk.rem : p->blk.len; 523 assert(sz); 524 assert(p->map != MAP_FAILED); 525 buf = p->map + (tok * p->blk.len); 526 527 /* 528 * Now we read from our block. 529 * We should only be at this point if we have a 530 * block to read from, i.e., if we were able to 531 * map our origin file and create a block 532 * profile from it. 533 */ 534 535 assert(p->map != MAP_FAILED); 536 if (!buf_copy(sess, buf, sz, p)) { 537 ERRX1(sess, "buf_copy"); 538 goto out; 539 } 540 p->total += sz; 541 LOG4(sess, "%s: copied %zu B", p->fname, sz); 542 MD4_Update(&p->ctx, buf, sz); 543 return 1; 544 } 545 546 if (!buf_copy(sess, NULL, 0, p)) { 547 ERRX1(sess, "buf_copy"); 548 goto out; 549 } 550 551 assert(rawtok == 0); 552 assert(p->obufsz == 0); 553 554 /* 555 * Make sure our resulting MD4 hashes match. 556 * FIXME: if the MD4 hashes don't match, then our file has 557 * changed out from under us. 558 * This should require us to re-run the sequence in another 559 * phase. 560 */ 561 562 MD4_Final(ourmd, &p->ctx); 563 564 if (!io_read_buf(sess, p->fdin, md, MD4_DIGEST_LENGTH)) { 565 ERRX1(sess, "io_read_buf"); 566 goto out; 567 } else if (memcmp(md, ourmd, MD4_DIGEST_LENGTH)) { 568 ERRX(sess, "%s: hash does not match", p->fname); 569 goto out; 570 } 571 572 /* 573 * Conditionally adjust group id. 574 * FIXME: remember the original file's group id and don't 575 * reassign it if it's the same. 576 * If we have an EPERM, report it but continue on: this just 577 * means that we're mapping into an unknown (or disallowed) 578 * group identifier. 579 */ 580 581 if (sess->opts->preserve_gids) { 582 if (fchown(p->fd, -1, f->st.gid) == -1) { 583 if (errno != EPERM) { 584 ERR(sess, "%s: fchown", p->fname); 585 goto out; 586 } 587 WARNX(sess, "%s: gid unknown or not available " 588 "to user: %u", f->path, f->st.gid); 589 } else 590 LOG4(sess, "%s: updated gid", f->path); 591 } 592 593 /* Conditionally adjust file modification time. */ 594 595 if (sess->opts->preserve_times) { 596 tv[0].tv_sec = time(NULL); 597 tv[0].tv_nsec = 0; 598 tv[1].tv_sec = f->st.mtime; 599 tv[1].tv_nsec = 0; 600 if (futimens(p->fd, tv) == -1) { 601 ERR(sess, "%s: futimens", p->fname); 602 goto out; 603 } 604 LOG4(sess, "%s: updated date", f->path); 605 } 606 607 /* Finally, rename the temporary to the real file. */ 608 609 if (renameat(p->rootfd, p->fname, p->rootfd, f->path) == -1) { 610 ERR(sess, "%s: renameat: %s", p->fname, f->path); 611 goto out; 612 } 613 614 log_file(sess, p, f); 615 download_cleanup(p, 0); 616 return 1; 617 out: 618 download_cleanup(p, 1); 619 return -1; 620 } 621