xref: /openbsd-src/usr.sbin/rpki-client/rrdp.c (revision f1dd7b858388b4a23f4f67a4957ec5ff656ebbe8)
1 /*	$OpenBSD: rrdp.c,v 1.9 2021/04/21 09:36:06 claudio Exp $ */
2 /*
3  * Copyright (c) 2020 Nils Fisher <nils_fisher@hotmail.com>
4  * Copyright (c) 2021 Claudio Jeker <claudio@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 #include <sys/queue.h>
19 #include <sys/stat.h>
20 
21 #include <assert.h>
22 #include <ctype.h>
23 #include <err.h>
24 #include <errno.h>
25 #include <fcntl.h>
26 #include <limits.h>
27 #include <poll.h>
28 #include <string.h>
29 #include <unistd.h>
30 #include <imsg.h>
31 
32 #include <expat.h>
33 #include <openssl/sha.h>
34 
35 #include "extern.h"
36 #include "rrdp.h"
37 
38 #define MAX_SESSIONS	12
39 #define	READ_BUF_SIZE	(32 * 1024)
40 
41 static struct msgbuf	msgq;
42 
43 #define RRDP_STATE_REQ		0x01
44 #define RRDP_STATE_WAIT		0x02
45 #define RRDP_STATE_PARSE	0x04
46 #define RRDP_STATE_PARSE_ERROR	0x08
47 #define RRDP_STATE_PARSE_DONE	0x10
48 #define RRDP_STATE_HTTP_DONE	0x20
49 #define RRDP_STATE_DONE		(RRDP_STATE_PARSE_DONE | RRDP_STATE_HTTP_DONE)
50 
51 struct rrdp {
52 	TAILQ_ENTRY(rrdp)	 entry;
53 	size_t			 id;
54 	char			*notifyuri;
55 	char			*local;
56 	char			*last_mod;
57 
58 	struct pollfd		*pfd;
59 	int			 infd;
60 	int			 state;
61 	unsigned int		 file_pending;
62 	unsigned int		 file_failed;
63 	enum http_result	 res;
64 	enum rrdp_task		 task;
65 
66 	char			 hash[SHA256_DIGEST_LENGTH];
67 	SHA256_CTX		 ctx;
68 
69 	struct rrdp_session	 repository;
70 	struct rrdp_session	 current;
71 	XML_Parser		 parser;
72 	struct notification_xml	*nxml;
73 	struct snapshot_xml	*sxml;
74 	struct delta_xml	*dxml;
75 };
76 
77 TAILQ_HEAD(,rrdp)	states = TAILQ_HEAD_INITIALIZER(states);
78 
79 struct publish_xml {
80 	char			*uri;
81 	char			*data;
82 	char			 hash[SHA256_DIGEST_LENGTH];
83 	int			 data_length;
84 	enum publish_type	 type;
85 };
86 
87 char *
88 xstrdup(const char *s)
89 {
90 	char *r;
91 	if ((r = strdup(s)) == NULL)
92 		err(1, "strdup");
93 	return r;
94 }
95 
96 /*
97  * Hex decode hexstring into the supplied buffer.
98  * Return 0 on success else -1, if buffer too small or bad encoding.
99  */
100 int
101 hex_decode(const char *hexstr, char *buf, size_t len)
102 {
103 	unsigned char ch, r;
104 	size_t pos = 0;
105 	int i;
106 
107 	while (*hexstr) {
108 		r = 0;
109 		for (i = 0; i < 2; i++) {
110 			ch = hexstr[i];
111 			if (isdigit(ch))
112 				ch -= '0';
113 			else if (islower(ch))
114 				ch -= ('a' - 10);
115 			else if (isupper(ch))
116 				ch -= ('A' - 10);
117 			else
118 				return -1;
119 			if (ch > 0xf)
120 				return -1;
121 			r = r << 4 | ch;
122 		}
123 		if (pos < len)
124 			buf[pos++] = r;
125 		else
126 			return -1;
127 
128 		hexstr += 2;
129 	}
130 	return 0;
131 }
132 
133 /*
134  * Report back that a RRDP request finished.
135  * ok should only be set to 1 if the cache is now up-to-date.
136  */
137 static void
138 rrdp_done(size_t id, int ok)
139 {
140 	enum rrdp_msg type = RRDP_END;
141 	struct ibuf *b;
142 
143 	if ((b = ibuf_open(sizeof(type) + sizeof(id) + sizeof(ok))) == NULL)
144 		err(1, NULL);
145 	io_simple_buffer(b, &type, sizeof(type));
146 	io_simple_buffer(b, &id, sizeof(id));
147 	io_simple_buffer(b, &ok, sizeof(ok));
148 	ibuf_close(&msgq, b);
149 }
150 
151 /*
152  * Request an URI to be fetched via HTTPS.
153  * The main process will respond with a RRDP_HTTP_INI which includes
154  * the file descriptor to read from. RRDP_HTTP_FIN is sent at the
155  * end of the request with the HTTP status code and last modified timestamp.
156  * If the request should not set the If-Modified-Since: header then last_mod
157  * should be set to NULL, else it should point to a proper date string.
158  */
159 static void
160 rrdp_http_req(size_t id, const char *uri, const char *last_mod)
161 {
162 	enum rrdp_msg type = RRDP_HTTP_REQ;
163 	struct ibuf *b;
164 
165 	if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
166 		err(1, NULL);
167 	io_simple_buffer(b, &type, sizeof(type));
168 	io_simple_buffer(b, &id, sizeof(id));
169 	io_str_buffer(b, uri);
170 	io_str_buffer(b, last_mod);
171 	ibuf_close(&msgq, b);
172 }
173 
174 /*
175  * Send the session state to the main process so it gets stored.
176  */
177 static void
178 rrdp_state_send(struct rrdp *s)
179 {
180 	enum rrdp_msg type = RRDP_SESSION;
181 	struct ibuf *b;
182 
183 	if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
184 		err(1, NULL);
185 	io_simple_buffer(b, &type, sizeof(type));
186 	io_simple_buffer(b, &s->id, sizeof(s->id));
187 	io_str_buffer(b, s->current.session_id);
188 	io_simple_buffer(b, &s->current.serial, sizeof(s->current.serial));
189 	io_str_buffer(b, s->current.last_mod);
190 	ibuf_close(&msgq, b);
191 }
192 
193 static struct rrdp *
194 rrdp_new(size_t id, char *local, char *notify, char *session_id,
195     long long serial, char *last_mod)
196 {
197 	struct rrdp *s;
198 
199 	if ((s = calloc(1, sizeof(*s))) == NULL)
200 		err(1, NULL);
201 
202 	s->infd = -1;
203 	s->id = id;
204 	s->local = local;
205 	s->notifyuri = notify;
206 	s->repository.session_id = session_id;
207 	s->repository.serial = serial;
208 	s->repository.last_mod = last_mod;
209 
210 	s->state = RRDP_STATE_REQ;
211 	if ((s->parser = XML_ParserCreate("US-ASCII")) == NULL)
212 		err(1, "XML_ParserCreate");
213 
214 	s->nxml = new_notification_xml(s->parser, &s->repository, &s->current);
215 
216 	TAILQ_INSERT_TAIL(&states, s, entry);
217 
218 	return s;
219 }
220 
221 static void
222 rrdp_free(struct rrdp *s)
223 {
224 	if (s == NULL)
225 		return;
226 
227 	TAILQ_REMOVE(&states, s, entry);
228 
229 	free_notification_xml(s->nxml);
230 	free_snapshot_xml(s->sxml);
231 	free_delta_xml(s->dxml);
232 
233 	if (s->parser)
234 		XML_ParserFree(s->parser);
235 	if (s->infd != -1)
236 		close(s->infd);
237 	free(s->notifyuri);
238 	free(s->local);
239 	free(s->last_mod);
240 	free(s->repository.last_mod);
241 	free(s->repository.session_id);
242 	free(s->current.last_mod);
243 	free(s->current.session_id);
244 
245 	free(s);
246 }
247 
248 static struct rrdp *
249 rrdp_get(size_t id)
250 {
251 	struct rrdp *s;
252 
253 	TAILQ_FOREACH(s, &states, entry)
254 		if (s->id == id)
255 			break;
256 	return s;
257 }
258 
259 static void
260 rrdp_failed(struct rrdp *s)
261 {
262 	size_t id = s->id;
263 
264 	/* reset file state before retrying */
265 	s->file_failed = 0;
266 
267 	/* XXX MUST do some cleanup in the repo here */
268 	if (s->task == DELTA) {
269 		/* fallback to a snapshot as per RFC8182 */
270 		free_delta_xml(s->dxml);
271 		s->dxml = NULL;
272 		s->sxml = new_snapshot_xml(s->parser, &s->current, s);
273 		s->task = SNAPSHOT;
274 		s->state = RRDP_STATE_REQ;
275 		logx("%s: delta sync failed, fallback to snapshot", s->local);
276 	} else {
277 		/*
278 		 * TODO: update state to track recurring failures
279 		 * and fall back to rsync after a while.
280 		 * This should probably happen in the main process.
281 		 */
282 		rrdp_free(s);
283 		rrdp_done(id, 0);
284 	}
285 }
286 
287 static void
288 rrdp_finished(struct rrdp *s)
289 {
290 	size_t id = s->id;
291 
292 	/* check if all parts of the process have finished */
293 	if ((s->state & RRDP_STATE_DONE) != RRDP_STATE_DONE)
294 		return;
295 
296 	/* still some files pending */
297 	if (s->file_pending > 0)
298 		return;
299 
300 	if (s->state & RRDP_STATE_PARSE_ERROR) {
301 		rrdp_failed(s);
302 		return;
303 	}
304 
305 	if (s->res == HTTP_OK) {
306 		XML_Parser p = s->parser;
307 
308 		/*
309 		 * Finalize parsing on success to be sure that
310 		 * all of the XML is correct. Needs to be done here
311 		 * since the call would most probably fail for non
312 		 * successful data fetches.
313 		 */
314 		if (XML_Parse(p, NULL, 0, 1) != XML_STATUS_OK) {
315 			warnx("%s: XML error at line %llu: %s", s->local,
316 			    (unsigned long long)XML_GetCurrentLineNumber(p),
317 			    XML_ErrorString(XML_GetErrorCode(p)));
318 			rrdp_failed(s);
319 			return;
320 		}
321 
322 		/* If a file caused an error fail the update */
323 		if (s->file_failed > 0) {
324 			rrdp_failed(s);
325 			return;
326 		}
327 
328 		switch (s->task) {
329 		case NOTIFICATION:
330 			s->task = notification_done(s->nxml, s->last_mod);
331 			s->last_mod = NULL;
332 			switch (s->task) {
333 			case NOTIFICATION:
334 				logx("%s: repository not modified", s->local);
335 				rrdp_state_send(s);
336 				rrdp_free(s);
337 				rrdp_done(id, 1);
338 				break;
339 			case SNAPSHOT:
340 				logx("%s: downloading snapshot", s->local);
341 				s->sxml = new_snapshot_xml(p, &s->current, s);
342 				s->state = RRDP_STATE_REQ;
343 				break;
344 			case DELTA:
345 				logx("%s: downloading %lld deltas", s->local,
346 				    s->repository.serial - s->current.serial);
347 				s->dxml = new_delta_xml(p, &s->current, s);
348 				s->state = RRDP_STATE_REQ;
349 				break;
350 			}
351 			break;
352 		case SNAPSHOT:
353 			rrdp_state_send(s);
354 			rrdp_free(s);
355 			rrdp_done(id, 1);
356 			break;
357 		case DELTA:
358 			if (notification_delta_done(s->nxml)) {
359 				/* finished */
360 				rrdp_state_send(s);
361 				rrdp_free(s);
362 				rrdp_done(id, 1);
363 			} else {
364 				/* reset delta parser for next delta */
365 				free_delta_xml(s->dxml);
366 				s->dxml = new_delta_xml(p, &s->current, s);
367 				s->state = RRDP_STATE_REQ;
368 			}
369 			break;
370 		}
371 	} else if (s->res == HTTP_NOT_MOD && s->task == NOTIFICATION) {
372 		logx("%s: notification file not modified", s->local);
373 		/* no need to update state file */
374 		rrdp_free(s);
375 		rrdp_done(id, 1);
376 	} else {
377 		rrdp_failed(s);
378 	}
379 }
380 
381 static void
382 rrdp_input_handler(int fd)
383 {
384 	char *local, *notify, *session_id, *last_mod;
385 	struct rrdp *s;
386 	enum rrdp_msg type;
387 	enum http_result res;
388 	long long serial;
389 	size_t id;
390 	int infd, ok;
391 
392 	infd = io_recvfd(fd, &type, sizeof(type));
393 	io_simple_read(fd, &id, sizeof(id));
394 
395 	switch (type) {
396 	case RRDP_START:
397 		io_str_read(fd, &local);
398 		io_str_read(fd, &notify);
399 		io_str_read(fd, &session_id);
400 		io_simple_read(fd, &serial, sizeof(serial));
401 		io_str_read(fd, &last_mod);
402 		if (infd != -1)
403 			errx(1, "received unexpected fd %d", infd);
404 
405 		s = rrdp_new(id, local, notify, session_id, serial, last_mod);
406 		break;
407 	case RRDP_HTTP_INI:
408 		if (infd == -1)
409 			errx(1, "expected fd not received");
410 		s = rrdp_get(id);
411 		if (s == NULL)
412 			errx(1, "rrdp session %zu does not exist", id);
413 		if (s->state != RRDP_STATE_WAIT)
414 			errx(1, "%s: bad internal state", s->local);
415 
416 		s->infd = infd;
417 		s->state = RRDP_STATE_PARSE;
418 		break;
419 	case RRDP_HTTP_FIN:
420 		io_simple_read(fd, &res, sizeof(res));
421 		io_str_read(fd, &last_mod);
422 		if (infd != -1)
423 			errx(1, "received unexpected fd");
424 
425 		s = rrdp_get(id);
426 		if (s == NULL)
427 			errx(1, "rrdp session %zu does not exist", id);
428 		if (!(s->state & RRDP_STATE_PARSE))
429 			errx(1, "%s: bad internal state", s->local);
430 
431 		s->res = res;
432 		s->last_mod = last_mod;
433 		s->state |= RRDP_STATE_HTTP_DONE;
434 		rrdp_finished(s);
435 		break;
436 	case RRDP_FILE:
437 		s = rrdp_get(id);
438 		if (s == NULL)
439 			errx(1, "rrdp session %zu does not exist", id);
440 		if (infd != -1)
441 			errx(1, "received unexpected fd %d", infd);
442 		io_simple_read(fd, &ok, sizeof(ok));
443 		if (ok == 0)
444 			s->file_failed++;
445 		s->file_pending--;
446 		if (s->file_pending == 0)
447 			rrdp_finished(s);
448 		break;
449 	default:
450 		errx(1, "unexpected message %d", type);
451 	}
452 }
453 
454 static void
455 rrdp_data_handler(struct rrdp *s)
456 {
457 	char buf[READ_BUF_SIZE];
458 	XML_Parser p = s->parser;
459 	ssize_t len;
460 
461 	len = read(s->infd, buf, sizeof(buf));
462 	if (len == -1) {
463 		s->state |= RRDP_STATE_PARSE_ERROR;
464 		warn("%s: read failure", s->local);
465 		return;
466 	}
467 	if ((s->state & RRDP_STATE_PARSE) == 0)
468 		errx(1, "%s: bad parser state", s->local);
469 	if (len == 0) {
470 		/* parser stage finished */
471 		close(s->infd);
472 		s->infd = -1;
473 
474 		if (s->task != NOTIFICATION) {
475 			char h[SHA256_DIGEST_LENGTH];
476 
477 			SHA256_Final(h, &s->ctx);
478 			if (memcmp(s->hash, h, sizeof(s->hash)) != 0) {
479 				s->state |= RRDP_STATE_PARSE_ERROR;
480 				warnx("%s: bad message digest", s->local);
481 			}
482 		}
483 
484 		s->state |= RRDP_STATE_PARSE_DONE;
485 		rrdp_finished(s);
486 		return;
487 	}
488 
489 	/* parse and maybe hash the bytes just read */
490 	if (s->task != NOTIFICATION)
491 		SHA256_Update(&s->ctx, buf, len);
492 	if ((s->state & RRDP_STATE_PARSE_ERROR) == 0 &&
493 	    XML_Parse(p, buf, len, 0) != XML_STATUS_OK) {
494 		warnx("%s: parse error at line %llu: %s", s->local,
495 		    (unsigned long long)XML_GetCurrentLineNumber(p),
496 		    XML_ErrorString(XML_GetErrorCode(p)));
497 		s->state |= RRDP_STATE_PARSE_ERROR;
498 	}
499 }
500 
501 void
502 proc_rrdp(int fd)
503 {
504 	struct pollfd pfds[MAX_SESSIONS + 1];
505 	struct rrdp *s, *ns;
506 	size_t i;
507 
508 	if (pledge("stdio recvfd", NULL) == -1)
509 		err(1, "pledge");
510 
511 	memset(&pfds, 0, sizeof(pfds));
512 
513 	msgbuf_init(&msgq);
514 	msgq.fd = fd;
515 
516 	for (;;) {
517 		i = 1;
518 		TAILQ_FOREACH(s, &states, entry) {
519 			if (i >= MAX_SESSIONS + 1) {
520 				/* not enough sessions, wait for better times */
521 				s->pfd = NULL;
522 				continue;
523 			}
524 			/* request new assets when there are free sessions */
525 			if (s->state == RRDP_STATE_REQ) {
526 				const char *uri;
527 				switch (s->task) {
528 				case NOTIFICATION:
529 					rrdp_http_req(s->id, s->notifyuri,
530 					    s->repository.last_mod);
531 					break;
532 				case SNAPSHOT:
533 				case DELTA:
534 					uri = notification_get_next(s->nxml,
535 					    s->hash, sizeof(s->hash),
536 					    s->task);
537 					SHA256_Init(&s->ctx);
538 					rrdp_http_req(s->id, uri, NULL);
539 					break;
540 				}
541 				s->state = RRDP_STATE_WAIT;
542 			}
543 			s->pfd = pfds + i++;
544 			s->pfd->fd = s->infd;
545 			s->pfd->events = POLLIN;
546 		}
547 
548 		/*
549 		 * Update main fd last.
550 		 * The previous loop may have enqueue messages.
551 		 */
552 		pfds[0].fd = fd;
553 		pfds[0].events = POLLIN;
554 		if (msgq.queued)
555 			pfds[0].events |= POLLOUT;
556 
557 		if (poll(pfds, i, INFTIM) == -1)
558 			err(1, "poll");
559 
560 		if (pfds[0].revents & POLLHUP)
561 			break;
562 		if (pfds[0].revents & POLLOUT) {
563 			io_socket_nonblocking(fd);
564 			switch (msgbuf_write(&msgq)) {
565 			case 0:
566 				errx(1, "write: connection closed");
567 			case -1:
568 				err(1, "write");
569 			}
570 			io_socket_blocking(fd);
571 		}
572 		if (pfds[0].revents & POLLIN)
573 			rrdp_input_handler(fd);
574 
575 		TAILQ_FOREACH_SAFE(s, &states, entry, ns) {
576 			if (s->pfd == NULL)
577 				continue;
578 			if (s->pfd->revents != 0)
579 				rrdp_data_handler(s);
580 		}
581 	}
582 
583 	exit(0);
584 }
585 
586 /*
587  * Both snapshots and deltas use publish_xml to store the publish and
588  * withdraw records. Once all the content is added the request is sent
589  * to the main process where it is processed.
590  */
591 struct publish_xml *
592 new_publish_xml(enum publish_type type, char *uri, char *hash, size_t hlen)
593 {
594 	struct publish_xml *pxml;
595 
596 	if ((pxml = calloc(1, sizeof(*pxml))) == NULL)
597 		err(1, "%s", __func__);
598 
599 	pxml->type = type;
600 	pxml->uri = uri;
601 	if (hlen > 0) {
602 		assert(hlen == sizeof(pxml->hash));
603 		memcpy(pxml->hash, hash, hlen);
604 	}
605 
606 	return pxml;
607 }
608 
609 void
610 free_publish_xml(struct publish_xml *pxml)
611 {
612 	if (pxml == NULL)
613 		return;
614 
615 	free(pxml->uri);
616 	free(pxml->data);
617 	free(pxml);
618 }
619 
620 /*
621  * Add buf to the base64 data string, ensure that this remains a proper
622  * string by NUL-terminating the string.
623  */
624 void
625 publish_add_content(struct publish_xml *pxml, const char *buf, int length)
626 {
627 	int new_length;
628 
629 	/*
630 	 * optmisiation, this often gets called with '\n' as the
631 	 * only data... seems wasteful
632 	 */
633 	if (length == 1 && buf[0] == '\n')
634 		return;
635 
636 	/* append content to data */
637 	new_length = pxml->data_length + length;
638 	pxml->data = realloc(pxml->data, new_length + 1);
639 	if (pxml->data == NULL)
640 		err(1, "%s", __func__);
641 
642 	memcpy(pxml->data + pxml->data_length, buf, length);
643 	pxml->data[new_length] = '\0';
644 	pxml->data_length = new_length;
645 }
646 
647 /*
648  * Base64 decode the data blob and send the file to the main process
649  * where the hash is validated and the file stored in the repository.
650  * Increase the file_pending counter to ensure the RRDP process waits
651  * until all files have been processed before moving to the next stage.
652  * Returns 0 on success or -1 on errors (base64 decode failed).
653  */
654 int
655 publish_done(struct rrdp *s, struct publish_xml *pxml)
656 {
657 	enum rrdp_msg type = RRDP_FILE;
658 	struct ibuf *b;
659 	unsigned char *data = NULL;
660 	size_t datasz = 0;
661 
662 	if (pxml->data_length > 0)
663 		if ((base64_decode(pxml->data, &data, &datasz)) == -1)
664 			return -1;
665 
666 	if ((b = ibuf_dynamic(256, UINT_MAX)) == NULL)
667 		err(1, NULL);
668 	io_simple_buffer(b, &type, sizeof(type));
669 	io_simple_buffer(b, &s->id, sizeof(s->id));
670 	io_simple_buffer(b, &pxml->type, sizeof(pxml->type));
671 	if (pxml->type != PUB_ADD)
672 		io_simple_buffer(b, &pxml->hash, sizeof(pxml->hash));
673 	io_str_buffer(b, pxml->uri);
674 	io_buf_buffer(b, data, datasz);
675 	ibuf_close(&msgq, b);
676 	s->file_pending++;
677 
678 	free(data);
679 	free_publish_xml(pxml);
680 	return 0;
681 }
682