1 /* $OpenBSD: rrdp.c,v 1.39 2024/11/21 13:32:27 claudio Exp $ */ 2 /* 3 * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com> 4 * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org> 5 * 6 * Permission to use, copy, modify, and distribute this software for any 7 * purpose with or without fee is hereby granted, provided that the above 8 * copyright notice and this permission notice appear in all copies. 9 * 10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 17 */ 18 #include <sys/queue.h> 19 #include <sys/stat.h> 20 21 #include <err.h> 22 #include <errno.h> 23 #include <fcntl.h> 24 #include <limits.h> 25 #include <poll.h> 26 #include <string.h> 27 #include <unistd.h> 28 #include <imsg.h> 29 30 #include <expat.h> 31 #include <openssl/sha.h> 32 33 #include "extern.h" 34 #include "rrdp.h" 35 36 #define MAX_SESSIONS 32 37 #define READ_BUF_SIZE (32 * 1024) 38 39 static struct msgbuf *msgq; 40 41 #define RRDP_STATE_REQ 0x01 42 #define RRDP_STATE_WAIT 0x02 43 #define RRDP_STATE_PARSE 0x04 44 #define RRDP_STATE_PARSE_ERROR 0x08 45 #define RRDP_STATE_PARSE_DONE 0x10 46 #define RRDP_STATE_HTTP_DONE 0x20 47 #define RRDP_STATE_DONE (RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE) 48 49 struct rrdp { 50 TAILQ_ENTRY(rrdp) entry; 51 unsigned int id; 52 char *notifyuri; 53 char *local; 54 char *last_mod; 55 56 struct pollfd *pfd; 57 int infd; 58 int state; 59 int aborted; 60 unsigned int file_pending; 61 unsigned int file_failed; 62 enum http_result res; 63 enum rrdp_task task; 64 65 char hash[SHA256_DIGEST_LENGTH]; 66 SHA256_CTX ctx; 67 68 struct rrdp_session *repository; 69 struct rrdp_session *current; 70 XML_Parser parser; 71 struct notification_xml *nxml; 72 struct snapshot_xml *sxml; 73 struct delta_xml *dxml; 74 }; 75 76 static TAILQ_HEAD(, rrdp) states = TAILQ_HEAD_INITIALIZER(states); 77 78 char * 79 xstrdup(const char *s) 80 { 81 char *r; 82 if ((r = strdup(s)) == NULL) 83 err(1, "strdup"); 84 return r; 85 } 86 87 /* 88 * Report back that a RRDP request finished. 89 * ok should only be set to 1 if the cache is now up-to-date. 90 */ 91 static void 92 rrdp_done(unsigned int id, int ok) 93 { 94 enum rrdp_msg type = RRDP_END; 95 struct ibuf *b; 96 97 b = io_new_buffer(); 98 io_simple_buffer(b, &type, sizeof(type)); 99 io_simple_buffer(b, &id, sizeof(id)); 100 io_simple_buffer(b, &ok, sizeof(ok)); 101 io_close_buffer(msgq, b); 102 } 103 104 /* 105 * Request an URI to be fetched via HTTPS. 106 * The main process will respond with a RRDP_HTTP_INI which includes 107 * the file descriptor to read from. RRDP_HTTP_FIN is sent at the 108 * end of the request with the HTTP status code and last modified timestamp. 109 * If the request should not set the If-Modified-Since: header then last_mod 110 * should be set to NULL, else it should point to a proper date string. 111 */ 112 static void 113 rrdp_http_req(unsigned int id, const char *uri, const char *last_mod) 114 { 115 enum rrdp_msg type = RRDP_HTTP_REQ; 116 struct ibuf *b; 117 118 b = io_new_buffer(); 119 io_simple_buffer(b, &type, sizeof(type)); 120 io_simple_buffer(b, &id, sizeof(id)); 121 io_str_buffer(b, uri); 122 io_str_buffer(b, last_mod); 123 io_close_buffer(msgq, b); 124 } 125 126 /* 127 * Send the session state to the main process so it gets stored. 128 */ 129 static void 130 rrdp_state_send(struct rrdp *s) 131 { 132 enum rrdp_msg type = RRDP_SESSION; 133 struct ibuf *b; 134 135 b = io_new_buffer(); 136 io_simple_buffer(b, &type, sizeof(type)); 137 io_simple_buffer(b, &s->id, sizeof(s->id)); 138 rrdp_session_buffer(b, s->current); 139 io_close_buffer(msgq, b); 140 } 141 142 /* 143 * Inform parent to clear the RRDP repository before start of snapshot. 144 */ 145 static void 146 rrdp_clear_repo(struct rrdp *s) 147 { 148 enum rrdp_msg type = RRDP_CLEAR; 149 struct ibuf *b; 150 151 b = io_new_buffer(); 152 io_simple_buffer(b, &type, sizeof(type)); 153 io_simple_buffer(b, &s->id, sizeof(s->id)); 154 io_close_buffer(msgq, b); 155 } 156 157 /* 158 * Send a blob of data to the main process to store it in the repository. 159 */ 160 void 161 rrdp_publish_file(struct rrdp *s, struct publish_xml *pxml, 162 unsigned char *data, size_t datasz) 163 { 164 enum rrdp_msg type = RRDP_FILE; 165 struct ibuf *b; 166 167 /* only send files if the fetch did not fail already */ 168 if (s->file_failed == 0) { 169 b = io_new_buffer(); 170 io_simple_buffer(b, &type, sizeof(type)); 171 io_simple_buffer(b, &s->id, sizeof(s->id)); 172 io_simple_buffer(b, &pxml->type, sizeof(pxml->type)); 173 if (pxml->type != PUB_ADD) 174 io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash)); 175 io_str_buffer(b, pxml->uri); 176 io_buf_buffer(b, data, datasz); 177 io_close_buffer(msgq, b); 178 s->file_pending++; 179 } 180 } 181 182 static void 183 rrdp_new(unsigned int id, char *local, char *notify, struct rrdp_session *state) 184 { 185 struct rrdp *s; 186 187 if ((s = calloc(1, sizeof(*s))) == NULL) 188 err(1, NULL); 189 190 s->infd = -1; 191 s->id = id; 192 s->local = local; 193 s->notifyuri = notify; 194 s->repository = state; 195 if ((s->current = calloc(1, sizeof(*s->current))) == NULL) 196 err(1, NULL); 197 198 s->state = RRDP_STATE_REQ; 199 if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL) 200 err(1, "XML_ParserCreate"); 201 202 s->nxml = new_notification_xml(s->parser, s->repository, s->current, 203 notify); 204 205 TAILQ_INSERT_TAIL(&states, s, entry); 206 } 207 208 static void 209 rrdp_free(struct rrdp *s) 210 { 211 if (s == NULL) 212 return; 213 214 TAILQ_REMOVE(&states, s, entry); 215 216 free_notification_xml(s->nxml); 217 free_snapshot_xml(s->sxml); 218 free_delta_xml(s->dxml); 219 220 if (s->parser) 221 XML_ParserFree(s->parser); 222 if (s->infd != -1) 223 close(s->infd); 224 free(s->notifyuri); 225 free(s->local); 226 free(s->last_mod); 227 rrdp_session_free(s->repository); 228 rrdp_session_free(s->current); 229 230 free(s); 231 } 232 233 static struct rrdp * 234 rrdp_get(unsigned int id) 235 { 236 struct rrdp *s; 237 238 TAILQ_FOREACH(s, &states, entry) 239 if (s->id == id) 240 break; 241 return s; 242 } 243 244 static void 245 rrdp_failed(struct rrdp *s) 246 { 247 unsigned int id = s->id; 248 249 /* reset file state before retrying */ 250 s->file_failed = 0; 251 252 if (s->task == DELTA && !s->aborted) { 253 /* fallback to a snapshot as per RFC8182 */ 254 free_delta_xml(s->dxml); 255 s->dxml = NULL; 256 rrdp_clear_repo(s); 257 s->sxml = new_snapshot_xml(s->parser, s->current, s); 258 s->task = SNAPSHOT; 259 s->state = RRDP_STATE_REQ; 260 logx("%s: delta sync failed, fallback to snapshot", s->local); 261 } else { 262 /* 263 * TODO: update state to track recurring failures 264 * and fall back to rsync after a while. 265 * This should probably happen in the main process. 266 */ 267 rrdp_free(s); 268 rrdp_done(id, 0); 269 } 270 } 271 272 static void 273 rrdp_finished(struct rrdp *s) 274 { 275 unsigned int id = s->id; 276 277 /* check if all parts of the process have finished */ 278 if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE) 279 return; 280 281 /* still some files pending */ 282 if (s->file_pending > 0) 283 return; 284 285 if (s->state & RRDP_STATE_PARSE_ERROR || s->aborted) { 286 rrdp_failed(s); 287 return; 288 } 289 290 if (s->res == HTTP_OK) { 291 XML_Parser p = s->parser; 292 293 /* 294 * Finalize parsing on success to be sure that 295 * all of the XML is correct. Needs to be done here 296 * since the call would most probably fail for non 297 * successful data fetches. 298 */ 299 if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) { 300 warnx("%s: XML error at line %llu: %s", s->local, 301 (unsigned long long)XML_GetCurrentLineNumber(p), 302 XML_ErrorString(XML_GetErrorCode(p))); 303 rrdp_failed(s); 304 return; 305 } 306 307 /* If a file caused an error fail the update */ 308 if (s->file_failed > 0) { 309 rrdp_failed(s); 310 return; 311 } 312 313 switch (s->task) { 314 case NOTIFICATION: 315 s->task = notification_done(s->nxml, s->last_mod); 316 s->last_mod = NULL; 317 switch (s->task) { 318 case NOTIFICATION: 319 logx("%s: repository not modified (%s#%lld)", 320 s->local, s->repository->session_id, 321 s->repository->serial); 322 rrdp_state_send(s); 323 rrdp_free(s); 324 rrdp_done(id, 1); 325 break; 326 case SNAPSHOT: 327 logx("%s: downloading snapshot (%s#%lld)", 328 s->local, s->current->session_id, 329 s->current->serial); 330 rrdp_clear_repo(s); 331 s->sxml = new_snapshot_xml(p, s->current, s); 332 s->state = RRDP_STATE_REQ; 333 break; 334 case DELTA: 335 logx("%s: downloading %lld deltas (%s#%lld)", 336 s->local, 337 s->repository->serial - s->current->serial, 338 s->current->session_id, s->current->serial); 339 s->dxml = new_delta_xml(p, s->current, s); 340 s->state = RRDP_STATE_REQ; 341 break; 342 } 343 break; 344 case SNAPSHOT: 345 rrdp_state_send(s); 346 rrdp_free(s); 347 rrdp_done(id, 1); 348 break; 349 case DELTA: 350 if (notification_delta_done(s->nxml)) { 351 /* finished */ 352 rrdp_state_send(s); 353 rrdp_free(s); 354 rrdp_done(id, 1); 355 } else { 356 /* reset delta parser for next delta */ 357 free_delta_xml(s->dxml); 358 s->dxml = new_delta_xml(p, s->current, s); 359 s->state = RRDP_STATE_REQ; 360 } 361 break; 362 } 363 } else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) { 364 logx("%s: notification file not modified (%s#%lld)", s->local, 365 s->repository->session_id, s->repository->serial); 366 /* no need to update state file */ 367 rrdp_free(s); 368 rrdp_done(id, 1); 369 } else { 370 rrdp_failed(s); 371 } 372 } 373 374 static void 375 rrdp_abort_req(struct rrdp *s) 376 { 377 unsigned int id = s->id; 378 379 s->aborted = 1; 380 if (s->state == RRDP_STATE_REQ) { 381 /* nothing is pending, just abort */ 382 rrdp_free(s); 383 rrdp_done(id, 1); 384 return; 385 } 386 if (s->state == RRDP_STATE_WAIT) 387 /* wait for HTTP_INI which will progress the state */ 388 return; 389 390 /* 391 * RRDP_STATE_PARSE or later, close infd, abort parser but 392 * wait for HTTP_FIN and file_pending to drop to 0. 393 */ 394 if (s->infd != -1) { 395 close(s->infd); 396 s->infd = -1; 397 s->state |= RRDP_STATE_PARSE_DONE | RRDP_STATE_PARSE_ERROR; 398 } 399 rrdp_finished(s); 400 } 401 402 static void 403 rrdp_input_handler(struct ibuf *b) 404 { 405 struct rrdp_session *state; 406 char *local, *notify, *last_mod; 407 struct rrdp *s; 408 enum rrdp_msg type; 409 enum http_result res; 410 unsigned int id; 411 int ok; 412 413 io_read_buf(b, &type, sizeof(type)); 414 io_read_buf(b, &id, sizeof(id)); 415 416 switch (type) { 417 case RRDP_START: 418 if (ibuf_fd_avail(b)) 419 errx(1, "received unexpected fd"); 420 io_read_str(b, &local); 421 io_read_str(b, ¬ify); 422 state = rrdp_session_read(b); 423 rrdp_new(id, local, notify, state); 424 break; 425 case RRDP_HTTP_INI: 426 s = rrdp_get(id); 427 if (s == NULL) 428 errx(1, "http ini, rrdp session %u does not exist", id); 429 if (s->state != RRDP_STATE_WAIT) 430 errx(1, "%s: bad internal state", s->local); 431 s->infd = ibuf_fd_get(b); 432 if (s->infd == -1) 433 errx(1, "expected fd not received"); 434 s->state = RRDP_STATE_PARSE; 435 if (s->aborted) { 436 rrdp_abort_req(s); 437 break; 438 } 439 break; 440 case RRDP_HTTP_FIN: 441 io_read_buf(b, &res, sizeof(res)); 442 io_read_str(b, &last_mod); 443 if (ibuf_fd_avail(b)) 444 errx(1, "received unexpected fd"); 445 446 s = rrdp_get(id); 447 if (s == NULL) 448 errx(1, "http fin, rrdp session %u does not exist", id); 449 if (!(s->state & RRDP_STATE_PARSE)) 450 errx(1, "%s: bad internal state", s->local); 451 s->state |= RRDP_STATE_HTTP_DONE; 452 s->res = res; 453 free(s->last_mod); 454 s->last_mod = last_mod; 455 rrdp_finished(s); 456 break; 457 case RRDP_FILE: 458 s = rrdp_get(id); 459 if (s == NULL) 460 errx(1, "file, rrdp session %u does not exist", id); 461 if (ibuf_fd_avail(b)) 462 errx(1, "received unexpected fd"); 463 io_read_buf(b, &ok, sizeof(ok)); 464 if (ok != 1) 465 s->file_failed++; 466 s->file_pending--; 467 if (s->file_pending == 0) 468 rrdp_finished(s); 469 break; 470 case RRDP_ABORT: 471 if (ibuf_fd_avail(b)) 472 errx(1, "received unexpected fd"); 473 s = rrdp_get(id); 474 if (s != NULL) 475 rrdp_abort_req(s); 476 break; 477 default: 478 errx(1, "unexpected message %d", type); 479 } 480 } 481 482 static void 483 rrdp_data_handler(struct rrdp *s) 484 { 485 char buf[READ_BUF_SIZE]; 486 XML_Parser p = s->parser; 487 ssize_t len; 488 489 len = read(s->infd, buf, sizeof(buf)); 490 if (len == -1) { 491 warn("%s: read failure", s->local); 492 rrdp_abort_req(s); 493 return; 494 } 495 if ((s->state & RRDP_STATE_PARSE) == 0) 496 errx(1, "%s: bad parser state", s->local); 497 if (len == 0) { 498 /* parser stage finished */ 499 close(s->infd); 500 s->infd = -1; 501 502 if (s->task != NOTIFICATION) { 503 char h[SHA256_DIGEST_LENGTH]; 504 505 SHA256_Final(h, &s->ctx); 506 if (memcmp(s->hash, h, sizeof(s->hash)) != 0) { 507 s->state |= RRDP_STATE_PARSE_ERROR; 508 warnx("%s: bad message digest", s->local); 509 } 510 } 511 512 s->state |= RRDP_STATE_PARSE_DONE; 513 rrdp_finished(s); 514 return; 515 } 516 517 /* parse and maybe hash the bytes just read */ 518 if (s->task != NOTIFICATION) 519 SHA256_Update(&s->ctx, buf, len); 520 if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 && 521 XML_Parse(p, buf, len, 0) != XML_STATUS_OK) { 522 warnx("%s: parse error at line %llu: %s", s->local, 523 (unsigned long long)XML_GetCurrentLineNumber(p), 524 XML_ErrorString(XML_GetErrorCode(p))); 525 s->state |= RRDP_STATE_PARSE_ERROR; 526 } 527 } 528 529 void 530 proc_rrdp(int fd) 531 { 532 struct pollfd pfds[MAX_SESSIONS + 1]; 533 struct rrdp *s, *ns; 534 struct ibuf *b; 535 size_t i; 536 537 if (pledge("stdio recvfd", NULL) == -1) 538 err(1, "pledge"); 539 540 if ((msgq = msgbuf_new_reader(sizeof(size_t), io_parse_hdr, NULL)) == 541 NULL) 542 err(1, NULL); 543 544 for (;;) { 545 i = 1; 546 memset(&pfds, 0, sizeof(pfds)); 547 TAILQ_FOREACH(s, &states, entry) { 548 if (i >= MAX_SESSIONS + 1) { 549 /* not enough sessions, wait for better times */ 550 s->pfd = NULL; 551 continue; 552 } 553 /* request new assets when there are free sessions */ 554 if (s->state == RRDP_STATE_REQ) { 555 const char *uri; 556 switch (s->task) { 557 case NOTIFICATION: 558 rrdp_http_req(s->id, s->notifyuri, 559 s->repository->last_mod); 560 break; 561 case SNAPSHOT: 562 case DELTA: 563 uri = notification_get_next(s->nxml, 564 s->hash, sizeof(s->hash), 565 s->task); 566 SHA256_Init(&s->ctx); 567 rrdp_http_req(s->id, uri, NULL); 568 break; 569 } 570 s->state = RRDP_STATE_WAIT; 571 } 572 s->pfd = pfds + i++; 573 s->pfd->fd = s->infd; 574 s->pfd->events = POLLIN; 575 } 576 577 /* 578 * Update main fd last. 579 * The previous loop may have enqueue messages. 580 */ 581 pfds[0].fd = fd; 582 pfds[0].events = POLLIN; 583 if (msgbuf_queuelen(msgq) > 0) 584 pfds[0].events |= POLLOUT; 585 586 if (poll(pfds, i, INFTIM) == -1) { 587 if (errno == EINTR) 588 continue; 589 err(1, "poll"); 590 } 591 592 if (pfds[0].revents & POLLHUP) 593 break; 594 if (pfds[0].revents & POLLOUT) { 595 if (msgbuf_write(fd, msgq) == -1) { 596 if (errno == EPIPE) 597 errx(1, "write: connection closed"); 598 else 599 err(1, "write"); 600 } 601 } 602 if (pfds[0].revents & POLLIN) { 603 switch (msgbuf_read(fd, msgq)) { 604 case -1: 605 err(1, "msgbuf_read"); 606 case 0: 607 errx(1, "msgbuf_read: connection closed"); 608 } 609 while ((b = io_buf_get(msgq)) != NULL) { 610 rrdp_input_handler(b); 611 ibuf_free(b); 612 } 613 } 614 615 TAILQ_FOREACH_SAFE(s, &states, entry, ns) { 616 if (s->pfd == NULL) 617 continue; 618 if (s->pfd->revents != 0) 619 rrdp_data_handler(s); 620 } 621 } 622 623 exit(0); 624 } 625