xref: /openbsd-src/usr.bin/rsync/sender.c (revision 81b5c8f98a2a1ff4a013450db6d29b7a72829fcd)
/*	$OpenBSD: sender.c,v 1.33 2024/03/20 09:26:42 claudio Exp $ */
/*
 * Copyright (c) 2019 Kristaps Dzonsons <kristaps@bsd.lv>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */
170889042fSflorian #include <sys/mman.h>
180889042fSflorian #include <sys/queue.h>
1960a32ee9Sbenno #include <sys/stat.h>
2060a32ee9Sbenno 
2160a32ee9Sbenno #include <assert.h>
220889042fSflorian #include <fcntl.h>
2360a32ee9Sbenno #include <inttypes.h>
240889042fSflorian #include <poll.h>
2560a32ee9Sbenno #include <stdlib.h>
2660a32ee9Sbenno #include <string.h>
2760a32ee9Sbenno #include <unistd.h>
2860a32ee9Sbenno 
2964a7cfb7Sflorian #include <openssl/md4.h>
3064a7cfb7Sflorian 
3160a32ee9Sbenno #include "extern.h"
3260a32ee9Sbenno 
/*
 * A request from the receiver to download updated file data.
 * Requests are linked into a tail queue through "entries" and are
 * serviced one at a time by the sender.
 */
struct	send_dl {
	int32_t			 idx; /* index in our file list; -1 marks end of phase */
	struct blkset		*blks; /* block set read for this request, or NULL */
	TAILQ_ENTRY(send_dl)	 entries; /* linkage in the request queue */
};
410889042fSflorian 
420889042fSflorian /*
430889042fSflorian  * The current file being "updated": sent from sender to receiver.
440889042fSflorian  * If there is no file being uploaded, "cur" is NULL.
450889042fSflorian  */
460889042fSflorian struct	send_up {
470889042fSflorian 	struct send_dl	*cur; /* file being updated or NULL */
480889042fSflorian 	struct blkstat	 stat; /* status of file being updated */
490889042fSflorian };
500889042fSflorian 
/* Head of the tail queue of pending receiver download requests. */
TAILQ_HEAD(send_dlq, send_dl);
520889042fSflorian 
530889042fSflorian /*
540889042fSflorian  * We have finished updating the receiver's file with sender data.
550889042fSflorian  * Deallocate and wipe clean all resources required for that.
560889042fSflorian  */
570889042fSflorian static void
send_up_reset(struct send_up * p)580889042fSflorian send_up_reset(struct send_up *p)
590889042fSflorian {
600889042fSflorian 
61ed5cc9fbSderaadt 	assert(p != NULL);
620889042fSflorian 
630889042fSflorian 	/* Free the download request, if applicable. */
640889042fSflorian 
650889042fSflorian 	if (p->cur != NULL) {
660889042fSflorian 		free(p->cur->blks);
670889042fSflorian 		free(p->cur);
680889042fSflorian 		p->cur = NULL;
690889042fSflorian 	}
700889042fSflorian 
710889042fSflorian 	/* If we mapped a file for scanning, unmap it and close. */
720889042fSflorian 
730889042fSflorian 	if (p->stat.map != MAP_FAILED)
740889042fSflorian 		munmap(p->stat.map, p->stat.mapsz);
750889042fSflorian 
760889042fSflorian 	p->stat.map = MAP_FAILED;
770889042fSflorian 	p->stat.mapsz = 0;
780889042fSflorian 
790889042fSflorian 	if (p->stat.fd != -1)
800889042fSflorian 		close(p->stat.fd);
810889042fSflorian 
820889042fSflorian 	p->stat.fd = -1;
830889042fSflorian 
840889042fSflorian 	/* Now clear the in-transfer information. */
850889042fSflorian 
860889042fSflorian 	p->stat.offs = 0;
870889042fSflorian 	p->stat.hint = 0;
8864a7cfb7Sflorian 	p->stat.curst = BLKSTAT_NONE;
8955cb9f91Sbenno }
9055cb9f91Sbenno 
9155cb9f91Sbenno /*
9255cb9f91Sbenno  * This is the bulk of the sender work.
9355cb9f91Sbenno  * Here we tend to an output buffer that responds to receiver requests
9455cb9f91Sbenno  * for data.
9555cb9f91Sbenno  * This does not act upon the output descriptor itself so as to avoid
9655cb9f91Sbenno  * blocking, which otherwise would deadlock the protocol.
9755cb9f91Sbenno  * Returns zero on failure, non-zero on success.
9855cb9f91Sbenno  */
9955cb9f91Sbenno static int
send_up_fsm(struct sess * sess,size_t * phase,struct send_up * up,void ** wb,size_t * wbsz,size_t * wbmax,const struct flist * fl)10055cb9f91Sbenno send_up_fsm(struct sess *sess, size_t *phase,
10155cb9f91Sbenno 	struct send_up *up, void **wb, size_t *wbsz, size_t *wbmax,
10255cb9f91Sbenno 	const struct flist *fl)
10355cb9f91Sbenno {
10455cb9f91Sbenno 	size_t		 pos = 0, isz = sizeof(int32_t),
10555cb9f91Sbenno 			 dsz = MD4_DIGEST_LENGTH;
10655cb9f91Sbenno 	unsigned char	 fmd[MD4_DIGEST_LENGTH];
10755cb9f91Sbenno 	off_t		 sz;
10855cb9f91Sbenno 	char		 buf[20];
10955cb9f91Sbenno 
11055cb9f91Sbenno 	switch (up->stat.curst) {
11155cb9f91Sbenno 	case BLKSTAT_DATA:
11255cb9f91Sbenno 		/*
11355cb9f91Sbenno 		 * A data segment to be written: buffer both the length
11455cb9f91Sbenno 		 * and the data.
11555cb9f91Sbenno 		 * If we've finished the transfer, move on to the token;
11655cb9f91Sbenno 		 * otherwise, keep sending data.
11755cb9f91Sbenno 		 */
11855cb9f91Sbenno 
11955cb9f91Sbenno 		sz = MINIMUM(MAX_CHUNK,
12055cb9f91Sbenno 			up->stat.curlen - up->stat.curpos);
12155cb9f91Sbenno 		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
122b2a7eac7Sbenno 			ERRX1("io_lowbuffer_alloc");
12355cb9f91Sbenno 			return 0;
12455cb9f91Sbenno 		}
12555cb9f91Sbenno 		io_lowbuffer_int(sess, *wb, &pos, *wbsz, sz);
12655cb9f91Sbenno 		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, sz)) {
127b2a7eac7Sbenno 			ERRX1("io_lowbuffer_alloc");
12855cb9f91Sbenno 			return 0;
12955cb9f91Sbenno 		}
13055cb9f91Sbenno 		io_lowbuffer_buf(sess, *wb, &pos, *wbsz,
13155cb9f91Sbenno 			up->stat.map + up->stat.curpos, sz);
13255cb9f91Sbenno 
13355cb9f91Sbenno 		up->stat.curpos += sz;
13455cb9f91Sbenno 		if (up->stat.curpos == up->stat.curlen)
13555cb9f91Sbenno 			up->stat.curst = BLKSTAT_TOK;
13655cb9f91Sbenno 		return 1;
13755cb9f91Sbenno 	case BLKSTAT_TOK:
13855cb9f91Sbenno 		/*
13955cb9f91Sbenno 		 * The data token following (maybe) a data segment.
14055cb9f91Sbenno 		 * These can also come standalone if, say, the file's
14155cb9f91Sbenno 		 * being fully written.
14255cb9f91Sbenno 		 * It's followed by a hash or another data segment,
14355cb9f91Sbenno 		 * depending on the token.
14455cb9f91Sbenno 		 */
14555cb9f91Sbenno 
14655cb9f91Sbenno 		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
147b2a7eac7Sbenno 			ERRX1("io_lowbuffer_alloc");
14855cb9f91Sbenno 			return 0;
14955cb9f91Sbenno 		}
15055cb9f91Sbenno 		io_lowbuffer_int(sess, *wb,
15155cb9f91Sbenno 			&pos, *wbsz, up->stat.curtok);
15255cb9f91Sbenno 		up->stat.curst = up->stat.curtok ?
15355cb9f91Sbenno 			BLKSTAT_NEXT : BLKSTAT_HASH;
15455cb9f91Sbenno 		return 1;
15555cb9f91Sbenno 	case BLKSTAT_HASH:
15655cb9f91Sbenno 		/*
15755cb9f91Sbenno 		 * The hash following transmission of all file contents.
15855cb9f91Sbenno 		 * This is always followed by the state that we're
15955cb9f91Sbenno 		 * finished with the file.
16055cb9f91Sbenno 		 */
16155cb9f91Sbenno 
162dcad62ecSclaudio 		hash_file_final(&up->stat.ctx, fmd);
16355cb9f91Sbenno 		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, dsz)) {
164b2a7eac7Sbenno 			ERRX1("io_lowbuffer_alloc");
16555cb9f91Sbenno 			return 0;
16655cb9f91Sbenno 		}
16755cb9f91Sbenno 		io_lowbuffer_buf(sess, *wb, &pos, *wbsz, fmd, dsz);
16855cb9f91Sbenno 		up->stat.curst = BLKSTAT_DONE;
16955cb9f91Sbenno 		return 1;
17055cb9f91Sbenno 	case BLKSTAT_DONE:
17155cb9f91Sbenno 		/*
17255cb9f91Sbenno 		 * The data has been written.
17355cb9f91Sbenno 		 * Clear our current send file and allow the block below
17455cb9f91Sbenno 		 * to find another.
17555cb9f91Sbenno 		 */
17655cb9f91Sbenno 
177021bd7eeSbenno 		if (!sess->opts->dry_run)
178b2a7eac7Sbenno 			LOG3("%s: flushed %jd KB total, %.2f%% uploaded",
17902f20df6Sderaadt 			    fl[up->cur->idx].path,
18055cb9f91Sbenno 			    (intmax_t)up->stat.total / 1024,
18155cb9f91Sbenno 			    100.0 * up->stat.dirty / up->stat.total);
18255cb9f91Sbenno 		send_up_reset(up);
18355cb9f91Sbenno 		return 1;
18455cb9f91Sbenno 	case BLKSTAT_PHASE:
18555cb9f91Sbenno 		/*
18655cb9f91Sbenno 		 * This is where we actually stop the algorithm: we're
18755cb9f91Sbenno 		 * already at the second phase.
18855cb9f91Sbenno 		 */
18955cb9f91Sbenno 
19055cb9f91Sbenno 		send_up_reset(up);
19155cb9f91Sbenno 		(*phase)++;
19255cb9f91Sbenno 		return 1;
19355cb9f91Sbenno 	case BLKSTAT_NEXT:
19455cb9f91Sbenno 		/*
19555cb9f91Sbenno 		 * Our last case: we need to find the
19655cb9f91Sbenno 		 * next block (and token) to transmit to
19755cb9f91Sbenno 		 * the receiver.
19855cb9f91Sbenno 		 * These will drive the finite state
19955cb9f91Sbenno 		 * machine in the first few conditional
20055cb9f91Sbenno 		 * blocks of this set.
20155cb9f91Sbenno 		 */
20255cb9f91Sbenno 
20355cb9f91Sbenno 		assert(up->stat.fd != -1);
20455cb9f91Sbenno 		blk_match(sess, up->cur->blks,
20555cb9f91Sbenno 			fl[up->cur->idx].path, &up->stat);
20655cb9f91Sbenno 		return 1;
20755cb9f91Sbenno 	case BLKSTAT_NONE:
20855cb9f91Sbenno 		break;
20955cb9f91Sbenno 	}
21055cb9f91Sbenno 
21155cb9f91Sbenno 	assert(BLKSTAT_NONE == up->stat.curst);
21255cb9f91Sbenno 
21355cb9f91Sbenno 	/*
21455cb9f91Sbenno 	 * We've either hit the phase change following the last file (or
21555cb9f91Sbenno 	 * start, or prior phase change), or we need to prime the next
21655cb9f91Sbenno 	 * file for transmission.
21755cb9f91Sbenno 	 * We special-case dry-run mode.
21855cb9f91Sbenno 	 */
21955cb9f91Sbenno 
22055cb9f91Sbenno 	if (up->cur->idx < 0) {
22155cb9f91Sbenno 		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
222b2a7eac7Sbenno 			ERRX1("io_lowbuffer_alloc");
22355cb9f91Sbenno 			return 0;
22455cb9f91Sbenno 		}
22555cb9f91Sbenno 		io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
22655cb9f91Sbenno 
22755cb9f91Sbenno 		if (sess->opts->server && sess->rver > 27) {
22855cb9f91Sbenno 			if (!io_lowbuffer_alloc(sess,
22955cb9f91Sbenno 			    wb, wbsz, wbmax, isz)) {
230b2a7eac7Sbenno 				ERRX1("io_lowbuffer_alloc");
23155cb9f91Sbenno 				return 0;
23255cb9f91Sbenno 			}
23355cb9f91Sbenno 			io_lowbuffer_int(sess, *wb, &pos, *wbsz, -1);
23455cb9f91Sbenno 		}
23555cb9f91Sbenno 		up->stat.curst = BLKSTAT_PHASE;
23655cb9f91Sbenno 	} else if (sess->opts->dry_run) {
23755cb9f91Sbenno 		if (!sess->opts->server)
238b2a7eac7Sbenno 			LOG1("%s", fl[up->cur->idx].wpath);
23955cb9f91Sbenno 
24055cb9f91Sbenno 		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, isz)) {
241b2a7eac7Sbenno 			ERRX1("io_lowbuffer_alloc");
24255cb9f91Sbenno 			return 0;
24355cb9f91Sbenno 		}
24455cb9f91Sbenno 		io_lowbuffer_int(sess, *wb, &pos, *wbsz, up->cur->idx);
245021bd7eeSbenno 		up->stat.curst = BLKSTAT_DONE;
24655cb9f91Sbenno 	} else {
24755cb9f91Sbenno 		assert(up->stat.fd != -1);
24855cb9f91Sbenno 
24955cb9f91Sbenno 		/*
25055cb9f91Sbenno 		 * FIXME: use the nice output of log_file() and so on in
25155cb9f91Sbenno 		 * downloader.c, which means moving this into
25255cb9f91Sbenno 		 * BLKSTAT_DONE instead of having it be here.
25355cb9f91Sbenno 		 */
25455cb9f91Sbenno 
25555cb9f91Sbenno 		if (!sess->opts->server)
256b2a7eac7Sbenno 			LOG1("%s", fl[up->cur->idx].wpath);
25755cb9f91Sbenno 
25855cb9f91Sbenno 		if (!io_lowbuffer_alloc(sess, wb, wbsz, wbmax, 20)) {
259b2a7eac7Sbenno 			ERRX1("io_lowbuffer_alloc");
26055cb9f91Sbenno 			return 0;
26155cb9f91Sbenno 		}
26255cb9f91Sbenno 		assert(sizeof(buf) == 20);
263ba617adaSbenno 		blk_recv_ack(buf, up->cur->blks, up->cur->idx);
26455cb9f91Sbenno 		io_lowbuffer_buf(sess, *wb, &pos, *wbsz, buf, 20);
26555cb9f91Sbenno 
266b2a7eac7Sbenno 		LOG3("%s: primed for %jd B total",
26702f20df6Sderaadt 		    fl[up->cur->idx].path, (intmax_t)up->cur->blks->size);
26855cb9f91Sbenno 		up->stat.curst = BLKSTAT_NEXT;
26955cb9f91Sbenno 	}
27055cb9f91Sbenno 
27155cb9f91Sbenno 	return 1;
2720889042fSflorian }
2730889042fSflorian 
2740889042fSflorian /*
2750889042fSflorian  * Enqueue a download request, getting it off the read channel as
2760889042fSflorian  * quickly a possible.
2770889042fSflorian  * This frees up the read channel for further incoming requests.
2780889042fSflorian  * We'll handle each element in turn, up to and including the last
2790889042fSflorian  * request (phase change), which is always a -1 idx.
2800889042fSflorian  * Returns zero on failure, non-zero on success.
2810889042fSflorian  */
2820889042fSflorian static int
send_dl_enqueue(struct sess * sess,struct send_dlq * q,int32_t idx,const struct flist * fl,size_t flsz,int fd)2830889042fSflorian send_dl_enqueue(struct sess *sess, struct send_dlq *q,
2840889042fSflorian 	int32_t idx, const struct flist *fl, size_t flsz, int fd)
2850889042fSflorian {
2860889042fSflorian 	struct send_dl	*s;
2870889042fSflorian 
2880889042fSflorian 	/* End-of-phase marker. */
2890889042fSflorian 
2900889042fSflorian 	if (idx == -1) {
2910889042fSflorian 		if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
292b2a7eac7Sbenno 			ERR("calloc");
2930889042fSflorian 			return 0;
2940889042fSflorian 		}
2950889042fSflorian 		s->idx = -1;
2960889042fSflorian 		s->blks = NULL;
2970889042fSflorian 		TAILQ_INSERT_TAIL(q, s, entries);
2980889042fSflorian 		return 1;
2990889042fSflorian 	}
3000889042fSflorian 
3010889042fSflorian 	/* Validate the index. */
3020889042fSflorian 
3030889042fSflorian 	if (idx < 0 || (uint32_t)idx >= flsz) {
304b2a7eac7Sbenno 		ERRX("file index out of bounds: invalid %d out of %zu",
30502f20df6Sderaadt 		    idx, flsz);
3060889042fSflorian 		return 0;
3070889042fSflorian 	} else if (S_ISDIR(fl[idx].st.mode)) {
308b2a7eac7Sbenno 		ERRX("blocks requested for "
3090889042fSflorian 			"directory: %s", fl[idx].path);
3100889042fSflorian 		return 0;
3110889042fSflorian 	} else if (S_ISLNK(fl[idx].st.mode)) {
312b2a7eac7Sbenno 		ERRX("blocks requested for "
3130889042fSflorian 			"symlink: %s", fl[idx].path);
3140889042fSflorian 		return 0;
3150889042fSflorian 	} else if (!S_ISREG(fl[idx].st.mode)) {
316b2a7eac7Sbenno 		ERRX("blocks requested for "
3170889042fSflorian 			"special: %s", fl[idx].path);
3180889042fSflorian 		return 0;
3190889042fSflorian 	}
3200889042fSflorian 
3210889042fSflorian 	if ((s = calloc(1, sizeof(struct send_dl))) == NULL) {
322b2a7eac7Sbenno 		ERR("callloc");
3230889042fSflorian 		return 0;
3240889042fSflorian 	}
3250889042fSflorian 	s->idx = idx;
3260889042fSflorian 	s->blks = NULL;
3270889042fSflorian 	TAILQ_INSERT_TAIL(q, s, entries);
3280889042fSflorian 
3290889042fSflorian 	/*
3300889042fSflorian 	 * This blocks til the full blockset has been read.
3310889042fSflorian 	 * That's ok, because the most important thing is getting data
3320889042fSflorian 	 * off the wire.
3330889042fSflorian 	 */
3340889042fSflorian 
3350889042fSflorian 	if (!sess->opts->dry_run) {
3360889042fSflorian 		s->blks = blk_recv(sess, fd, fl[idx].path);
3370889042fSflorian 		if (s->blks == NULL) {
338b2a7eac7Sbenno 			ERRX1("blk_recv");
3390889042fSflorian 			return 0;
3400889042fSflorian 		}
3410889042fSflorian 	}
3420889042fSflorian 	return 1;
3430889042fSflorian }
3440889042fSflorian 
3450889042fSflorian /*
34660a32ee9Sbenno  * A client sender manages the read-only source files and sends data to
34760a32ee9Sbenno  * the receiver as requested.
34860a32ee9Sbenno  * First it sends its list of files, then it waits for the server to
34960a32ee9Sbenno  * request updates to individual files.
3500889042fSflorian  * It queues requests for updates as soon as it receives them.
35160a32ee9Sbenno  * Returns zero on failure, non-zero on success.
35260a32ee9Sbenno  *
353320af302Sclaudio  * Pledges: stdio, getpw, rpath.
35460a32ee9Sbenno  */
35560a32ee9Sbenno int
rsync_sender(struct sess * sess,int fdin,int fdout,size_t argc,char ** argv)35660a32ee9Sbenno rsync_sender(struct sess *sess, int fdin,
35760a32ee9Sbenno 	int fdout, size_t argc, char **argv)
35860a32ee9Sbenno {
35960a32ee9Sbenno 	struct flist	   *fl = NULL;
36064a7cfb7Sflorian 	const struct flist *f;
36157987d16Sclaudio 	size_t		    i, flsz = 0, phase = 0;
36260a32ee9Sbenno 	int		    rc = 0, c;
36360a32ee9Sbenno 	int32_t		    idx;
3640889042fSflorian 	struct pollfd	    pfd[3];
3650889042fSflorian 	struct send_dlq	    sdlq;
36664a7cfb7Sflorian 	struct send_dl	   *dl;
3670889042fSflorian 	struct send_up	    up;
3680889042fSflorian 	struct stat	    st;
36964a7cfb7Sflorian 	void		   *wbuf = NULL;
37055cb9f91Sbenno 	size_t		    wbufpos = 0, wbufsz = 0, wbufmax = 0;
37164a7cfb7Sflorian 	ssize_t		    ssz;
37260a32ee9Sbenno 
373320af302Sclaudio 	if (pledge("stdio getpw rpath", NULL) == -1) {
374b2a7eac7Sbenno 		ERR("pledge");
37560a32ee9Sbenno 		return 0;
37660a32ee9Sbenno 	}
37760a32ee9Sbenno 
3780889042fSflorian 	memset(&up, 0, sizeof(struct send_up));
3790889042fSflorian 	TAILQ_INIT(&sdlq);
3800889042fSflorian 	up.stat.fd = -1;
3810889042fSflorian 	up.stat.map = MAP_FAILED;
382ea555712Sflorian 	up.stat.blktab = blkhash_alloc();
3830889042fSflorian 
38460a32ee9Sbenno 	/*
38560a32ee9Sbenno 	 * Generate the list of files we want to send from our
38660a32ee9Sbenno 	 * command-line input.
38760a32ee9Sbenno 	 * This will also remove all invalid files.
38860a32ee9Sbenno 	 */
38960a32ee9Sbenno 
39060a32ee9Sbenno 	if (!flist_gen(sess, argc, argv, &fl, &flsz)) {
391b2a7eac7Sbenno 		ERRX1("flist_gen");
39260a32ee9Sbenno 		goto out;
39360a32ee9Sbenno 	}
39460a32ee9Sbenno 
39560a32ee9Sbenno 	/* Client sends zero-length exclusions if deleting. */
39657987d16Sclaudio 	if (!sess->opts->server && sess->opts->del)
39757987d16Sclaudio 		send_rules(sess, fdout);
39860a32ee9Sbenno 
39960a32ee9Sbenno 	/*
40060a32ee9Sbenno 	 * Then the file list in any mode.
40160a32ee9Sbenno 	 * Finally, the IO error (always zero for us).
40260a32ee9Sbenno 	 */
40360a32ee9Sbenno 
40460a32ee9Sbenno 	if (!flist_send(sess, fdin, fdout, fl, flsz)) {
405b2a7eac7Sbenno 		ERRX1("flist_send");
40660a32ee9Sbenno 		goto out;
40760a32ee9Sbenno 	} else if (!io_write_int(sess, fdout, 0)) {
408b2a7eac7Sbenno 		ERRX1("io_write_int");
40960a32ee9Sbenno 		goto out;
41060a32ee9Sbenno 	}
41160a32ee9Sbenno 
41260a32ee9Sbenno 	/* Exit if we're the server with zero files. */
41360a32ee9Sbenno 
414f1dcb30aSderaadt 	if (flsz == 0 && sess->opts->server) {
415b2a7eac7Sbenno 		WARNX("sender has empty file list: exiting");
41660a32ee9Sbenno 		rc = 1;
41760a32ee9Sbenno 		goto out;
41860a32ee9Sbenno 	} else if (!sess->opts->server)
419b2a7eac7Sbenno 		LOG1("Transfer starting: %zu files", flsz);
42060a32ee9Sbenno 
42160a32ee9Sbenno 	/*
42260a32ee9Sbenno 	 * If we're the server, read our exclusion list.
42360a32ee9Sbenno 	 * This is always 0 for now.
42460a32ee9Sbenno 	 */
42560a32ee9Sbenno 
42657987d16Sclaudio 	if (sess->opts->server)
42757987d16Sclaudio 		recv_rules(sess, fdin);
42860a32ee9Sbenno 
42960a32ee9Sbenno 	/*
4300889042fSflorian 	 * Set up our poll events.
4310889042fSflorian 	 * We start by polling only in receiver requests, enabling other
4320889042fSflorian 	 * poll events on demand.
43360a32ee9Sbenno 	 */
43460a32ee9Sbenno 
4350e1879aaSclaudio 	pfd[0].fd = -1; /* from receiver */
4360889042fSflorian 	pfd[0].events = POLLIN;
4370889042fSflorian 	pfd[1].fd = -1; /* to receiver */
4380889042fSflorian 	pfd[1].events = POLLOUT;
4390889042fSflorian 	pfd[2].fd = -1; /* from local file */
4400889042fSflorian 	pfd[2].events = POLLIN;
44160a32ee9Sbenno 
4420889042fSflorian 	for (;;) {
4430e1879aaSclaudio 		/* disable recevier until all buffered data was sent */
4440e1879aaSclaudio 		if (pfd[1].fd != -1 && wbufsz > 0)
4450e1879aaSclaudio 			pfd[0].fd = -1;
4460e1879aaSclaudio 		else
4470e1879aaSclaudio 			pfd[0].fd = fdin;
448a84b4914Sclaudio 		if ((c = poll(pfd, 3, poll_timeout)) == -1) {
449b2a7eac7Sbenno 			ERR("poll");
4500889042fSflorian 			goto out;
4510889042fSflorian 		} else if (c == 0) {
452b2a7eac7Sbenno 			ERRX("poll: timeout");
4530889042fSflorian 			goto out;
4540889042fSflorian 		}
4550889042fSflorian 		for (i = 0; i < 3; i++)
4560889042fSflorian 			if (pfd[i].revents & (POLLERR|POLLNVAL)) {
457b2a7eac7Sbenno 				ERRX("poll: bad fd");
4580889042fSflorian 				goto out;
4590889042fSflorian 			} else if (pfd[i].revents & POLLHUP) {
460b2a7eac7Sbenno 				ERRX("poll: hangup");
4610889042fSflorian 				goto out;
4620889042fSflorian 			}
4630889042fSflorian 
4640889042fSflorian 		/*
4650889042fSflorian 		 * If we have a request coming down off the wire, pull
4660889042fSflorian 		 * it in as quickly as possible into our buffer.
467aeb41609Sbenno 		 * Start by seeing if we have a log message.
468aeb41609Sbenno 		 * If we do, pop it off, then see if we have anything
469aeb41609Sbenno 		 * left and hit it again if so (read priority).
4700889042fSflorian 		 */
4710889042fSflorian 
4722b7b2d66Sbenno 		if (sess->mplex_reads && (pfd[0].revents & POLLIN)) {
473aeb41609Sbenno 			if (!io_read_flush(sess, fdin)) {
474b2a7eac7Sbenno 				ERRX1("io_read_flush");
475aeb41609Sbenno 				goto out;
476aeb41609Sbenno 			} else if (sess->mplex_read_remain == 0) {
477ba617adaSbenno 				c = io_read_check(fdin);
478aeb41609Sbenno 				if (c < 0) {
479b2a7eac7Sbenno 					ERRX1("io_read_check");
480aeb41609Sbenno 					goto out;
481aeb41609Sbenno 				} else if (c > 0)
482aeb41609Sbenno 					continue;
483aeb41609Sbenno 				pfd[0].revents &= ~POLLIN;
484aeb41609Sbenno 			}
485aeb41609Sbenno 		}
486aeb41609Sbenno 
487aeb41609Sbenno 		/*
488aeb41609Sbenno 		 * Now that we've handled the log messages, we're left
489aeb41609Sbenno 		 * here if we have any actual data coming down.
490aeb41609Sbenno 		 * Enqueue message requests, then loop again if we see
491aeb41609Sbenno 		 * more data (read priority).
492aeb41609Sbenno 		 */
493aeb41609Sbenno 
494aeb41609Sbenno 		if (pfd[0].revents & POLLIN) {
49560a32ee9Sbenno 			if (!io_read_int(sess, fdin, &idx)) {
496b2a7eac7Sbenno 				ERRX1("io_read_int");
49760a32ee9Sbenno 				goto out;
49860a32ee9Sbenno 			}
4990889042fSflorian 			if (!send_dl_enqueue(sess,
5000889042fSflorian 			    &sdlq, idx, fl, flsz, fdin)) {
501b2a7eac7Sbenno 				ERRX1("send_dl_enqueue");
5020889042fSflorian 				goto out;
5030889042fSflorian 			}
504ba617adaSbenno 			c = io_read_check(fdin);
5050889042fSflorian 			if (c < 0) {
506b2a7eac7Sbenno 				ERRX1("io_read_check");
5070889042fSflorian 				goto out;
508aeb41609Sbenno 			} else if (c > 0)
509aeb41609Sbenno 				continue;
5100889042fSflorian 		}
5110889042fSflorian 
5120889042fSflorian 		/*
51364a7cfb7Sflorian 		 * One of our local files has been opened in response
51464a7cfb7Sflorian 		 * to a receiver request and now we can map it.
5150889042fSflorian 		 * We'll respond to the event by looking at the map when
5160889042fSflorian 		 * the writer is available.
5170889042fSflorian 		 * Here we also enable the poll event for output.
5180889042fSflorian 		 */
5190889042fSflorian 
5200889042fSflorian 		if (pfd[2].revents & POLLIN) {
5210889042fSflorian 			assert(up.cur != NULL);
5220889042fSflorian 			assert(up.stat.fd != -1);
5230889042fSflorian 			assert(up.stat.map == MAP_FAILED);
5240889042fSflorian 			assert(up.stat.mapsz == 0);
52564a7cfb7Sflorian 			f = &fl[up.cur->idx];
5260889042fSflorian 
5270889042fSflorian 			if (fstat(up.stat.fd, &st) == -1) {
528b2a7eac7Sbenno 				ERR("%s: fstat", f->path);
5290889042fSflorian 				goto out;
5300889042fSflorian 			}
5310889042fSflorian 
5320889042fSflorian 			/*
5330889042fSflorian 			 * If the file is zero-length, the map will
5340889042fSflorian 			 * fail, but either way we want to unset that
53564a7cfb7Sflorian 			 * we're waiting for the file to open and set
53664a7cfb7Sflorian 			 * that we're ready for the output channel.
5370889042fSflorian 			 */
5380889042fSflorian 
5390889042fSflorian 			if ((up.stat.mapsz = st.st_size) > 0) {
54064a7cfb7Sflorian 				up.stat.map = mmap(NULL,
54164a7cfb7Sflorian 					up.stat.mapsz, PROT_READ,
54264a7cfb7Sflorian 					MAP_SHARED, up.stat.fd, 0);
5430889042fSflorian 				if (up.stat.map == MAP_FAILED) {
544b2a7eac7Sbenno 					ERR("%s: mmap", f->path);
5450889042fSflorian 					goto out;
5460889042fSflorian 				}
5470889042fSflorian 			}
54864a7cfb7Sflorian 
5490889042fSflorian 			pfd[2].fd = -1;
5500889042fSflorian 			pfd[1].fd = fdout;
5510889042fSflorian 		}
5520889042fSflorian 
5530889042fSflorian 		/*
55464a7cfb7Sflorian 		 * If we have buffers waiting to write, write them out
55564a7cfb7Sflorian 		 * as soon as we can in a non-blocking fashion.
55664a7cfb7Sflorian 		 * We must not be waiting for any local files.
55764a7cfb7Sflorian 		 * ALL WRITES MUST HAPPEN HERE.
55864a7cfb7Sflorian 		 * This keeps the sender deadlock-free.
5590889042fSflorian 		 */
5600889042fSflorian 
56164a7cfb7Sflorian 		if ((pfd[1].revents & POLLOUT) && wbufsz > 0) {
5620889042fSflorian 			assert(pfd[2].fd == -1);
56364a7cfb7Sflorian 			assert(wbufsz - wbufpos);
5646e53a029Sderaadt 			ssz = write(fdout, wbuf + wbufpos, wbufsz - wbufpos);
5653aaa63ebSderaadt 			if (ssz == -1) {
566b2a7eac7Sbenno 				ERR("write");
56764a7cfb7Sflorian 				goto out;
56864a7cfb7Sflorian 			}
56964a7cfb7Sflorian 			wbufpos += ssz;
57064a7cfb7Sflorian 			if (wbufpos == wbufsz)
57164a7cfb7Sflorian 				wbufpos = wbufsz = 0;
57264a7cfb7Sflorian 			pfd[1].revents &= ~POLLOUT;
57364a7cfb7Sflorian 
57464a7cfb7Sflorian 			/* This is usually in io.c... */
57564a7cfb7Sflorian 
57664a7cfb7Sflorian 			sess->total_write += ssz;
57764a7cfb7Sflorian 		}
57864a7cfb7Sflorian 
57955cb9f91Sbenno 		/*
58055cb9f91Sbenno 		 * Engage the FSM for the current transfer.
58155cb9f91Sbenno 		 * If our phase changes, stop processing.
58255cb9f91Sbenno 		 */
58355cb9f91Sbenno 
58455cb9f91Sbenno 		if (pfd[1].revents & POLLOUT && up.cur != NULL) {
58564a7cfb7Sflorian 			assert(pfd[2].fd == -1);
586ed5cc9fbSderaadt 			assert(wbufpos == 0 && wbufsz == 0);
58755cb9f91Sbenno 			if (!send_up_fsm(sess, &phase,
58855cb9f91Sbenno 			    &up, &wbuf, &wbufsz, &wbufmax, fl)) {
589b2a7eac7Sbenno 				ERRX1("send_up_fsm");
59064a7cfb7Sflorian 				goto out;
591*81b5c8f9Sclaudio 			}
592*81b5c8f9Sclaudio 			if (phase > 1)
59360a32ee9Sbenno 				break;
5940889042fSflorian 		}
5950889042fSflorian 
5960889042fSflorian 		/*
5970889042fSflorian 		 * Incoming queue management.
5980889042fSflorian 		 * If we have no queue component that we're waiting on,
5990889042fSflorian 		 * then pull off the receiver-request queue and start
6000889042fSflorian 		 * processing the request.
6010889042fSflorian 		 */
6020889042fSflorian 
6030889042fSflorian 		if (up.cur == NULL) {
6040889042fSflorian 			assert(pfd[2].fd == -1);
6050889042fSflorian 			assert(up.stat.fd == -1);
6060889042fSflorian 			assert(up.stat.map == MAP_FAILED);
6070889042fSflorian 			assert(up.stat.mapsz == 0);
60864a7cfb7Sflorian 			assert(wbufsz == 0 && wbufpos == 0);
60964a7cfb7Sflorian 			pfd[1].fd = -1;
61064a7cfb7Sflorian 
61164a7cfb7Sflorian 			/*
61264a7cfb7Sflorian 			 * If there's nothing in the queue, then keep
61364a7cfb7Sflorian 			 * the output channel disabled and wait for
61464a7cfb7Sflorian 			 * whatever comes next from the reader.
61564a7cfb7Sflorian 			 */
6160889042fSflorian 
6170889042fSflorian 			if ((up.cur = TAILQ_FIRST(&sdlq)) == NULL)
6180889042fSflorian 				continue;
6190889042fSflorian 			TAILQ_REMOVE(&sdlq, up.cur, entries);
6200889042fSflorian 
621ea555712Sflorian 			/* Hash our blocks. */
622ea555712Sflorian 
623dcad62ecSclaudio 			hash_file_start(&up.stat.ctx, sess);
624ea555712Sflorian 			blkhash_set(up.stat.blktab, up.cur->blks);
625ea555712Sflorian 
62664a7cfb7Sflorian 			/*
62764a7cfb7Sflorian 			 * End of phase: enable channel to receiver.
62864a7cfb7Sflorian 			 * We'll need our output buffer enabled in order
62964a7cfb7Sflorian 			 * to process this event.
63064a7cfb7Sflorian 			 */
6310889042fSflorian 
6320889042fSflorian 			if (up.cur->idx == -1) {
6330889042fSflorian 				pfd[1].fd = fdout;
6340889042fSflorian 				continue;
6350889042fSflorian 			}
6360889042fSflorian 
63764a7cfb7Sflorian 			/*
63864a7cfb7Sflorian 			 * Non-blocking open of file.
63964a7cfb7Sflorian 			 * This will be picked up in the state machine
64064a7cfb7Sflorian 			 * block of not being primed.
64164a7cfb7Sflorian 			 */
6420889042fSflorian 
6430889042fSflorian 			up.stat.fd = open(fl[up.cur->idx].path,
6440889042fSflorian 				O_RDONLY|O_NONBLOCK, 0);
6450889042fSflorian 			if (up.stat.fd == -1) {
646b2a7eac7Sbenno 				ERR("%s: open", fl[up.cur->idx].path);
6470889042fSflorian 				goto out;
6480889042fSflorian 			}
6490889042fSflorian 			pfd[2].fd = up.stat.fd;
65060a32ee9Sbenno 		}
65160a32ee9Sbenno 	}
65260a32ee9Sbenno 
65364a7cfb7Sflorian 	if (!TAILQ_EMPTY(&sdlq)) {
654b2a7eac7Sbenno 		ERRX("phases complete with files still queued");
65564a7cfb7Sflorian 		goto out;
65664a7cfb7Sflorian 	}
65764a7cfb7Sflorian 
65860a32ee9Sbenno 	if (!sess_stats_send(sess, fdout)) {
659b2a7eac7Sbenno 		ERRX1("sess_stats_end");
66060a32ee9Sbenno 		goto out;
66160a32ee9Sbenno 	}
66260a32ee9Sbenno 
66360a32ee9Sbenno 	/* Final "goodbye" message. */
66460a32ee9Sbenno 
66560a32ee9Sbenno 	if (!io_read_int(sess, fdin, &idx)) {
666b2a7eac7Sbenno 		ERRX1("io_read_int");
66760a32ee9Sbenno 		goto out;
668*81b5c8f9Sclaudio 	}
669*81b5c8f9Sclaudio 	if (idx != -1) {
670b2a7eac7Sbenno 		ERRX("read incorrect update complete ack");
67160a32ee9Sbenno 		goto out;
67260a32ee9Sbenno 	}
67360a32ee9Sbenno 
674b2a7eac7Sbenno 	LOG2("sender finished updating");
67560a32ee9Sbenno 	rc = 1;
67660a32ee9Sbenno out:
67764a7cfb7Sflorian 	send_up_reset(&up);
67864a7cfb7Sflorian 	while ((dl = TAILQ_FIRST(&sdlq)) != NULL) {
67955cb9f91Sbenno 		TAILQ_REMOVE(&sdlq, dl, entries);
68064a7cfb7Sflorian 		free(dl->blks);
68164a7cfb7Sflorian 		free(dl);
68264a7cfb7Sflorian 	}
68360a32ee9Sbenno 	flist_free(fl, flsz);
68464a7cfb7Sflorian 	free(wbuf);
685ea555712Sflorian 	blkhash_free(up.stat.blktab);
68660a32ee9Sbenno 	return rc;
68760a32ee9Sbenno }
688