xref: /dflybsd-src/lib/libdmsg/msg.c (revision 6f25d5554e6df42aa5235bf7e3e8e1cc9da25eec)
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35 
36 #include "dmsg_local.h"
37 
38 int DMsgDebugOpt;
39 
40 static int dmsg_state_msgrx(dmsg_msg_t *msg);
41 static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
42 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
43 
44 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
45 RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
46 
47 /*
48  * STATE TREE - Represents open transactions which are indexed by their
49  *		{ msgid } relative to the governing iocom.
50  */
51 int
52 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
53 {
54 	if (state1->msgid < state2->msgid)
55 		return(-1);
56 	if (state1->msgid > state2->msgid)
57 		return(1);
58 	return(0);
59 }
60 
61 /*
62  * CIRCUIT TREE - Represents open circuits which are indexed by their
63  *		  { msgid } relative to the governing iocom.
64  */
65 int
66 dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2)
67 {
68 	if (circuit1->msgid < circuit2->msgid)
69 		return(-1);
70 	if (circuit1->msgid > circuit2->msgid)
71 		return(1);
72 	return(0);
73 }
74 
75 /*
76  * Initialize a low-level ioq
77  */
78 void
79 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
80 {
81 	bzero(ioq, sizeof(*ioq));
82 	ioq->state = DMSG_MSGQ_STATE_HEADER1;
83 	TAILQ_INIT(&ioq->msgq);
84 }
85 
86 /*
87  * Cleanup queue.
88  *
89  * caller holds iocom->mtx.
90  */
91 void
92 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
93 {
94 	dmsg_msg_t *msg;
95 
96 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
97 		assert(0);	/* shouldn't happen */
98 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
99 		dmsg_msg_free(msg);
100 	}
101 	if ((msg = ioq->msg) != NULL) {
102 		ioq->msg = NULL;
103 		dmsg_msg_free(msg);
104 	}
105 }
106 
107 /*
108  * Initialize a low-level communications channel.
109  *
110  * NOTE: The signal_func() is called at least once from the loop and can be
111  *	 re-armed via dmsg_iocom_restate().
112  */
113 void
114 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
115 		   void (*signal_func)(dmsg_iocom_t *iocom),
116 		   void (*rcvmsg_func)(dmsg_msg_t *msg),
117 		   void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
118 		   void (*altmsg_func)(dmsg_iocom_t *iocom))
119 {
120 	struct stat st;
121 
122 	bzero(iocom, sizeof(*iocom));
123 
124 	asprintf(&iocom->label, "iocom-%p", iocom);
125 	iocom->signal_callback = signal_func;
126 	iocom->rcvmsg_callback = rcvmsg_func;
127 	iocom->altmsg_callback = altmsg_func;
128 	iocom->usrmsg_callback = usrmsg_func;
129 
130 	pthread_mutex_init(&iocom->mtx, NULL);
131 	RB_INIT(&iocom->circuit_tree);
132 	TAILQ_INIT(&iocom->freeq);
133 	TAILQ_INIT(&iocom->freeq_aux);
134 	TAILQ_INIT(&iocom->txmsgq);
135 	iocom->sock_fd = sock_fd;
136 	iocom->alt_fd = alt_fd;
137 	iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
138 	if (signal_func)
139 		iocom->flags |= DMSG_IOCOMF_SWORK;
140 	dmsg_ioq_init(iocom, &iocom->ioq_rx);
141 	dmsg_ioq_init(iocom, &iocom->ioq_tx);
142 	if (pipe(iocom->wakeupfds) < 0)
143 		assert(0);
144 	fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
145 	fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
146 
147 	dmsg_circuit_init(iocom, &iocom->circuit0);
148 
149 	/*
150 	 * Negotiate session crypto synchronously.  This will mark the
151 	 * connection as error'd if it fails.  If this is a pipe it's
152 	 * a linkage that we set up ourselves to the filesystem and there
153 	 * is no crypto.
154 	 */
155 	if (fstat(sock_fd, &st) < 0)
156 		assert(0);
157 	if (S_ISSOCK(st.st_mode))
158 		dmsg_crypto_negotiate(iocom);
159 
160 	/*
161 	 * Make sure our fds are set to non-blocking for the iocom core.
162 	 */
163 	if (sock_fd >= 0)
164 		fcntl(sock_fd, F_SETFL, O_NONBLOCK);
165 #if 0
166 	/* if line buffered our single fgets() should be fine */
167 	if (alt_fd >= 0)
168 		fcntl(alt_fd, F_SETFL, O_NONBLOCK);
169 #endif
170 }
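/*
 * Illustrative sketch only (not part of the library): a minimal caller
 * might bring up an iocom on an already-connected socket roughly as
 * follows.  The my_* callback names are hypothetical.
 */
#if 0
static void my_signal(dmsg_iocom_t *iocom) { /* re-arm periodic work */ }
static void my_rcvmsg(dmsg_msg_t *msg) { /* dispatch received message */ }
static void my_usrmsg(dmsg_msg_t *msg, int unmanaged) { /* unmanaged msg */ }

static void
my_service_thread(int sock_fd)
{
	dmsg_iocom_t iocom;

	dmsg_iocom_init(&iocom, sock_fd, -1,
			my_signal, my_rcvmsg, my_usrmsg, NULL);
	dmsg_iocom_label(&iocom, "example-%d", sock_fd);
	dmsg_iocom_core(&iocom);	/* runs until EOF or error */
	dmsg_iocom_done(&iocom);
}
#endif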
171 
172 void
173 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
174 {
175 	va_list va;
176 	char *optr;
177 
178 	va_start(va, ctl);
179 	optr = iocom->label;
180 	vasprintf(&iocom->label, ctl, va);
181 	va_end(va);
182 	if (optr)
183 		free(optr);
184 }
185 
186 /*
187  * May only be called from a callback from iocom_core.
188  *
189  * Adjust state machine functions, set flags to guarantee that both
190  * the recevmsg_func and the sendmsg_func is called at least once.
191  */
192 void
193 dmsg_iocom_restate(dmsg_iocom_t *iocom,
194 		   void (*signal_func)(dmsg_iocom_t *),
195 		   void (*rcvmsg_func)(dmsg_msg_t *msg))
196 {
197 	pthread_mutex_lock(&iocom->mtx);
198 	iocom->signal_callback = signal_func;
199 	iocom->rcvmsg_callback = rcvmsg_func;
200 	if (signal_func)
201 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
202 	else
203 		atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
204 	pthread_mutex_unlock(&iocom->mtx);
205 }
206 
207 void
208 dmsg_iocom_signal(dmsg_iocom_t *iocom)
209 {
210 	pthread_mutex_lock(&iocom->mtx);
211 	if (iocom->signal_callback)
212 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
213 	pthread_mutex_unlock(&iocom->mtx);
214 }
215 
216 /*
217  * Cleanup a terminating iocom.
218  *
219  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
220  * from all possible references to it.
221  */
222 void
223 dmsg_iocom_done(dmsg_iocom_t *iocom)
224 {
225 	dmsg_msg_t *msg;
226 
227 	if (iocom->sock_fd >= 0) {
228 		close(iocom->sock_fd);
229 		iocom->sock_fd = -1;
230 	}
231 	if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
232 		close(iocom->alt_fd);
233 		iocom->alt_fd = -1;
234 	}
235 	dmsg_ioq_done(iocom, &iocom->ioq_rx);
236 	dmsg_ioq_done(iocom, &iocom->ioq_tx);
237 	while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
238 		TAILQ_REMOVE(&iocom->freeq, msg, qentry);
239 		free(msg);
240 	}
241 	while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
242 		TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
243 		free(msg->aux_data);
244 		msg->aux_data = NULL;
245 		free(msg);
246 	}
247 	if (iocom->wakeupfds[0] >= 0) {
248 		close(iocom->wakeupfds[0]);
249 		iocom->wakeupfds[0] = -1;
250 	}
251 	if (iocom->wakeupfds[1] >= 0) {
252 		close(iocom->wakeupfds[1]);
253 		iocom->wakeupfds[1] = -1;
254 	}
255 	pthread_mutex_destroy(&iocom->mtx);
256 }
257 
258 /*
259  * Basic initialization of a circuit structure.
260  *
261  * The circuit structure is initialized with one ref.
262  */
263 void
264 dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
265 {
266 	circuit->refs = 1;
267 	circuit->iocom = iocom;
268 	RB_INIT(&circuit->staterd_tree);
269 	RB_INIT(&circuit->statewr_tree);
270 }
271 
272 /*
273  * Allocate a new one-way message.
274  */
275 dmsg_msg_t *
276 dmsg_msg_alloc(dmsg_circuit_t *circuit,
277 	       size_t aux_size, uint32_t cmd,
278 	       void (*func)(dmsg_msg_t *), void *data)
279 {
280 	dmsg_iocom_t *iocom = circuit->iocom;
281 	dmsg_state_t *state = NULL;
282 	dmsg_msg_t *msg;
283 	int hbytes;
284 	size_t aligned_size;
285 
286 	pthread_mutex_lock(&iocom->mtx);
287 #if 0
288 	if (aux_size) {
289 		aligned_size = DMSG_DOALIGN(aux_size);
290 		if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
291 			TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
292 	} else {
293 		aligned_size = 0;
294 		if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
295 			TAILQ_REMOVE(&iocom->freeq, msg, qentry);
296 	}
297 #endif
298 	aligned_size = DMSG_DOALIGN(aux_size);
299 	msg = NULL;
300 	if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
301 		/*
302 		 * Create state when CREATE is set without REPLY.
303 		 * Assign a unique msgid, in this case simply using
304 		 * the pointer value for 'state'.
305 		 *
306 		 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
307 		 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
308 		 *
309 		 * NOTE: state initiated by us and state initiated by
310 		 *	 a remote create are placed in different RB trees.
311 		 *	 The msgid for SPAN state is used in source/target
312 		 *	 for message routing as appropriate.
313 		 */
314 		state = malloc(sizeof(*state));
315 		bzero(state, sizeof(*state));
316 		state->iocom = iocom;
317 		state->circuit = circuit;
318 		state->flags = DMSG_STATE_DYNAMIC;
319 		state->msgid = (uint64_t)(uintptr_t)state;
320 		state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
321 		state->rxcmd = DMSGF_REPLY;
322 		state->icmd = state->txcmd & DMSGF_BASECMDMASK;
323 		state->func = func;
324 		state->any.any = data;
325 		RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
326 		state->flags |= DMSG_STATE_INSERTED;
327 	}
328 	/* XXX SMP race for state */
329 	pthread_mutex_unlock(&iocom->mtx);
330 	hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
331 	if (msg == NULL) {
332 		msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
333 		bzero(msg, offsetof(struct dmsg_msg, any.head));
334 		*(int *)((char *)msg +
335 			 offsetof(struct dmsg_msg, any.head) + hbytes) =
336 				 0x71B2C3D4;
337 #if 0
338 		msg = malloc(sizeof(*msg));
339 		bzero(msg, sizeof(*msg));
340 #endif
341 	}
342 
343 	/*
344 	 * [re]allocate the auxiliary data buffer.  The caller knows that
345 	 * a size-aligned buffer will be allocated, but we do not want to
346 	 * force the caller to zero any tail piece, so we do that ourselves.
347 	 */
348 	if (msg->aux_size != aux_size) {
349 		if (msg->aux_data) {
350 			free(msg->aux_data);
351 			msg->aux_data = NULL;
352 			msg->aux_size = 0;
353 		}
354 		if (aux_size) {
355 			msg->aux_data = malloc(aligned_size);
356 			msg->aux_size = aux_size;
357 			if (aux_size != aligned_size) {
358 				bzero(msg->aux_data + aux_size,
359 				      aligned_size - aux_size);
360 			}
361 		}
362 	}
363 	if (hbytes)
364 		bzero(&msg->any.head, hbytes);
365 	msg->hdr_size = hbytes;
366 	msg->any.head.magic = DMSG_HDR_MAGIC;
367 	msg->any.head.cmd = cmd;
368 	msg->any.head.aux_descr = 0;
369 	msg->any.head.aux_crc = 0;
370 	msg->any.head.circuit = 0;
371 	msg->circuit = circuit;
372 	msg->iocom = iocom;
373 	dmsg_circuit_hold(circuit);
374 	if (state) {
375 		msg->state = state;
376 		state->msg = msg;
377 		msg->any.head.msgid = state->msgid;
378 	} else {
379 		msg->any.head.msgid = 0;
380 	}
381 	return (msg);
382 }
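/*
 * Illustrative sketch only: opening a new transaction on a circuit.
 * The caller supplies a protocol command; setting DMSGF_CREATE without
 * DMSGF_REPLY causes dmsg_msg_alloc() to allocate new transaction state.
 * The example_* names are hypothetical.
 */
#if 0
static void
example_open_transaction(dmsg_circuit_t *circuit, uint32_t cmd,
			 void (*reply_func)(dmsg_msg_t *))
{
	dmsg_msg_t *msg;

	msg = dmsg_msg_alloc(circuit, 0, cmd | DMSGF_CREATE,
			     reply_func, NULL);
	/* fill in any command-specific fields in msg->any here */
	dmsg_msg_write(msg);	/* queue and wake the iocom core thread */
}
#endif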
383 
384 /*
385  * Free a message so it can be reused afresh.
386  *
387  * NOTE: aux_size can be 0 with a non-NULL aux_data.
388  */
389 static
390 void
391 dmsg_msg_free_locked(dmsg_msg_t *msg)
392 {
393 	/*dmsg_iocom_t *iocom = msg->iocom;*/
394 #if 1
395 	int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
396 	if (*(int *)((char *)msg +
397 		     offsetof(struct  dmsg_msg, any.head) + hbytes) !=
398 	     0x71B2C3D4) {
399 		fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
400 		assert(0);
401 	}
402 #endif
403 	if (msg->circuit) {
404 		dmsg_circuit_drop_locked(msg->circuit);
405 		msg->circuit = NULL;
406 	}
407 	msg->state = NULL;
408 	if (msg->aux_data) {
409 		free(msg->aux_data);
410 		msg->aux_data = NULL;
411 	}
412 	msg->aux_size = 0;
413 	free (msg);
414 #if 0
415 	if (msg->aux_data)
416 		TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
417 	else
418 		TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
419 #endif
420 }
421 
422 void
423 dmsg_msg_free(dmsg_msg_t *msg)
424 {
425 	dmsg_iocom_t *iocom = msg->iocom;
426 
427 	pthread_mutex_lock(&iocom->mtx);
428 	dmsg_msg_free_locked(msg);
429 	pthread_mutex_unlock(&iocom->mtx);
430 }
431 
432 /*
433  * I/O core loop for an iocom.
434  *
435  * Thread localized, iocom->mtx not held.
436  */
437 void
438 dmsg_iocom_core(dmsg_iocom_t *iocom)
439 {
440 	struct pollfd fds[3];
441 	char dummybuf[256];
442 	dmsg_msg_t *msg;
443 	int timeout;
444 	int count;
445 	int wi;	/* wakeup pipe */
446 	int si;	/* socket */
447 	int ai;	/* alt bulk path socket */
448 
449 	while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
450 		/*
451 		 * These iocom->flags are only manipulated within the
452 		 * context of the current thread.  However, modifications
453 		 * still require atomic ops.
454 		 */
455 		if ((iocom->flags & (DMSG_IOCOMF_RWORK |
456 				     DMSG_IOCOMF_WWORK |
457 				     DMSG_IOCOMF_PWORK |
458 				     DMSG_IOCOMF_SWORK |
459 				     DMSG_IOCOMF_ARWORK |
460 				     DMSG_IOCOMF_AWWORK)) == 0) {
461 			/*
462 			 * Only poll if no immediate work is pending.
463 			 * Otherwise we are just wasting our time calling
464 			 * poll.
465 			 */
466 			timeout = 5000;
467 
468 			count = 0;
469 			wi = -1;
470 			si = -1;
471 			ai = -1;
472 
473 			/*
474 			 * Always check the inter-thread pipe, e.g.
475 			 * for iocom->txmsgq work.
476 			 */
477 			wi = count++;
478 			fds[wi].fd = iocom->wakeupfds[0];
479 			fds[wi].events = POLLIN;
480 			fds[wi].revents = 0;
481 
482 			/*
483 			 * Check the socket input/output direction as
484 			 * requested
485 			 */
486 			if (iocom->flags & (DMSG_IOCOMF_RREQ |
487 					    DMSG_IOCOMF_WREQ)) {
488 				si = count++;
489 				fds[si].fd = iocom->sock_fd;
490 				fds[si].events = 0;
491 				fds[si].revents = 0;
492 
493 				if (iocom->flags & DMSG_IOCOMF_RREQ)
494 					fds[si].events |= POLLIN;
495 				if (iocom->flags & DMSG_IOCOMF_WREQ)
496 					fds[si].events |= POLLOUT;
497 			}
498 
499 			/*
500 			 * Check the alternative fd for work.
501 			 */
502 			if (iocom->alt_fd >= 0) {
503 				ai = count++;
504 				fds[ai].fd = iocom->alt_fd;
505 				fds[ai].events = POLLIN;
506 				fds[ai].revents = 0;
507 			}
508 			poll(fds, count, timeout);
509 
510 			if (wi >= 0 && (fds[wi].revents & POLLIN))
511 				atomic_set_int(&iocom->flags,
512 					       DMSG_IOCOMF_PWORK);
513 			if (si >= 0 && (fds[si].revents & POLLIN))
514 				atomic_set_int(&iocom->flags,
515 					       DMSG_IOCOMF_RWORK);
516 			if (si >= 0 && (fds[si].revents & POLLOUT))
517 				atomic_set_int(&iocom->flags,
518 					       DMSG_IOCOMF_WWORK);
519 			if (wi >= 0 && (fds[wi].revents & POLLOUT))
520 				atomic_set_int(&iocom->flags,
521 					       DMSG_IOCOMF_WWORK);
522 			if (ai >= 0 && (fds[ai].revents & POLLIN))
523 				atomic_set_int(&iocom->flags,
524 					       DMSG_IOCOMF_ARWORK);
525 		} else {
526 			/*
527 			 * Always check the pipe
528 			 */
529 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
530 		}
531 
532 		if (iocom->flags & DMSG_IOCOMF_SWORK) {
533 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
534 			iocom->signal_callback(iocom);
535 		}
536 
537 		/*
538 		 * Pending message queues from other threads wake us up
539 		 * with a write to the wakeupfds[] pipe.  We have to clear
540 		 * the pipe with a dummy read.
541 		 */
542 		if (iocom->flags & DMSG_IOCOMF_PWORK) {
543 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
544 			read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
545 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
546 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
547 			if (TAILQ_FIRST(&iocom->txmsgq))
548 				dmsg_iocom_flush1(iocom);
549 		}
550 
551 		/*
552 		 * Message write sequencing
553 		 */
554 		if (iocom->flags & DMSG_IOCOMF_WWORK)
555 			dmsg_iocom_flush1(iocom);
556 
557 		/*
558 		 * Message read sequencing.  Run this after the write
559 		 * sequencing in case the write sequencing allowed another
560 		 * auto-DELETE to occur on the read side.
561 		 */
562 		if (iocom->flags & DMSG_IOCOMF_RWORK) {
563 			while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
564 			       (msg = dmsg_ioq_read(iocom)) != NULL) {
565 				if (DMsgDebugOpt) {
566 					fprintf(stderr, "receive %s\n",
567 						dmsg_msg_str(msg));
568 				}
569 				iocom->rcvmsg_callback(msg);
570 				dmsg_state_cleanuprx(iocom, msg);
571 			}
572 		}
573 
574 		if (iocom->flags & DMSG_IOCOMF_ARWORK) {
575 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
576 			iocom->altmsg_callback(iocom);
577 		}
578 	}
579 }
580 
581 /*
582  * Make sure there's enough room in the FIFO to hold the
583  * needed data.
584  *
585  * Assume worst case encrypted form is 2x the size of the
586  * plaintext equivalent.
587  */
588 static
589 size_t
590 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
591 {
592 	size_t bytes;
593 	size_t nmax;
594 
595 	bytes = ioq->fifo_cdx - ioq->fifo_beg;
596 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
597 	if (bytes + nmax / 2 < needed) {
598 		if (bytes) {
599 			bcopy(ioq->buf + ioq->fifo_beg,
600 			      ioq->buf,
601 			      bytes);
602 		}
603 		ioq->fifo_cdx -= ioq->fifo_beg;
604 		ioq->fifo_beg = 0;
605 		if (ioq->fifo_cdn < ioq->fifo_end) {
606 			bcopy(ioq->buf + ioq->fifo_cdn,
607 			      ioq->buf + ioq->fifo_cdx,
608 			      ioq->fifo_end - ioq->fifo_cdn);
609 		}
610 		ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
611 		ioq->fifo_cdn = ioq->fifo_cdx;
612 		nmax = sizeof(ioq->buf) - ioq->fifo_end;
613 	}
614 	return(nmax);
615 }
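/*
 * Informal sketch of the rx FIFO indices manipulated above (layout
 * inferred from the code; treat as a reading aid, not a specification):
 *
 *	buf[0] .. fifo_beg ........ fifo_cdx ....... fifo_cdn ........ fifo_end
 *	          |<- decrypted, ->| (gap left by   |<- raw, not yet ->|
 *	          |  not consumed  |  decryption)   |   decrypted      |
 *
 * dmsg_ioq_makeroom() compacts both regions toward buf[0] so at least
 * (needed) bytes of new input can be appended at fifo_end.
 */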
616 
617 /*
618  * Read the next ready message from the ioq, issuing I/O if needed.
619  * Caller should retry on a read-event when NULL is returned.
620  *
621  * If an error occurs during reception a DMSG_LNK_ERROR msg will
622  * be returned for each open transaction, then the ioq and iocom
623  * will be errored out and a non-transactional DMSG_LNK_ERROR
624  * msg will be returned as the final message.  The caller should not call
625  * us again after the final message is returned.
626  *
627  * Thread localized, iocom->mtx not held.
628  */
629 dmsg_msg_t *
630 dmsg_ioq_read(dmsg_iocom_t *iocom)
631 {
632 	dmsg_ioq_t *ioq = &iocom->ioq_rx;
633 	dmsg_msg_t *msg;
634 	dmsg_state_t *state;
635 	dmsg_circuit_t *circuit0;
636 	dmsg_hdr_t *head;
637 	ssize_t n;
638 	size_t bytes;
639 	size_t nmax;
640 	uint32_t aux_size;
641 	uint32_t xcrc32;
642 	int error;
643 
644 again:
645 	/*
646 	 * If a message is already pending we can just remove and
647 	 * return it.  Message state has already been processed.
648 	 * (currently not implemented)
649 	 */
650 	if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
651 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
652 		return (msg);
653 	}
654 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
655 
656 	/*
657 	 * If the stream is errored out we stop processing it.
658 	 */
659 	if (ioq->error)
660 		goto skip;
661 
662 	/*
663 	 * Message read in-progress (msg is NULL at the moment).  We don't
664 	 * allocate a msg until we have its core header.
665 	 */
666 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
667 	bytes = ioq->fifo_cdx - ioq->fifo_beg;		/* already decrypted */
668 	msg = ioq->msg;
669 
670 	switch(ioq->state) {
671 	case DMSG_MSGQ_STATE_HEADER1:
672 		/*
673 		 * Load the primary header, fail on any non-trivial read
674 		 * error or on EOF.  Since the primary header is the same
675 		 * size as the message alignment it will never straddle
676 		 * the end of the buffer.
677 		 */
678 		nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
679 		if (bytes < sizeof(msg->any.head)) {
680 			n = read(iocom->sock_fd,
681 				 ioq->buf + ioq->fifo_end,
682 				 nmax);
683 			if (n <= 0) {
684 				if (n == 0) {
685 					ioq->error = DMSG_IOQ_ERROR_EOF;
686 					break;
687 				}
688 				if (errno != EINTR &&
689 				    errno != EINPROGRESS &&
690 				    errno != EAGAIN) {
691 					ioq->error = DMSG_IOQ_ERROR_SOCK;
692 					break;
693 				}
694 				n = 0;
695 				/* fall through */
696 			}
697 			ioq->fifo_end += (size_t)n;
698 			nmax -= (size_t)n;
699 		}
700 
701 		/*
702 		 * Decrypt data received so far.  Data will be decrypted
703 		 * in-place but might create gaps in the FIFO.  Partial
704 		 * blocks are not immediately decrypted.
705 		 *
706 		 * WARNING!  The header might be in the wrong byte order; we
707 		 *	     do not fix it up until we get the entire
708 		 *	     extended header.
709 		 */
710 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
711 			dmsg_crypto_decrypt(iocom, ioq);
712 		} else {
713 			ioq->fifo_cdx = ioq->fifo_end;
714 			ioq->fifo_cdn = ioq->fifo_end;
715 		}
716 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
717 
718 		/*
719 		 * Insufficient data accumulated (msg is NULL, caller will
720 		 * retry on event).
721 		 */
722 		assert(msg == NULL);
723 		if (bytes < sizeof(msg->any.head))
724 			break;
725 
726 		/*
727 		 * Check and fixup the core header.  Note that the icrc
728 		 * has to be calculated before any fixups, but the crc
729 		 * fields in the msg may have to be swapped like everything
730 		 * else.
731 		 */
732 		head = (void *)(ioq->buf + ioq->fifo_beg);
733 		if (head->magic != DMSG_HDR_MAGIC &&
734 		    head->magic != DMSG_HDR_MAGIC_REV) {
735 			fprintf(stderr, "%s: head->magic is bad %02x\n",
736 				iocom->label, head->magic);
737 			if (iocom->flags & DMSG_IOCOMF_CRYPTED)
738 				fprintf(stderr, "(on encrypted link)\n");
739 			ioq->error = DMSG_IOQ_ERROR_SYNC;
740 			break;
741 		}
742 
743 		/*
744 		 * Calculate the full header size and aux data size
745 		 */
746 		if (head->magic == DMSG_HDR_MAGIC_REV) {
747 			ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
748 				      DMSG_ALIGN;
749 			aux_size = bswap32(head->aux_bytes);
750 		} else {
751 			ioq->hbytes = (head->cmd & DMSGF_SIZE) *
752 				      DMSG_ALIGN;
753 			aux_size = head->aux_bytes;
754 		}
755 		ioq->abytes = DMSG_DOALIGN(aux_size);
756 		ioq->unaligned_aux_size = aux_size;
757 		if (ioq->hbytes < sizeof(msg->any.head) ||
758 		    ioq->hbytes > sizeof(msg->any) ||
759 		    ioq->abytes > DMSG_AUX_MAX) {
760 			ioq->error = DMSG_IOQ_ERROR_FIELD;
761 			break;
762 		}
763 
764 		/*
765 		 * Allocate the message, the next state will fill it in.
766 		 * Note that the aux_data buffer will be sized to an aligned
767 		 * value and the aligned remainder zero'd for convenience.
768 		 */
769 		msg = dmsg_msg_alloc(&iocom->circuit0, aux_size,
770 				     ioq->hbytes / DMSG_ALIGN,
771 				     NULL, NULL);
772 		ioq->msg = msg;
773 
774 		/*
775 		 * Fall through to the next state.  Make sure that the
776 		 * extended header does not straddle the end of the buffer.
777 		 * We still want to issue larger reads into our buffer,
778 		 * book-keeping is easier if we don't bcopy() yet.
779 		 *
780 		 * Make sure there is enough room for bloated encrypt data.
781 		 */
782 		nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
783 		ioq->state = DMSG_MSGQ_STATE_HEADER2;
784 		/* fall through */
785 	case DMSG_MSGQ_STATE_HEADER2:
786 		/*
787 		 * Fill out the extended header.
788 		 */
789 		assert(msg != NULL);
790 		if (bytes < ioq->hbytes) {
791 			n = read(iocom->sock_fd,
792 				 ioq->buf + ioq->fifo_end,
793 				 nmax);
794 			if (n <= 0) {
795 				if (n == 0) {
796 					ioq->error = DMSG_IOQ_ERROR_EOF;
797 					break;
798 				}
799 				if (errno != EINTR &&
800 				    errno != EINPROGRESS &&
801 				    errno != EAGAIN) {
802 					ioq->error = DMSG_IOQ_ERROR_SOCK;
803 					break;
804 				}
805 				n = 0;
806 				/* fall through */
807 			}
808 			ioq->fifo_end += (size_t)n;
809 			nmax -= (size_t)n;
810 		}
811 
812 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
813 			dmsg_crypto_decrypt(iocom, ioq);
814 		} else {
815 			ioq->fifo_cdx = ioq->fifo_end;
816 			ioq->fifo_cdn = ioq->fifo_end;
817 		}
818 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
819 
820 		/*
821 		 * Insufficient data accumulated (set msg NULL so caller will
822 		 * retry on event).
823 		 */
824 		if (bytes < ioq->hbytes) {
825 			msg = NULL;
826 			break;
827 		}
828 
829 		/*
830 		 * Calculate the extended header, decrypt data received
831 		 * so far.  Handle endian-conversion for the entire extended
832 		 * header.
833 		 */
834 		head = (void *)(ioq->buf + ioq->fifo_beg);
835 
836 		/*
837 		 * Check the CRC.
838 		 */
839 		if (head->magic == DMSG_HDR_MAGIC_REV)
840 			xcrc32 = bswap32(head->hdr_crc);
841 		else
842 			xcrc32 = head->hdr_crc;
843 		head->hdr_crc = 0;
844 		if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
845 			ioq->error = DMSG_IOQ_ERROR_XCRC;
846 			fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
847 				xcrc32, dmsg_icrc32(head, ioq->hbytes),
848 				dmsg_msg_str(msg));
849 			assert(0);
850 			break;
851 		}
852 		head->hdr_crc = xcrc32;
853 
854 		if (head->magic == DMSG_HDR_MAGIC_REV) {
855 			dmsg_bswap_head(head);
856 		}
857 
858 		/*
859 		 * Copy the extended header into the msg and adjust the
860 		 * FIFO.
861 		 */
862 		bcopy(head, &msg->any, ioq->hbytes);
863 
864 		/*
865 		 * We are either done or we fall-through.
866 		 */
867 		if (ioq->abytes == 0) {
868 			ioq->fifo_beg += ioq->hbytes;
869 			break;
870 		}
871 
872 		/*
873 		 * Must adjust bytes (and the state) when falling through.
874 		 * nmax doesn't change.
875 		 */
876 		ioq->fifo_beg += ioq->hbytes;
877 		bytes -= ioq->hbytes;
878 		ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
879 		/* fall through */
880 	case DMSG_MSGQ_STATE_AUXDATA1:
881 		/*
882 		 * Copy the partial or complete [decrypted] payload from
883 		 * remaining bytes in the FIFO in order to optimize the
884 		 * makeroom call in the AUXDATA2 state.  We have to
885 		 * fall-through either way so we can check the crc.
886 		 *
887 		 * msg->aux_size tracks our aux data.
888 		 *
889 		 * (Let's not complicate matters if the data is encrypted,
890 		 *  since the data in-stream is not the same size as the
891 		 *  data decrypted).
892 		 */
893 		if (bytes >= ioq->abytes) {
894 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
895 			      ioq->abytes);
896 			msg->aux_size = ioq->abytes;
897 			ioq->fifo_beg += ioq->abytes;
898 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
899 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
900 			bytes -= ioq->abytes;
901 		} else if (bytes) {
902 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
903 			      bytes);
904 			msg->aux_size = bytes;
905 			ioq->fifo_beg += bytes;
906 			if (ioq->fifo_cdx < ioq->fifo_beg)
907 				ioq->fifo_cdx = ioq->fifo_beg;
908 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
909 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
910 			bytes = 0;
911 		} else {
912 			msg->aux_size = 0;
913 		}
914 		ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
915 		/* fall through */
916 	case DMSG_MSGQ_STATE_AUXDATA2:
917 		/*
918 		 * Make sure there is enough room for more data.
919 		 */
920 		assert(msg);
921 		nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
922 
923 		/*
924 		 * Read and decrypt more of the payload.
925 		 */
926 		if (msg->aux_size < ioq->abytes) {
927 			assert(bytes == 0);
928 			n = read(iocom->sock_fd,
929 				 ioq->buf + ioq->fifo_end,
930 				 nmax);
931 			if (n <= 0) {
932 				if (n == 0) {
933 					ioq->error = DMSG_IOQ_ERROR_EOF;
934 					break;
935 				}
936 				if (errno != EINTR &&
937 				    errno != EINPROGRESS &&
938 				    errno != EAGAIN) {
939 					ioq->error = DMSG_IOQ_ERROR_SOCK;
940 					break;
941 				}
942 				n = 0;
943 				/* fall through */
944 			}
945 			ioq->fifo_end += (size_t)n;
946 			nmax -= (size_t)n;
947 		}
948 
949 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
950 			dmsg_crypto_decrypt(iocom, ioq);
951 		} else {
952 			ioq->fifo_cdx = ioq->fifo_end;
953 			ioq->fifo_cdn = ioq->fifo_end;
954 		}
955 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
956 
957 		if (bytes > ioq->abytes - msg->aux_size)
958 			bytes = ioq->abytes - msg->aux_size;
959 
960 		if (bytes) {
961 			bcopy(ioq->buf + ioq->fifo_beg,
962 			      msg->aux_data + msg->aux_size,
963 			      bytes);
964 			msg->aux_size += bytes;
965 			ioq->fifo_beg += bytes;
966 		}
967 
968 		/*
969 		 * Insufficient data accumulated (set msg NULL so caller will
970 		 * retry on event).
971 		 *
972 		 * Assert the auxiliary data size is correct, then record the
973 		 * original unaligned size from the message header.
974 		 */
975 		if (msg->aux_size < ioq->abytes) {
976 			msg = NULL;
977 			break;
978 		}
979 		assert(msg->aux_size == ioq->abytes);
980 		msg->aux_size = ioq->unaligned_aux_size;
981 
982 		/*
983 		 * Check aux_crc, then we are done.  Note that the crc
984 		 * is calculated over the aligned size, not the actual
985 		 * size.
986 		 */
987 		xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
988 		if (xcrc32 != msg->any.head.aux_crc) {
989 			ioq->error = DMSG_IOQ_ERROR_ACRC;
990 			fprintf(stderr, "iocom: ACRC error %08x vs %08x msgid %016jx msgcmd %08x auxsize %d\n",
991 				xcrc32, msg->any.head.aux_crc, (intmax_t)msg->any.head.msgid, msg->any.head.cmd, msg->any.head.aux_bytes);
992 			break;
993 		}
994 		break;
995 	case DMSG_MSGQ_STATE_ERROR:
996 		/*
997 		 * Continued calls to drain recorded transactions (returning
998 		 * a LNK_ERROR for each one), before we return the final
999 		 * LNK_ERROR.
1000 		 */
1001 		assert(msg == NULL);
1002 		break;
1003 	default:
1004 		/*
1005 		 * We don't double-return errors, the caller should not
1006 		 * have called us again after getting an error msg.
1007 		 */
1008 		assert(0);
1009 		break;
1010 	}
1011 
1012 	/*
1013 	 * Check the message sequence.  The iv[] should prevent any
1014 	 * possibility of a replay but we add this check anyway.
1015 	 */
1016 	if (msg && ioq->error == 0) {
1017 		if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1018 			ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1019 		} else {
1020 			++ioq->seq;
1021 		}
1022 	}
1023 
1024 	/*
1025 	 * Handle error, RREQ, or completion
1026 	 *
1027 	 * NOTE: nmax and bytes are invalid at this point, we don't bother
1028 	 *	 to update them when breaking out.
1029 	 */
1030 	if (ioq->error) {
1031 skip:
1032 		fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1033 		/*
1034 		 * An unrecoverable error causes all active receive
1035 		 * transactions to be terminated with a LNK_ERROR message.
1036 		 *
1037 		 * Once all active transactions are exhausted we set the
1038 		 * iocom ERROR flag and return a non-transactional LNK_ERROR
1039 		 * message, which should cause master processing loops to
1040 		 * terminate.
1041 		 */
1042 		assert(ioq->msg == msg);
1043 		if (msg) {
1044 			dmsg_msg_free(msg);
1045 			ioq->msg = NULL;
1046 		}
1047 
1048 		/*
1049 		 * No more I/O read processing
1050 		 */
1051 		ioq->state = DMSG_MSGQ_STATE_ERROR;
1052 
1053 		/*
1054 		 * Simulate a remote LNK_ERROR DELETE msg for any open
1055 		 * transactions, ending with a final non-transactional
1056 		 * LNK_ERROR (that the session can detect) when no
1057 		 * transactions remain.
1058 		 *
1059 		 * We only need to scan transactions on circuit0 as these
1060 		 * will contain all circuit forges, and terminating circuit
1061 		 * forges will automatically terminate the transactions on
1062 		 * any other circuits as well as those circuits.
1063 		 */
1064 		circuit0 = &iocom->circuit0;
1065 		msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);
1066 		msg->any.head.error = ioq->error;
1067 
1068 		pthread_mutex_lock(&iocom->mtx);
1069 		dmsg_iocom_drain(iocom);
1070 
1071 		if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
1072 			/*
1073 			 * Active remote transactions are still present.
1074 			 * Simulate the other end sending us a DELETE.
1075 			 */
1076 			if (state->rxcmd & DMSGF_DELETE) {
1077 				dmsg_msg_free(msg);
1078 				fprintf(stderr,
1079 					"iocom: ioq error(rd) %d sleeping "
1080 					"state %p rxcmd %08x txcmd %08x "
1081 					"func %p\n",
1082 					ioq->error, state, state->rxcmd,
1083 					state->txcmd, state->func);
1084 				usleep(100000);	/* XXX */
1085 				atomic_set_int(&iocom->flags,
1086 					       DMSG_IOCOMF_RWORK);
1087 				msg = NULL;
1088 			} else {
1089 				/*state->txcmd |= DMSGF_DELETE;*/
1090 				msg->state = state;
1091 				msg->iocom = iocom;
1092 				msg->any.head.msgid = state->msgid;
1093 				msg->any.head.cmd |= DMSGF_ABORT |
1094 						     DMSGF_DELETE;
1095 			}
1096 		} else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
1097 			/*
1098 			 * Active local transactions are still present.
1099 			 * Simulate the other end sending us a DELETE.
1100 			 */
1101 			if (state->rxcmd & DMSGF_DELETE) {
1102 				dmsg_msg_free(msg);
1103 				fprintf(stderr,
1104 					"iocom: ioq error(wr) %d sleeping "
1105 					"state %p rxcmd %08x txcmd %08x "
1106 					"func %p\n",
1107 					ioq->error, state, state->rxcmd,
1108 					state->txcmd, state->func);
1109 				usleep(100000);	/* XXX */
1110 				atomic_set_int(&iocom->flags,
1111 					       DMSG_IOCOMF_RWORK);
1112 				msg = NULL;
1113 			} else {
1114 				msg->state = state;
1115 				msg->iocom = iocom;
1116 				msg->any.head.msgid = state->msgid;
1117 				msg->any.head.cmd |= DMSGF_ABORT |
1118 						     DMSGF_DELETE |
1119 						     DMSGF_REPLY;
1120 				if ((state->rxcmd & DMSGF_CREATE) == 0) {
1121 					msg->any.head.cmd |=
1122 						     DMSGF_CREATE;
1123 				}
1124 			}
1125 		} else {
1126 			/*
1127 			 * No active local or remote transactions remain.
1128 			 * Generate a final LNK_ERROR and flag EOF.
1129 			 */
1130 			msg->state = NULL;
1131 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
1132 			fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1133 		}
1134 		pthread_mutex_unlock(&iocom->mtx);
1135 
1136 		/*
1137 		 * For the iocom error case we want to set RWORK to indicate
1138 		 * that more messages might be pending.
1139 		 *
1140 		 * It is possible to return NULL when there is more work to
1141 		 * do because each message has to be DELETEd in both
1142 		 * directions before we continue on with the next (though
1143 		 * this could be optimized).  The transmit direction will
1144 		 * re-set RWORK.
1145 		 */
1146 		if (msg)
1147 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1148 	} else if (msg == NULL) {
1149 		/*
1150 		 * Insufficient data received to finish building the message,
1151 		 * set RREQ and return NULL.
1152 		 *
1153 		 * Leave ioq->msg intact.
1154 		 * Leave the FIFO intact.
1155 		 */
1156 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1157 	} else {
1158 		/*
1159 		 * Continue processing msg.
1160 		 *
1161 		 * The fifo has already been advanced past the message.
1162 		 * Trivially reset the FIFO indices if possible.
1163 		 *
1164 		 * Clear the FIFO if it is now empty and set RREQ to wait
1165 		 * for more from the socket.  If the FIFO is not empty set
1166 		 * RWORK to bypass the poll so we loop immediately.
1167 		 */
1168 		if (ioq->fifo_beg == ioq->fifo_cdx &&
1169 		    ioq->fifo_cdn == ioq->fifo_end) {
1170 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1171 			ioq->fifo_cdx = 0;
1172 			ioq->fifo_cdn = 0;
1173 			ioq->fifo_beg = 0;
1174 			ioq->fifo_end = 0;
1175 		} else {
1176 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1177 		}
1178 		ioq->state = DMSG_MSGQ_STATE_HEADER1;
1179 		ioq->msg = NULL;
1180 
1181 		/*
1182 		 * Handle message routing.  Messages with a non-zero circuit
1183 		 * are routed via dmsg_circuit_route().  Error will be 0 if the
1184 		 * message is destined for us.
1185 		 *
1186 		 * State processing only occurs for messages destined for us.
1187 		 */
1188 		if (DMsgDebugOpt >= 5) {
1189 			fprintf(stderr,
1190 				"rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1191 				msg->any.head.cmd,
1192 				(intmax_t)msg->any.head.msgid,
1193 				(intmax_t)msg->any.head.circuit);
1194 		}
1195 		if (msg->any.head.circuit)
1196 			error = dmsg_circuit_route(msg);
1197 		else
1198 			error = dmsg_state_msgrx(msg);
1199 
1200 		if (error) {
1201 			/*
1202 			 * Abort-after-closure, throw message away and
1203 			 * start reading another.
1204 			 */
1205 			if (error == DMSG_IOQ_ERROR_EALREADY) {
1206 				dmsg_msg_free(msg);
1207 				goto again;
1208 			}
1209 
1210 			/*
1211 			 * msg routed, msg pointer no longer owned by us.
1212 			 * Go to the top and start reading another.
1213 			 */
1214 			if (error == DMSG_IOQ_ERROR_ROUTED)
1215 				goto again;
1216 
1217 			/*
1218 			 * Process real error and throw away message.
1219 			 */
1220 			ioq->error = error;
1221 			goto skip;
1222 		}
1223 		/* no error, not routed.  Fall through and return msg */
1224 	}
1225 	return (msg);
1226 }
1227 
1228 /*
1229  * Calculate the header and data CRCs and write low-level messages to
1230  * the connection.  If aux_crc is non-zero the aux_data crc is assumed
1231  * to have already been set.
1232  *
1233  * Messages queued on iocom->txmsgq are moved to the transmit ioq and
1234  * then flushed via dmsg_iocom_flush2().
1235  *
1236  * (called from iocom_core only)
1237  */
1238 void
1239 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1240 {
1241 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1242 	dmsg_msg_t *msg;
1243 	uint32_t xcrc32;
1244 	size_t hbytes;
1245 	size_t abytes;
1246 	dmsg_msg_queue_t tmpq;
1247 
1248 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1249 	TAILQ_INIT(&tmpq);
1250 	pthread_mutex_lock(&iocom->mtx);
1251 	while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1252 		TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1253 		TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1254 	}
1255 	pthread_mutex_unlock(&iocom->mtx);
1256 
1257 	while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1258 		/*
1259 		 * Process terminal connection errors.
1260 		 */
1261 		TAILQ_REMOVE(&tmpq, msg, qentry);
1262 		if (ioq->error) {
1263 			TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1264 			++ioq->msgcount;
1265 			continue;
1266 		}
1267 
1268 		/*
1269 		 * Finish populating the msg fields.  The salt ensures that
1270 		 * the iv[] array is ridiculously randomized and we also
1271 		 * re-seed our PRNG every 32768 messages just to be sure.
1272 		 */
1273 		msg->any.head.magic = DMSG_HDR_MAGIC;
1274 		msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1275 		++ioq->seq;
1276 		if ((ioq->seq & 32767) == 0)
1277 			srandomdev();
1278 
1279 		/*
1280 		 * Calculate aux_crc if 0, then calculate hdr_crc.
1281 		 */
1282 		if (msg->aux_size && msg->any.head.aux_crc == 0) {
1283 			abytes = DMSG_DOALIGN(msg->aux_size);
1284 			xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1285 			msg->any.head.aux_crc = xcrc32;
1286 		}
1287 		msg->any.head.aux_bytes = msg->aux_size;
1288 
1289 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1290 			 DMSG_ALIGN;
1291 		msg->any.head.hdr_crc = 0;
1292 		msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1293 
1294 		/*
1295 		 * Enqueue the message (the flush code handles stream
1296 		 * encryption).
1297 		 */
1298 		TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1299 		++ioq->msgcount;
1300 	}
1301 	dmsg_iocom_flush2(iocom);
1302 }
1303 
1304 /*
1305  * Thread localized, iocom->mtx not held by caller.
1306  *
1307  * (called from iocom_core via iocom_flush1 only)
1308  */
1309 void
1310 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1311 {
1312 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1313 	dmsg_msg_t *msg;
1314 	ssize_t n;
1315 	struct iovec iov[DMSG_IOQ_MAXIOVEC];
1316 	size_t nact;
1317 	size_t hbytes;
1318 	size_t abytes;
1319 	size_t hoff;
1320 	size_t aoff;
1321 	int iovcnt;
1322 
1323 	if (ioq->error) {
1324 		dmsg_iocom_drain(iocom);
1325 		return;
1326 	}
1327 
1328 	/*
1329 	 * Pump messages out the connection by building an iovec.
1330 	 *
1331 	 * ioq->hbytes/ioq->abytes tracks how much of the first message
1332 	 * in the queue has been successfully written out, so we can
1333 	 * resume writing.
1334 	 */
1335 	iovcnt = 0;
1336 	nact = 0;
1337 	hoff = ioq->hbytes;
1338 	aoff = ioq->abytes;
1339 
1340 	TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1341 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1342 			 DMSG_ALIGN;
1343 		abytes = DMSG_DOALIGN(msg->aux_size);
1344 		assert(hoff <= hbytes && aoff <= abytes);
1345 
1346 		if (hoff < hbytes) {
1347 			iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1348 			iov[iovcnt].iov_len = hbytes - hoff;
1349 			nact += hbytes - hoff;
1350 			++iovcnt;
1351 			if (iovcnt == DMSG_IOQ_MAXIOVEC)
1352 				break;
1353 		}
1354 		if (aoff < abytes) {
1355 			assert(msg->aux_data != NULL);
1356 			iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1357 			iov[iovcnt].iov_len = abytes - aoff;
1358 			nact += abytes - aoff;
1359 			++iovcnt;
1360 			if (iovcnt == DMSG_IOQ_MAXIOVEC)
1361 				break;
1362 		}
1363 		hoff = 0;
1364 		aoff = 0;
1365 	}
1366 	if (iovcnt == 0)
1367 		return;
1368 
1369 	/*
1370 	 * Encrypt and write the data.  The crypto code will move the
1371 	 * data into the fifo and adjust the iov as necessary.  If
1372 	 * encryption is disabled the iov is left alone.
1373 	 *
1374 	 * May return a smaller iov (thus a smaller n), with aggregated
1375 	 * chunks.  May reduce nmax to what fits in the FIFO.
1376 	 *
1377 	 * This function sets nact to the number of original bytes now
1378 	 * encrypted, adding to the FIFO some number of bytes that might
1379 	 * be greater depending on the crypto mechanic.  iov[] is adjusted
1380 	 * to point at the FIFO if necessary.
1381 	 *
1382 	 * NOTE: The return value from the writev() is the post-encrypted
1383 	 *	 byte count, not the plaintext count.
1384 	 */
1385 	if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1386 		/*
1387 		 * Make sure the FIFO has a reasonable amount of space
1388 		 * left (if not completely full).
1389 		 *
1390 		 * In this situation we are staging the encrypted message
1391 		 * data in the FIFO.  (nact) represents how much plaintext
1392 		 * has been staged, (n) represents how much encrypted data
1393 		 * has been flushed.  The two are independent of each other.
1394 		 */
1395 		if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1396 		    sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1397 			bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1398 			      ioq->fifo_end - ioq->fifo_beg);
1399 			ioq->fifo_cdx -= ioq->fifo_beg;
1400 			ioq->fifo_cdn -= ioq->fifo_beg;
1401 			ioq->fifo_end -= ioq->fifo_beg;
1402 			ioq->fifo_beg = 0;
1403 		}
1404 
1405 		iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1406 		n = writev(iocom->sock_fd, iov, iovcnt);
1407 		if (n > 0) {
1408 			ioq->fifo_beg += n;
1409 			ioq->fifo_cdn += n;
1410 			ioq->fifo_cdx += n;
1411 			if (ioq->fifo_beg == ioq->fifo_end) {
1412 				ioq->fifo_beg = 0;
1413 				ioq->fifo_cdn = 0;
1414 				ioq->fifo_cdx = 0;
1415 				ioq->fifo_end = 0;
1416 			}
1417 		}
1418 		/*
1419 		 * We don't mess with the nact returned by the crypto_encrypt
1420 		 * call, which represents the filling of the FIFO.  (n) tells
1421 		 * us how much we were able to write from the FIFO.  The two
1422 		 * are different beasts when encrypting.
1423 		 */
1424 	} else {
1425 		/*
1426 		 * In this situation we are not staging the messages to the
1427 		 * FIFO but instead writing them directly from the msg
1428 		 * structure(s), so (nact) is basically (n).
1429 		 */
1430 		n = writev(iocom->sock_fd, iov, iovcnt);
1431 		if (n > 0)
1432 			nact = n;
1433 		else
1434 			nact = 0;
1435 	}
1436 
1437 	/*
1438 	 * Clean out the transmit queue based on what we successfully
1439 	 * sent (nact is the plaintext count).  ioq->hbytes/abytes
1440 	 * represents the portion of the first message previously sent.
1441 	 */
1442 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1443 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1444 			 DMSG_ALIGN;
1445 		abytes = DMSG_DOALIGN(msg->aux_size);
1446 
1447 		if ((size_t)nact < hbytes - ioq->hbytes) {
1448 			ioq->hbytes += nact;
1449 			nact = 0;
1450 			break;
1451 		}
1452 		nact -= hbytes - ioq->hbytes;
1453 		ioq->hbytes = hbytes;
1454 		if ((size_t)nact < abytes - ioq->abytes) {
1455 			ioq->abytes += nact;
1456 			nact = 0;
1457 			break;
1458 		}
1459 		nact -= abytes - ioq->abytes;
1460 		/* ioq->abytes = abytes; optimized out */
1461 
1462 		if (DMsgDebugOpt >= 5) {
1463 			fprintf(stderr,
1464 				"txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1465 				msg->any.head.cmd,
1466 				(intmax_t)msg->any.head.msgid,
1467 				(intmax_t)msg->any.head.circuit);
1468 		}
1469 
1470 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1471 		--ioq->msgcount;
1472 		ioq->hbytes = 0;
1473 		ioq->abytes = 0;
1474 
1475 		dmsg_state_cleanuptx(msg);
1476 	}
1477 	assert(nact == 0);
1478 
1479 	/*
1480 	 * Process the return value from the write w/regards to blocking.
1481 	 */
1482 	if (n < 0) {
1483 		if (errno != EINTR &&
1484 		    errno != EINPROGRESS &&
1485 		    errno != EAGAIN) {
1486 			/*
1487 			 * Fatal write error
1488 			 */
1489 			ioq->error = DMSG_IOQ_ERROR_SOCK;
1490 			dmsg_iocom_drain(iocom);
1491 		} else {
1492 			/*
1493 			 * Wait for socket buffer space
1494 			 */
1495 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1496 		}
1497 	} else {
1498 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1499 	}
1500 	if (ioq->error) {
1501 		dmsg_iocom_drain(iocom);
1502 	}
1503 }
1504 
1505 /*
1506  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1507  * write events will occur.  We don't kill read msgs because we want
1508  * the caller to pull off our contrived terminal error msg to detect
1509  * the connection failure.
1510  *
1511  * Localized to iocom_core thread, iocom->mtx not held by caller.
1512  */
1513 void
1514 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1515 {
1516 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1517 	dmsg_msg_t *msg;
1518 
1519 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1520 	ioq->hbytes = 0;
1521 	ioq->abytes = 0;
1522 
1523 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1524 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1525 		--ioq->msgcount;
1526 		dmsg_state_cleanuptx(msg);
1527 	}
1528 }
1529 
1530 /*
1531  * Write a message to an iocom, with additional state processing.
1532  */
1533 void
1534 dmsg_msg_write(dmsg_msg_t *msg)
1535 {
1536 	dmsg_iocom_t *iocom = msg->iocom;
1537 	dmsg_state_t *state;
1538 	char dummy;
1539 
1540 	/*
1541 	 * Handle state processing, create state if necessary.
1542 	 */
1543 	pthread_mutex_lock(&iocom->mtx);
1544 	if ((state = msg->state) != NULL) {
1545 		/*
1546 		 * Existing transaction (could be reply).  It is also
1547 		 * possible for this to be the first reply (CREATE is set),
1548 		 * in which case we populate state->txcmd.
1549 		 *
1550 		 * state->txcmd is adjusted to hold the final message cmd,
1551 		 * and we also make sure to set the CREATE bit here.  We did
1552 		 * not set it in dmsg_msg_alloc() because that would not have
1553 		 * been serialized (state could have gotten ripped out
1554 		 * from under the message prior to it being transmitted).
1555 		 */
1556 		if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1557 		    DMSGF_CREATE) {
1558 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1559 			state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1560 		}
1561 		msg->any.head.msgid = state->msgid;
1562 		assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
1563 		if (msg->any.head.cmd & DMSGF_CREATE) {
1564 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1565 		}
1566 	}
1567 
1568 	/*
1569 	 * Queue it for output, wake up the I/O pthread.  Note that the
1570 	 * I/O thread is responsible for generating the CRCs and encryption.
1571 	 */
1572 	TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1573 	dummy = 0;
1574 	write(iocom->wakeupfds[1], &dummy, 1);	/* XXX optimize me */
1575 	pthread_mutex_unlock(&iocom->mtx);
1576 }
1577 
1578 /*
1579  * This is a shortcut to formulate a reply to msg with a simple error code.
1580  * It can reply to and terminate a transaction, or it can reply to a one-way
1581  * message.  A DMSG_LNK_ERROR command code is utilized to encode
1582  * the error code (which can be 0).  Not all transactions are terminated
1583  * with DMSG_LNK_ERROR status (the low level only cares about the
1584  * DMSGF_DELETE flag), but most are.
1585  *
1586  * Replies to one-way messages are a bit of an oxymoron but the feature
1587  * is used by the debug (DBG) protocol.
1588  *
1589  * The reply contains no extended data.
1590  */
1591 void
1592 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1593 {
1594 	dmsg_state_t *state = msg->state;
1595 	dmsg_msg_t *nmsg;
1596 	uint32_t cmd;
1597 
1598 
1599 	/*
1600 	 * Reply with a simple error code and terminate the transaction.
1601 	 */
1602 	cmd = DMSG_LNK_ERROR;
1603 
1604 	/*
1605 	 * Check if our direction has even been initiated yet, set CREATE.
1606 	 *
1607 	 * Check what direction this is (command or reply direction).  Note
1608 	 * that txcmd might not have been initiated yet.
1609 	 *
1610 	 * If our direction has already been closed we just return without
1611 	 * doing anything.
1612 	 */
1613 	if (state) {
1614 		if (state->txcmd & DMSGF_DELETE)
1615 			return;
1616 		if (state->txcmd & DMSGF_REPLY)
1617 			cmd |= DMSGF_REPLY;
1618 		cmd |= DMSGF_DELETE;
1619 	} else {
1620 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1621 			cmd |= DMSGF_REPLY;
1622 	}
1623 
1624 	/*
1625 	 * Allocate the message and associate it with the existing state.
1626 	 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1627 	 * allocate new state.  We have our state already.
1628 	 */
1629 	nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1630 	if (state) {
1631 		if ((state->txcmd & DMSGF_CREATE) == 0)
1632 			nmsg->any.head.cmd |= DMSGF_CREATE;
1633 	}
1634 	nmsg->any.head.error = error;
1635 	nmsg->any.head.msgid = msg->any.head.msgid;
1636 	nmsg->any.head.circuit = msg->any.head.circuit;
1637 	nmsg->state = state;
1638 	dmsg_msg_write(nmsg);
1639 }
1640 
1641 /*
1642  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1643  * we are generating a streaming reply or an intermediate acknowledgement
1644  * of some sort as part of the higher level protocol, with more to come
1645  * later.
1646  */
1647 void
1648 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1649 {
1650 	dmsg_state_t *state = msg->state;
1651 	dmsg_msg_t *nmsg;
1652 	uint32_t cmd;
1653 
1654 
1655 	/*
1656 	 * Reply with a simple error code; the transaction is left open.
1657 	 */
1658 	cmd = DMSG_LNK_ERROR;
1659 
1660 	/*
1661 	 * Check if our direction has even been initiated yet, set CREATE.
1662 	 *
1663 	 * Check what direction this is (command or reply direction).  Note
1664 	 * that txcmd might not have been initiated yet.
1665 	 *
1666 	 * If our direction has already been closed we just return without
1667 	 * doing anything.
1668 	 */
1669 	if (state) {
1670 		if (state->txcmd & DMSGF_DELETE)
1671 			return;
1672 		if (state->txcmd & DMSGF_REPLY)
1673 			cmd |= DMSGF_REPLY;
1674 		/* continuing transaction, do not set MSGF_DELETE */
1675 	} else {
1676 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1677 			cmd |= DMSGF_REPLY;
1678 	}
1679 
1680 	nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1681 	if (state) {
1682 		if ((state->txcmd & DMSGF_CREATE) == 0)
1683 			nmsg->any.head.cmd |= DMSGF_CREATE;
1684 	}
1685 	nmsg->any.head.error = error;
1686 	nmsg->any.head.msgid = msg->any.head.msgid;
1687 	nmsg->any.head.circuit = msg->any.head.circuit;
1688 	nmsg->state = state;
1689 	dmsg_msg_write(nmsg);
1690 }
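/*
 * Illustrative sketch only: a command handler that posts an interim
 * status with dmsg_msg_result() and then closes the transaction with
 * dmsg_msg_reply() before returning.  The example_* name is hypothetical.
 */
#if 0
static void
example_streaming_handler(dmsg_msg_t *msg)
{
	dmsg_msg_result(msg, 0);	/* interim ack, transaction stays open */
	/* ... further work, possibly more interim results ... */
	dmsg_msg_reply(msg, 0);		/* final reply, sets DMSGF_DELETE */
}
#endif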
1691 
1692 /*
1693  * Terminate a transaction given a state structure by issuing a DELETE.
1694  */
1695 void
1696 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1697 {
1698 	dmsg_msg_t *nmsg;
1699 	uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1700 
1701 	/*
1702 	 * Nothing to do if we already transmitted a delete
1703 	 */
1704 	if (state->txcmd & DMSGF_DELETE)
1705 		return;
1706 
1707 	/*
1708 	 * Set REPLY if the other end initiated the command.  Otherwise
1709 	 * we are the command direction.
1710 	 */
1711 	if (state->txcmd & DMSGF_REPLY)
1712 		cmd |= DMSGF_REPLY;
1713 
1714 	nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1715 	if (state) {
1716 		if ((state->txcmd & DMSGF_CREATE) == 0)
1717 			nmsg->any.head.cmd |= DMSGF_CREATE;
1718 	}
1719 	nmsg->any.head.error = error;
1720 	nmsg->any.head.msgid = state->msgid;
1721 	nmsg->any.head.circuit = state->msg->any.head.circuit;
1722 	nmsg->state = state;
1723 	dmsg_msg_write(nmsg);
1724 }
1725 
1726 /*
1727  * Return an interim result for a transaction given a state structure
1728  * without issuing a DELETE; the transaction remains open.
1728  */
1729 void
1730 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1731 {
1732 	dmsg_msg_t *nmsg;
1733 	uint32_t cmd = DMSG_LNK_ERROR;
1734 
1735 	/*
1736 	 * Nothing to do if we already transmitted a delete
1737 	 */
1738 	if (state->txcmd & DMSGF_DELETE)
1739 		return;
1740 
1741 	/*
1742 	 * Set REPLY if the other end initiated the command.  Otherwise
1743 	 * we are the command direction.
1744 	 */
1745 	if (state->txcmd & DMSGF_REPLY)
1746 		cmd |= DMSGF_REPLY;
1747 
1748 	nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1749 	if (state) {
1750 		if ((state->txcmd & DMSGF_CREATE) == 0)
1751 			nmsg->any.head.cmd |= DMSGF_CREATE;
1752 	}
1753 	nmsg->any.head.error = error;
1754 	nmsg->any.head.msgid = state->msgid;
1755 	nmsg->any.head.circuit = state->msg->any.head.circuit;
1756 	nmsg->state = state;
1757 	dmsg_msg_write(nmsg);
1758 }
1759 
1760 /************************************************************************
1761  *			TRANSACTION STATE HANDLING			*
1762  ************************************************************************
1763  *
1764  */
1765 
1766 /*
1767  * Process circuit and state tracking for a message after reception, prior
1768  * to execution.
1769  *
1770  * Called from the iocom core with the msg dequeued; iocom->mtx is
1771  * acquired internally as needed.
1772  *
1773  * The message's circuit and transaction state are resolved here.
1774  * Returns 0 if the message is destined for us and should be processed
1775  * by the caller, or a DMSG_IOQ_ERROR_* code if the caller should
1776  * discard it or treat the stream as errored.
1777  *
1778  * --
1779  *
1780  * These routines handle persistent and command/reply message state via the
1781  * CREATE and DELETE flags.  The first message in a command or reply sequence
1782  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1783  *
1784  * There can be any number of intermediate messages belonging to the same
1785  * sequence sent inbetween the CREATE message and the DELETE message,
1786  * which set neither flag.  This represents a streaming command or reply.
1787  *
1788  * Any command message received with CREATE set expects a reply sequence to
1789  * be returned.  Reply sequences work the same as command sequences except the
1790  * REPLY bit is also sent.  Both the command side and reply side can
1791  * degenerate into a single message with both CREATE and DELETE set.  Note
1792  * that one side can be streaming and the other side not, or neither, or both.
1793  *
1794  * The msgid is unique for the initiator.  That is, two sides sending a new
1795  * message can use the same msgid without colliding.
1796  *
1797  * --
1798  *
1799  * ABORT sequences work by setting the ABORT flag along with normal message
1800  * state.  However, ABORTs can also be sent on half-closed messages, that is
1801  * even if the command or reply side has already sent a DELETE, as long as
1802  * the message has not been fully closed it can still send an ABORT+DELETE
1803  * to terminate the half-closed message state.
1804  *
1805  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1806  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1807  * also race, and in this situation the other side might have already
1808  * initiated a new unrelated command with the same message id.  Since
1809  * the abort has not set the CREATE flag the situation can be detected
1810  * and the message will also be discarded.
1811  *
1812  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1813  * The ABORT request is essentially integrated into the command instead
1814  * of being sent later on.  In this situation the command implementation
1815  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1816  * special-case non-blocking operation for the command.
1817  *
1818  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1819  *	  to be mid-stream aborts for command/reply sequences.  ABORTs on
1820  *	  one-way messages are not supported.
1821  *
1822  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1823  *	  simply ignored.
1824  *
1825  * --
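 * For example, a non-blocking request can be issued as a single message
 * carrying cmd | ABORT | CREATE | DELETE, which the command implementation
 * recognizes as the non-blocking variant of the command.
 *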
1826  *
1827  * One-off messages (no reply expected) are sent with neither CREATE nor DELETE
1828  * set.  One-off messages cannot be aborted and typically aren't processed
1829  * by these routines.  The REPLY bit can be used to distinguish whether a
1830  * one-off message is a command or reply.  For example, one-off replies
1831  * will typically just contain status updates.
1832  */
1833 static int
1834 dmsg_state_msgrx(dmsg_msg_t *msg)
1835 {
1836 	dmsg_iocom_t *iocom = msg->iocom;
1837 	dmsg_circuit_t *circuit;
1838 	dmsg_circuit_t *ocircuit;
1839 	dmsg_state_t *state;
1840 	dmsg_state_t sdummy;
1841 	dmsg_circuit_t cdummy;
1842 	int error;
1843 
1844 	pthread_mutex_lock(&iocom->mtx);
1845 
1846 	/*
1847 	 * Locate existing persistent circuit and state, if any.
1848 	 */
1849 	if (msg->any.head.circuit == 0) {
1850 		circuit = &iocom->circuit0;
1851 	} else {
1852 		cdummy.msgid = msg->any.head.circuit;
1853 		circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
1854 				  &cdummy);
1855 		if (circuit == NULL) {
1856 			pthread_mutex_unlock(&iocom->mtx);
1857 			return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
1858 		}
1859 	}
1860 
1861 	/*
1862 	 * Replace circuit0 with actual
1863 	 * Replace the placeholder circuit0 reference with the actual circuit.
1864 	dmsg_circuit_hold(circuit);
1865 	ocircuit = msg->circuit;
1866 	msg->circuit = circuit;
1867 
1868 	/*
1869 	 * If the received msg is a command, its state is on the staterd_tree.
1870 	 * If the received msg is a reply, its state is on the statewr_tree.
1871 	 */
1872 	sdummy.msgid = msg->any.head.msgid;
1873 	if (msg->any.head.cmd & DMSGF_REPLY) {
1874 		state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
1875 				&sdummy);
1876 	} else {
1877 		state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
1878 				&sdummy);
1879 	}
1880 	msg->state = state;
1881 
1882 	pthread_mutex_unlock(&iocom->mtx);
1883 	if (ocircuit)
1884 		dmsg_circuit_drop(ocircuit);
1885 
1886 	/*
1887 	 * Short-cut one-off or mid-stream messages (state may be NULL).
1888 	 */
1889 	if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1890 				  DMSGF_ABORT)) == 0) {
1891 		return(0);
1892 	}
1893 
1894 	/*
1895 	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1896 	 * inside the case statements.
1897 	 */
1898 	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1899 				    DMSGF_REPLY)) {
1900 	case DMSGF_CREATE:
1901 	case DMSGF_CREATE | DMSGF_DELETE:
1902 		/*
1903 		 * New persistent command received.
1904 		 */
1905 		if (state) {
1906 			fprintf(stderr, "duplicate-trans %s\n",
1907 				dmsg_msg_str(msg));
1908 			error = DMSG_IOQ_ERROR_TRANS;
1909 			assert(0);
1910 			break;
1911 		}
1912 		state = malloc(sizeof(*state));
1913 		bzero(state, sizeof(*state));
1914 		state->iocom = iocom;
1915 		state->circuit = circuit;
1916 		state->flags = DMSG_STATE_DYNAMIC;
1917 		state->msg = msg;
1918 		state->txcmd = DMSGF_REPLY;
1919 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1920 		state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
1921 		state->flags |= DMSG_STATE_INSERTED;
1922 		state->msgid = msg->any.head.msgid;
1923 		msg->state = state;
1924 		pthread_mutex_lock(&iocom->mtx);
1925 		RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);
1926 		pthread_mutex_unlock(&iocom->mtx);
1927 		error = 0;
1928 		if (DMsgDebugOpt) {
1929 			fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1930 				state, (uint32_t)state->msgid, iocom);
1931 		}
1932 		break;
1933 	case DMSGF_DELETE:
1934 		/*
1935 		 * Persistent state is expected but might not exist if an
1936 		 * ABORT+DELETE races the close.
1937 		 */
1938 		if (state == NULL) {
1939 			if (msg->any.head.cmd & DMSGF_ABORT) {
1940 				error = DMSG_IOQ_ERROR_EALREADY;
1941 			} else {
1942 				fprintf(stderr, "missing-state %s\n",
1943 					dmsg_msg_str(msg));
1944 				error = DMSG_IOQ_ERROR_TRANS;
1945 				assert(0);
1946 			}
1947 			break;
1948 		}
1949 
1950 		/*
1951 		 * Handle another ABORT+DELETE case if the msgid has already
1952 		 * been reused.
1953 		 */
1954 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
1955 			if (msg->any.head.cmd & DMSGF_ABORT) {
1956 				error = DMSG_IOQ_ERROR_EALREADY;
1957 			} else {
1958 				fprintf(stderr, "reused-state %s\n",
1959 					dmsg_msg_str(msg));
1960 				error = DMSG_IOQ_ERROR_TRANS;
1961 				assert(0);
1962 			}
1963 			break;
1964 		}
1965 		error = 0;
1966 		break;
1967 	default:
1968 		/*
1969 		 * Check for mid-stream ABORT command received, otherwise
1970 		 * allow.
1971 		 */
1972 		if (msg->any.head.cmd & DMSGF_ABORT) {
1973 			if (state == NULL ||
1974 			    (state->rxcmd & DMSGF_CREATE) == 0) {
1975 				error = DMSG_IOQ_ERROR_EALREADY;
1976 				break;
1977 			}
1978 		}
1979 		error = 0;
1980 		break;
1981 	case DMSGF_REPLY | DMSGF_CREATE:
1982 	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
1983 		/*
1984 		 * When receiving a reply with CREATE set the original
1985 		 * persistent state message should already exist.
1986 		 */
1987 		if (state == NULL) {
1988 			fprintf(stderr, "no-state(r) %s\n",
1989 				dmsg_msg_str(msg));
1990 			error = DMSG_IOQ_ERROR_TRANS;
1991 			assert(0);
1992 			break;
1993 		}
1994 		assert(((state->rxcmd ^ msg->any.head.cmd) &
1995 			DMSGF_REPLY) == 0);
1996 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1997 		error = 0;
1998 		break;
1999 	case DMSGF_REPLY | DMSGF_DELETE:
2000 		/*
2001 		 * Received REPLY+ABORT+DELETE in the case where the msgid has
2002 		 * already been fully closed; ignore the message.
2003 		 */
2004 		if (state == NULL) {
2005 			if (msg->any.head.cmd & DMSGF_ABORT) {
2006 				error = DMSG_IOQ_ERROR_EALREADY;
2007 			} else {
2008 				fprintf(stderr, "no-state(r,d) %s\n",
2009 					dmsg_msg_str(msg));
2010 				error = DMSG_IOQ_ERROR_TRANS;
2011 				assert(0);
2012 			}
2013 			break;
2014 		}
2015 
2016 		/*
2017 		 * Received REPLY+ABORT+DELETE in the case where the msgid has
2018 		 * already been reused for an unrelated message;
2019 		 * ignore the message.
2020 		 */
2021 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
2022 			if (msg->any.head.cmd & DMSGF_ABORT) {
2023 				error = DMSG_IOQ_ERROR_EALREADY;
2024 			} else {
2025 				fprintf(stderr, "reused-state(r,d) %s\n",
2026 					dmsg_msg_str(msg));
2027 				error = DMSG_IOQ_ERROR_TRANS;
2028 				assert(0);
2029 			}
2030 			break;
2031 		}
2032 		error = 0;
2033 		break;
2034 	case DMSGF_REPLY:
2035 		/*
2036 		 * Check for mid-stream ABORT reply received to sent command.
2037 		 */
2038 		if (msg->any.head.cmd & DMSGF_ABORT) {
2039 			if (state == NULL ||
2040 			    (state->rxcmd & DMSGF_CREATE) == 0) {
2041 				error = DMSG_IOQ_ERROR_EALREADY;
2042 				break;
2043 			}
2044 		}
2045 		error = 0;
2046 		break;
2047 	}
2048 	return (error);
2049 }
2050 
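/*
 * Clean up message and state tracking after a received message has been
 * executed.  Non-transactional messages are simply freed.  A DELETE closes
 * the receive side of the transaction; once both directions have sent
 * DELETE the state is removed from its RB tree and freed.
 */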
2051 void
2052 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2053 {
2054 	dmsg_state_t *state;
2055 
2056 	if ((state = msg->state) == NULL) {
2057 		/*
2058 		 * Free a non-transactional message, there is no state
2059 		 * to worry about.
2060 		 */
2061 		dmsg_msg_free(msg);
2062 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
2063 		/*
2064 		 * Message terminating transaction, destroy the related
2065 		 * state, the original message, and this message (if it
2066 		 * isn't the original message due to a CREATE|DELETE).
2067 		 */
2068 		pthread_mutex_lock(&iocom->mtx);
2069 		state->rxcmd |= DMSGF_DELETE;
2070 		if (state->txcmd & DMSGF_DELETE) {
2071 			if (state->msg == msg)
2072 				state->msg = NULL;
2073 			assert(state->flags & DMSG_STATE_INSERTED);
2074 			if (state->rxcmd & DMSGF_REPLY) {
2075 				assert(msg->any.head.cmd & DMSGF_REPLY);
2076 				RB_REMOVE(dmsg_state_tree,
2077 					  &msg->circuit->statewr_tree, state);
2078 			} else {
2079 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2080 				RB_REMOVE(dmsg_state_tree,
2081 					  &msg->circuit->staterd_tree, state);
2082 			}
2083 			state->flags &= ~DMSG_STATE_INSERTED;
2084 			dmsg_state_free(state);
2085 		} else {
2086 			;	/* wait for tx-side DELETE before freeing state */
2087 		}
2088 		pthread_mutex_unlock(&iocom->mtx);
2089 		dmsg_msg_free(msg);
2090 	} else if (state->msg != msg) {
2091 		/*
2092 		 * Message not terminating transaction, leave state intact
2093 		 * and free message if it isn't the CREATE message.
2094 		 */
2095 		dmsg_msg_free(msg);
2096 	}
2097 }
2098 
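/*
 * Clean up message and state tracking after a message has been transmitted.
 * A DELETE closes the transmit side of the transaction; once both
 * directions have sent DELETE the state is removed from its RB tree and
 * freed.
 */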
2099 static void
2100 dmsg_state_cleanuptx(dmsg_msg_t *msg)
2101 {
2102 	dmsg_iocom_t *iocom = msg->iocom;
2103 	dmsg_state_t *state;
2104 
2105 	if ((state = msg->state) == NULL) {
2106 		dmsg_msg_free(msg);
2107 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
2108 		pthread_mutex_lock(&iocom->mtx);
2109 		assert((state->txcmd & DMSGF_DELETE) == 0);
2110 		state->txcmd |= DMSGF_DELETE;
2111 		if (state->rxcmd & DMSGF_DELETE) {
2112 			if (state->msg == msg)
2113 				state->msg = NULL;
2114 			assert(state->flags & DMSG_STATE_INSERTED);
2115 			if (state->txcmd & DMSGF_REPLY) {
2116 				assert(msg->any.head.cmd & DMSGF_REPLY);
2117 				RB_REMOVE(dmsg_state_tree,
2118 					  &msg->circuit->staterd_tree, state);
2119 			} else {
2120 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2121 				RB_REMOVE(dmsg_state_tree,
2122 					  &msg->circuit->statewr_tree, state);
2123 			}
2124 			state->flags &= ~DMSG_STATE_INSERTED;
2125 			dmsg_state_free(state);
2126 		} else {
2127 			;	/* wait for rx-side DELETE before freeing state */
2128 		}
2129 		pthread_mutex_unlock(&iocom->mtx);
2130 		dmsg_msg_free(msg);
2131 	} else if (state->msg != msg) {
2132 		dmsg_msg_free(msg);
2133 	}
2134 }
2135 
2136 /*
2137  * Called with iocom locked
2138  */
2139 void
2140 dmsg_state_free(dmsg_state_t *state)
2141 {
2142 	dmsg_msg_t *msg;
2143 
2144 	if (DMsgDebugOpt) {
2145 		fprintf(stderr, "terminate state %p id=%08x\n",
2146 			state, (uint32_t)state->msgid);
2147 	}
2148 	if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2149 		closefrom(3);
2150 	assert(state->any.any == NULL);
2151 	msg = state->msg;
2152 	state->msg = NULL;
2153 	if (msg)
2154 		dmsg_msg_free_locked(msg);
2155 	free(state);
2156 }
2157 
2158 /*
2159  * Called with iocom locked
2160  */
2161 void
2162 dmsg_circuit_hold(dmsg_circuit_t *circuit)
2163 {
2164 	assert(circuit->refs > 0);		/* caller must hold ref */
2165 	atomic_add_int(&circuit->refs, 1);	/* to safely add more */
2166 }
2167 
2168 /*
2169  * Called without iocom locked; dmsg_circuit_drop_locked() is the locked variant
2170  */
2171 void
2172 dmsg_circuit_drop(dmsg_circuit_t *circuit)
2173 {
2174 	dmsg_iocom_t *iocom = circuit->iocom;
2175 	char dummy;
2176 
2177 	assert(circuit->refs > 0);
2178 	assert(iocom);
2179 
2180 	/*
2181 	 * Decrement circuit refs, destroy circuit when refs drops to 0.
2182 	 */
2183 	if (atomic_fetchadd_int(&circuit->refs, -1) != 1)
2184 		return;
2185 	assert(circuit != &iocom->circuit0);
2186 
2187 	assert(RB_EMPTY(&circuit->staterd_tree));
2188 	assert(RB_EMPTY(&circuit->statewr_tree));
2189 	pthread_mutex_lock(&iocom->mtx);
2190 	RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2191 	circuit->iocom = NULL;
2192 	pthread_mutex_unlock(&iocom->mtx);
2193 	dmsg_free(circuit);
2194 
2195 	/*
2196 	 * When an iocom error is present the rx code will terminate the
2197 	 * receive side for all transactions and (indirectly) all circuits
2198 	 * by simulating DELETE messages.  The state and related circuits
2199 	 * don't disappear until the related states are closed in both
2200 	 * directions.
2201 	 *
2202 	 * Detect the case where the last circuit is now gone (and thus all
2203 	 * states for all circuits are gone), and wake up the rx thread to
2204 	 * complete the termination.
2205 	 */
2206 	if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2207 		dummy = 0;
2208 		write(iocom->wakeupfds[1], &dummy, 1);
2209 	}
2210 }
2211 
2212 void
2213 dmsg_circuit_drop_locked(dmsg_circuit_t *circuit)
2214 {
2215 	dmsg_iocom_t *iocom;
2216 
2217 	iocom = circuit->iocom;
2218 	assert(circuit->refs > 0);
2219 	assert(iocom);
2220 
2221 	if (atomic_fetchadd_int(&circuit->refs, -1) == 1) {
2222 		assert(circuit != &iocom->circuit0);
2223 		assert(RB_EMPTY(&circuit->staterd_tree));
2224 		assert(RB_EMPTY(&circuit->statewr_tree));
2225 		RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2226 		circuit->iocom = NULL;
2227 		dmsg_free(circuit);
2228 		if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2229 			char dummy = 0;
2230 			write(iocom->wakeupfds[1], &dummy, 1);
2231 		}
2232 	}
2233 }
2234 
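/*
 * A minimal sketch of the circuit reference counting above: a caller that
 * already holds a reference may take an extra one across deferred work and
 * drop it afterwards.  The final drop acquires iocom->mtx itself, so it
 * must not be called with that mutex held (use dmsg_circuit_drop_locked()
 * in that case).  do_deferred_work() is a hypothetical placeholder.
 */
#if 0
static void
example_circuit_use(dmsg_circuit_t *circuit)
{
	dmsg_circuit_hold(circuit);	/* caller must already hold a ref */
	do_deferred_work(circuit);	/* hypothetical deferred operation */
	dmsg_circuit_drop(circuit);	/* may destroy the circuit on last ref */
}
#endif
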
2235 /*
2236  * This swaps the endianness of a dmsg_hdr.  Note that the extended
2237  * header is not adjusted, just the core header.
2238  */
2239 void
2240 dmsg_bswap_head(dmsg_hdr_t *head)
2241 {
2242 	head->magic	= bswap16(head->magic);
2243 	head->reserved02 = bswap16(head->reserved02);
2244 	head->salt	= bswap32(head->salt);
2245 
2246 	head->msgid	= bswap64(head->msgid);
2247 	head->circuit	= bswap64(head->circuit);
2248 	head->reserved18= bswap64(head->reserved18);
2249 
2250 	head->cmd	= bswap32(head->cmd);
2251 	head->aux_crc	= bswap32(head->aux_crc);
2252 	head->aux_bytes	= bswap32(head->aux_bytes);
2253 	head->error	= bswap32(head->error);
2254 	head->aux_descr = bswap64(head->aux_descr);
2255 	head->reserved38= bswap32(head->reserved38);
2256 	head->hdr_crc	= bswap32(head->hdr_crc);
2257 }
2258
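/*
 * A minimal sketch of how a receiver might use dmsg_bswap_head(): swap the
 * core header only when the peer has the opposite byte order.  The
 * DMSG_HDR_MAGIC_REV constant is assumed to be the byte-swapped magic value
 * defined alongside DMSG_HDR_MAGIC elsewhere in this library.
 */
#if 0
static void
example_fixup_endian(dmsg_hdr_t *head)
{
	if (head->magic == DMSG_HDR_MAGIC_REV)
		dmsg_bswap_head(head);
}
#endif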