1 /* $OpenBSD: rrdp.c,v 1.9 2021/04/21 09:36:06 claudio Exp $ */ 2 /* 3 * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com> 4 * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 #include <sys/queue.h> 19 #include <sys/stat.h> 20 21 #include <assert.h> 22 #include <ctype.h> 23 #include <err.h> 24 #include <errno.h> 25 #include <fcntl.h> 26 #include <limits.h> 27 #include <poll.h> 28 #include <string.h> 29 #include <unistd.h> 30 #include <imsg.h> 31 32 #include <expat.h> 33 #include <openssl/sha.h> 34 35 #include "extern.h" 36 #include "rrdp.h" 37 38 #define MAX_SESSIONS 12 39 #define READ_BUF_SIZE (32 * 1024) 40 41 static struct msgbuf msgq; 42 43 #define RRDP_STATE_REQ 0x01 44 #define RRDP_STATE_WAIT 0x02 45 #define RRDP_STATE_PARSE 0x04 46 #define RRDP_STATE_PARSE_ERROR 0x08 47 #define RRDP_STATE_PARSE_DONE 0x10 48 #define RRDP_STATE_HTTP_DONE 0x20 49 #define RRDP_STATE_DONE (RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE) 50 51 struct rrdp { 52 TAILQ_ENTRY(rrdp) entry; 53 size_t id; 54 char *notifyuri; 55 char *local; 56 char *last_mod; 57 58 struct pollfd *pfd; 59 int infd; 60 int state; 61 unsigned int file_pending; 62 unsigned int file_failed; 63 enum http_result res; 64 enum rrdp_task task; 65 66 char hash[SHA256_DIGEST_LENGTH]; 67 SHA256_CTX ctx; 68 69 struct rrdp_session repository; 70 struct rrdp_session current; 71 XML_Parser parser; 72 struct notification_xml *nxml; 73 struct snapshot_xml *sxml; 74 struct delta_xml *dxml; 75 }; 76 77 TAILQ_HEAD(,rrdp) states = TAILQ_HEAD_INITIALIZER(states); 78 79 struct publish_xml { 80 char *uri; 81 char *data; 82 char hash[SHA256_DIGEST_LENGTH]; 83 int data_length; 84 enum publish_type type; 85 }; 86 87 char * 88 xstrdup(const char *s) 89 { 90 char *r; 91 if ((r = strdup(s)) == NULL) 92 err(1, "strdup"); 93 return r; 94 } 95 96 /* 97 * Hex decode hexstring into the supplied buffer. 98 * Return 0 on success else -1, if buffer too small or bad encoding. 99 */ 100 int 101 hex_decode(const char *hexstr, char *buf, size_t len) 102 { 103 unsigned char ch, r; 104 size_t pos = 0; 105 int i; 106 107 while (*hexstr) { 108 r = 0; 109 for (i = 0; i < 2; i++) { 110 ch = hexstr[i]; 111 if (isdigit(ch)) 112 ch -= '0'; 113 else if (islower(ch)) 114 ch -= ('a' - 10); 115 else if (isupper(ch)) 116 ch -= ('A' - 10); 117 else 118 return -1; 119 if (ch > 0xf) 120 return -1; 121 r = r << 4 | ch; 122 } 123 if (pos < len) 124 buf[pos++] = r; 125 else 126 return -1; 127 128 hexstr += 2; 129 } 130 return 0; 131 } 132 133 /* 134 * Report back that a RRDP request finished. 135 * ok should only be set to 1 if the cache is now up-to-date. 136 */ 137 static void 138 rrdp_done(size_t id, int ok) 139 { 140 enum rrdp_msg type = RRDP_END; 141 struct ibuf *b; 142 143 if ((b = ibuf_open(sizeof(type) + sizeof(id) + sizeof(ok))) == NULL) 144 err(1, NULL); 145 io_simple_buffer(b, &type, sizeof(type)); 146 io_simple_buffer(b, &id, sizeof(id)); 147 io_simple_buffer(b, &ok, sizeof(ok)); 148 ibuf_close(&msgq, b); 149 } 150 151 /* 152 * Request an URI to be fetched via HTTPS. 153 * The main process will respond with a RRDP_HTTP_INI which includes 154 * the file descriptor to read from. RRDP_HTTP_FIN is sent at the 155 * end of the request with the HTTP status code and last modified timestamp. 156 * If the request should not set the If-Modified-Since: header then last_mod 157 * should be set to NULL, else it should point to a proper date string. 158 */ 159 static void 160 rrdp_http_req(size_t id, const char *uri, const char *last_mod) 161 { 162 enum rrdp_msg type = RRDP_HTTP_REQ; 163 struct ibuf *b; 164 165 if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL) 166 err(1, NULL); 167 io_simple_buffer(b, &type, sizeof(type)); 168 io_simple_buffer(b, &id, sizeof(id)); 169 io_str_buffer(b, uri); 170 io_str_buffer(b, last_mod); 171 ibuf_close(&msgq, b); 172 } 173 174 /* 175 * Send the session state to the main process so it gets stored. 176 */ 177 static void 178 rrdp_state_send(struct rrdp *s) 179 { 180 enum rrdp_msg type = RRDP_SESSION; 181 struct ibuf *b; 182 183 if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL) 184 err(1, NULL); 185 io_simple_buffer(b, &type, sizeof(type)); 186 io_simple_buffer(b, &s->id, sizeof(s->id)); 187 io_str_buffer(b, s->current.session_id); 188 io_simple_buffer(b, &s->current.serial, sizeof(s->current.serial)); 189 io_str_buffer(b, s->current.last_mod); 190 ibuf_close(&msgq, b); 191 } 192 193 static struct rrdp * 194 rrdp_new(size_t id, char *local, char *notify, char *session_id, 195 long long serial, char *last_mod) 196 { 197 struct rrdp *s; 198 199 if ((s = calloc(1, sizeof(*s))) == NULL) 200 err(1, NULL); 201 202 s->infd = -1; 203 s->id = id; 204 s->local = local; 205 s->notifyuri = notify; 206 s->repository.session_id = session_id; 207 s->repository.serial = serial; 208 s->repository.last_mod = last_mod; 209 210 s->state = RRDP_STATE_REQ; 211 if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL) 212 err(1, "XML_ParserCreate"); 213 214 s->nxml = new_notification_xml(s->parser, &s->repository, &s->current); 215 216 TAILQ_INSERT_TAIL(&states, s, entry); 217 218 return s; 219 } 220 221 static void 222 rrdp_free(struct rrdp *s) 223 { 224 if (s == NULL) 225 return; 226 227 TAILQ_REMOVE(&states, s, entry); 228 229 free_notification_xml(s->nxml); 230 free_snapshot_xml(s->sxml); 231 free_delta_xml(s->dxml); 232 233 if (s->parser) 234 XML_ParserFree(s->parser); 235 if (s->infd != -1) 236 close(s->infd); 237 free(s->notifyuri); 238 free(s->local); 239 free(s->last_mod); 240 free(s->repository.last_mod); 241 free(s->repository.session_id); 242 free(s->current.last_mod); 243 free(s->current.session_id); 244 245 free(s); 246 } 247 248 static struct rrdp * 249 rrdp_get(size_t id) 250 { 251 struct rrdp *s; 252 253 TAILQ_FOREACH(s, &states, entry) 254 if (s->id == id) 255 break; 256 return s; 257 } 258 259 static void 260 rrdp_failed(struct rrdp *s) 261 { 262 size_t id = s->id; 263 264 /* reset file state before retrying */ 265 s->file_failed = 0; 266 267 /* XXX MUST do some cleanup in the repo here */ 268 if (s->task == DELTA) { 269 /* fallback to a snapshot as per RFC8182 */ 270 free_delta_xml(s->dxml); 271 s->dxml = NULL; 272 s->sxml = new_snapshot_xml(s->parser, &s->current, s); 273 s->task = SNAPSHOT; 274 s->state = RRDP_STATE_REQ; 275 logx("%s: delta sync failed, fallback to snapshot", s->local); 276 } else { 277 /* 278 * TODO: update state to track recurring failures 279 * and fall back to rsync after a while. 280 * This should probably happen in the main process. 281 */ 282 rrdp_free(s); 283 rrdp_done(id, 0); 284 } 285 } 286 287 static void 288 rrdp_finished(struct rrdp *s) 289 { 290 size_t id = s->id; 291 292 /* check if all parts of the process have finished */ 293 if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE) 294 return; 295 296 /* still some files pending */ 297 if (s->file_pending > 0) 298 return; 299 300 if (s->state & RRDP_STATE_PARSE_ERROR) { 301 rrdp_failed(s); 302 return; 303 } 304 305 if (s->res == HTTP_OK) { 306 XML_Parser p = s->parser; 307 308 /* 309 * Finalize parsing on success to be sure that 310 * all of the XML is correct. Needs to be done here 311 * since the call would most probably fail for non 312 * successful data fetches. 313 */ 314 if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) { 315 warnx("%s: XML error at line %llu: %s", s->local, 316 (unsigned long long)XML_GetCurrentLineNumber(p), 317 XML_ErrorString(XML_GetErrorCode(p))); 318 rrdp_failed(s); 319 return; 320 } 321 322 /* If a file caused an error fail the update */ 323 if (s->file_failed > 0) { 324 rrdp_failed(s); 325 return; 326 } 327 328 switch (s->task) { 329 case NOTIFICATION: 330 s->task = notification_done(s->nxml, s->last_mod); 331 s->last_mod = NULL; 332 switch (s->task) { 333 case NOTIFICATION: 334 logx("%s: repository not modified", s->local); 335 rrdp_state_send(s); 336 rrdp_free(s); 337 rrdp_done(id, 1); 338 break; 339 case SNAPSHOT: 340 logx("%s: downloading snapshot", s->local); 341 s->sxml = new_snapshot_xml(p, &s->current, s); 342 s->state = RRDP_STATE_REQ; 343 break; 344 case DELTA: 345 logx("%s: downloading %lld deltas", s->local, 346 s->repository.serial - s->current.serial); 347 s->dxml = new_delta_xml(p, &s->current, s); 348 s->state = RRDP_STATE_REQ; 349 break; 350 } 351 break; 352 case SNAPSHOT: 353 rrdp_state_send(s); 354 rrdp_free(s); 355 rrdp_done(id, 1); 356 break; 357 case DELTA: 358 if (notification_delta_done(s->nxml)) { 359 /* finished */ 360 rrdp_state_send(s); 361 rrdp_free(s); 362 rrdp_done(id, 1); 363 } else { 364 /* reset delta parser for next delta */ 365 free_delta_xml(s->dxml); 366 s->dxml = new_delta_xml(p, &s->current, s); 367 s->state = RRDP_STATE_REQ; 368 } 369 break; 370 } 371 } else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) { 372 logx("%s: notification file not modified", s->local); 373 /* no need to update state file */ 374 rrdp_free(s); 375 rrdp_done(id, 1); 376 } else { 377 rrdp_failed(s); 378 } 379 } 380 381 static void 382 rrdp_input_handler(int fd) 383 { 384 char *local, *notify, *session_id, *last_mod; 385 struct rrdp *s; 386 enum rrdp_msg type; 387 enum http_result res; 388 long long serial; 389 size_t id; 390 int infd, ok; 391 392 infd = io_recvfd(fd, &type, sizeof(type)); 393 io_simple_read(fd, &id, sizeof(id)); 394 395 switch (type) { 396 case RRDP_START: 397 io_str_read(fd, &local); 398 io_str_read(fd, ¬ify); 399 io_str_read(fd, &session_id); 400 io_simple_read(fd, &serial, sizeof(serial)); 401 io_str_read(fd, &last_mod); 402 if (infd != -1) 403 errx(1, "received unexpected fd %d", infd); 404 405 s = rrdp_new(id, local, notify, session_id, serial, last_mod); 406 break; 407 case RRDP_HTTP_INI: 408 if (infd == -1) 409 errx(1, "expected fd not received"); 410 s = rrdp_get(id); 411 if (s == NULL) 412 errx(1, "rrdp session %zu does not exist", id); 413 if (s->state != RRDP_STATE_WAIT) 414 errx(1, "%s: bad internal state", s->local); 415 416 s->infd = infd; 417 s->state = RRDP_STATE_PARSE; 418 break; 419 case RRDP_HTTP_FIN: 420 io_simple_read(fd, &res, sizeof(res)); 421 io_str_read(fd, &last_mod); 422 if (infd != -1) 423 errx(1, "received unexpected fd"); 424 425 s = rrdp_get(id); 426 if (s == NULL) 427 errx(1, "rrdp session %zu does not exist", id); 428 if (!(s->state & RRDP_STATE_PARSE)) 429 errx(1, "%s: bad internal state", s->local); 430 431 s->res = res; 432 s->last_mod = last_mod; 433 s->state |= RRDP_STATE_HTTP_DONE; 434 rrdp_finished(s); 435 break; 436 case RRDP_FILE: 437 s = rrdp_get(id); 438 if (s == NULL) 439 errx(1, "rrdp session %zu does not exist", id); 440 if (infd != -1) 441 errx(1, "received unexpected fd %d", infd); 442 io_simple_read(fd, &ok, sizeof(ok)); 443 if (ok == 0) 444 s->file_failed++; 445 s->file_pending--; 446 if (s->file_pending == 0) 447 rrdp_finished(s); 448 break; 449 default: 450 errx(1, "unexpected message %d", type); 451 } 452 } 453 454 static void 455 rrdp_data_handler(struct rrdp *s) 456 { 457 char buf[READ_BUF_SIZE]; 458 XML_Parser p = s->parser; 459 ssize_t len; 460 461 len = read(s->infd, buf, sizeof(buf)); 462 if (len == -1) { 463 s->state |= RRDP_STATE_PARSE_ERROR; 464 warn("%s: read failure", s->local); 465 return; 466 } 467 if ((s->state & RRDP_STATE_PARSE) == 0) 468 errx(1, "%s: bad parser state", s->local); 469 if (len == 0) { 470 /* parser stage finished */ 471 close(s->infd); 472 s->infd = -1; 473 474 if (s->task != NOTIFICATION) { 475 char h[SHA256_DIGEST_LENGTH]; 476 477 SHA256_Final(h, &s->ctx); 478 if (memcmp(s->hash, h, sizeof(s->hash)) != 0) { 479 s->state |= RRDP_STATE_PARSE_ERROR; 480 warnx("%s: bad message digest", s->local); 481 } 482 } 483 484 s->state |= RRDP_STATE_PARSE_DONE; 485 rrdp_finished(s); 486 return; 487 } 488 489 /* parse and maybe hash the bytes just read */ 490 if (s->task != NOTIFICATION) 491 SHA256_Update(&s->ctx, buf, len); 492 if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 && 493 XML_Parse(p, buf, len, 0) != XML_STATUS_OK) { 494 warnx("%s: parse error at line %llu: %s", s->local, 495 (unsigned long long)XML_GetCurrentLineNumber(p), 496 XML_ErrorString(XML_GetErrorCode(p))); 497 s->state |= RRDP_STATE_PARSE_ERROR; 498 } 499 } 500 501 void 502 proc_rrdp(int fd) 503 { 504 struct pollfd pfds[MAX_SESSIONS + 1]; 505 struct rrdp *s, *ns; 506 size_t i; 507 508 if (pledge("stdio recvfd", NULL) == -1) 509 err(1, "pledge"); 510 511 memset(&pfds, 0, sizeof(pfds)); 512 513 msgbuf_init(&msgq); 514 msgq.fd = fd; 515 516 for (;;) { 517 i = 1; 518 TAILQ_FOREACH(s, &states, entry) { 519 if (i >= MAX_SESSIONS + 1) { 520 /* not enough sessions, wait for better times */ 521 s->pfd = NULL; 522 continue; 523 } 524 /* request new assets when there are free sessions */ 525 if (s->state == RRDP_STATE_REQ) { 526 const char *uri; 527 switch (s->task) { 528 case NOTIFICATION: 529 rrdp_http_req(s->id, s->notifyuri, 530 s->repository.last_mod); 531 break; 532 case SNAPSHOT: 533 case DELTA: 534 uri = notification_get_next(s->nxml, 535 s->hash, sizeof(s->hash), 536 s->task); 537 SHA256_Init(&s->ctx); 538 rrdp_http_req(s->id, uri, NULL); 539 break; 540 } 541 s->state = RRDP_STATE_WAIT; 542 } 543 s->pfd = pfds + i++; 544 s->pfd->fd = s->infd; 545 s->pfd->events = POLLIN; 546 } 547 548 /* 549 * Update main fd last. 550 * The previous loop may have enqueue messages. 551 */ 552 pfds[0].fd = fd; 553 pfds[0].events = POLLIN; 554 if (msgq.queued) 555 pfds[0].events |= POLLOUT; 556 557 if (poll(pfds, i, INFTIM) == -1) 558 err(1, "poll"); 559 560 if (pfds[0].revents & POLLHUP) 561 break; 562 if (pfds[0].revents & POLLOUT) { 563 io_socket_nonblocking(fd); 564 switch (msgbuf_write(&msgq)) { 565 case 0: 566 errx(1, "write: connection closed"); 567 case -1: 568 err(1, "write"); 569 } 570 io_socket_blocking(fd); 571 } 572 if (pfds[0].revents & POLLIN) 573 rrdp_input_handler(fd); 574 575 TAILQ_FOREACH_SAFE(s, &states, entry, ns) { 576 if (s->pfd == NULL) 577 continue; 578 if (s->pfd->revents != 0) 579 rrdp_data_handler(s); 580 } 581 } 582 583 exit(0); 584 } 585 586 /* 587 * Both snapshots and deltas use publish_xml to store the publish and 588 * withdraw records. Once all the content is added the request is sent 589 * to the main process where it is processed. 590 */ 591 struct publish_xml * 592 new_publish_xml(enum publish_type type, char *uri, char *hash, size_t hlen) 593 { 594 struct publish_xml *pxml; 595 596 if ((pxml = calloc(1, sizeof(*pxml))) == NULL) 597 err(1, "%s", __func__); 598 599 pxml->type = type; 600 pxml->uri = uri; 601 if (hlen > 0) { 602 assert(hlen == sizeof(pxml->hash)); 603 memcpy(pxml->hash, hash, hlen); 604 } 605 606 return pxml; 607 } 608 609 void 610 free_publish_xml(struct publish_xml *pxml) 611 { 612 if (pxml == NULL) 613 return; 614 615 free(pxml->uri); 616 free(pxml->data); 617 free(pxml); 618 } 619 620 /* 621 * Add buf to the base64 data string, ensure that this remains a proper 622 * string by NUL-terminating the string. 623 */ 624 void 625 publish_add_content(struct publish_xml *pxml, const char *buf, int length) 626 { 627 int new_length; 628 629 /* 630 * optmisiation, this often gets called with '\n' as the 631 * only data... seems wasteful 632 */ 633 if (length == 1 && buf[0] == '\n') 634 return; 635 636 /* append content to data */ 637 new_length = pxml->data_length + length; 638 pxml->data = realloc(pxml->data, new_length + 1); 639 if (pxml->data == NULL) 640 err(1, "%s", __func__); 641 642 memcpy(pxml->data + pxml->data_length, buf, length); 643 pxml->data[new_length] = '\0'; 644 pxml->data_length = new_length; 645 } 646 647 /* 648 * Base64 decode the data blob and send the file to the main process 649 * where the hash is validated and the file stored in the repository. 650 * Increase the file_pending counter to ensure the RRDP process waits 651 * until all files have been processed before moving to the next stage. 652 * Returns 0 on success or -1 on errors (base64 decode failed). 653 */ 654 int 655 publish_done(struct rrdp *s, struct publish_xml *pxml) 656 { 657 enum rrdp_msg type = RRDP_FILE; 658 struct ibuf *b; 659 unsigned char *data = NULL; 660 size_t datasz = 0; 661 662 if (pxml->data_length > 0) 663 if ((base64_decode(pxml->data, &data, &datasz)) == -1) 664 return -1; 665 666 if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL) 667 err(1, NULL); 668 io_simple_buffer(b, &type, sizeof(type)); 669 io_simple_buffer(b, &s->id, sizeof(s->id)); 670 io_simple_buffer(b, &pxml->type, sizeof(pxml->type)); 671 if (pxml->type != PUB_ADD) 672 io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash)); 673 io_str_buffer(b, pxml->uri); 674 io_buf_buffer(b, data, datasz); 675 ibuf_close(&msgq, b); 676 s->file_pending++; 677 678 free(data); 679 free_publish_xml(pxml); 680 return 0; 681 } 682