xref: /dflybsd-src/lib/libdmsg/msg.c (revision 9e1c08804a46f1c1a9cd11e190ddba7d2bc4abed)
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35 
36 #include "dmsg_local.h"
37 
38 int DMsgDebugOpt;
39 
40 static int dmsg_state_msgrx(dmsg_msg_t *msg);
41 static int dmsg_state_routedrx(dmsg_state_t *state, dmsg_msg_t *msg);
42 static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
43 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
44 
45 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
46 
47 /*
48  * STATE TREE - Represents open transactions which are indexed by their
49  *		{ msgid } relative to the governing iocom.
50  */
51 int
52 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
53 {
54 	if (state1->msgid < state2->msgid)
55 		return(-1);
56 	if (state1->msgid > state2->msgid)
57 		return(1);
58 	return(0);
59 }
60 
61 /*
62  * Initialize a low-level ioq
63  */
64 void
65 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
66 {
67 	bzero(ioq, sizeof(*ioq));
68 	ioq->state = DMSG_MSGQ_STATE_HEADER1;
69 	TAILQ_INIT(&ioq->msgq);
70 }
71 
72 /*
73  * Cleanup queue.
74  *
75  * caller holds iocom->mtx.
76  */
77 void
78 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
79 {
80 	dmsg_msg_t *msg;
81 
82 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
83 		assert(0);	/* shouldn't happen */
84 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
85 		dmsg_msg_free(msg);
86 	}
87 	if ((msg = ioq->msg) != NULL) {
88 		ioq->msg = NULL;
89 		dmsg_msg_free(msg);
90 	}
91 }
92 
93 /*
94  * Initialize a low-level communications channel.
95  *
96  * NOTE: The signal_func() is called at least once from the loop and can be
97  *	 re-armed via dmsg_iocom_restate().
98  */
99 void
100 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
101 		   void (*signal_func)(dmsg_iocom_t *iocom),
102 		   void (*rcvmsg_func)(dmsg_msg_t *msg),
103 		   void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
104 		   void (*altmsg_func)(dmsg_iocom_t *iocom))
105 {
106 	struct stat st;
107 
108 	bzero(iocom, sizeof(*iocom));
109 
110 	asprintf(&iocom->label, "iocom-%p", iocom);
111 	iocom->signal_callback = signal_func;
112 	iocom->rcvmsg_callback = rcvmsg_func;
113 	iocom->altmsg_callback = altmsg_func;
114 	iocom->usrmsg_callback = usrmsg_func;
115 
116 	pthread_mutex_init(&iocom->mtx, NULL);
117 	RB_INIT(&iocom->staterd_tree);
118 	RB_INIT(&iocom->statewr_tree);
119 	TAILQ_INIT(&iocom->freeq);
120 	TAILQ_INIT(&iocom->freeq_aux);
121 	TAILQ_INIT(&iocom->txmsgq);
122 	iocom->sock_fd = sock_fd;
123 	iocom->alt_fd = alt_fd;
124 	iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
125 	if (signal_func)
126 		iocom->flags |= DMSG_IOCOMF_SWORK;
127 	dmsg_ioq_init(iocom, &iocom->ioq_rx);
128 	dmsg_ioq_init(iocom, &iocom->ioq_tx);
129 	iocom->state0.iocom = iocom;
130 	iocom->state0.parent = &iocom->state0;
131 	TAILQ_INIT(&iocom->state0.subq);
132 
133 	if (pipe(iocom->wakeupfds) < 0)
134 		assert(0);
135 	fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
136 	fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
137 
138 	/*
139 	 * Negotiate session crypto synchronously.  This will mark the
140 	 * connection as error'd if it fails.  If this is a pipe it's
141 	 * a linkage that we set up ourselves to the filesystem and there
142 	 * is no crypto.
143 	 */
144 	if (fstat(sock_fd, &st) < 0)
145 		assert(0);
146 	if (S_ISSOCK(st.st_mode))
147 		dmsg_crypto_negotiate(iocom);
148 
149 	/*
150 	 * Make sure our fds are set to non-blocking for the iocom core.
151 	 */
152 	if (sock_fd >= 0)
153 		fcntl(sock_fd, F_SETFL, O_NONBLOCK);
154 #if 0
155 	/* if line buffered our single fgets() should be fine */
156 	if (alt_fd >= 0)
157 		fcntl(alt_fd, F_SETFL, O_NONBLOCK);
158 #endif
159 }
160 
161 void
162 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
163 {
164 	va_list va;
165 	char *optr;
166 
167 	va_start(va, ctl);
168 	optr = iocom->label;
169 	vasprintf(&iocom->label, ctl, va);
170 	va_end(va);
171 	if (optr)
172 		free(optr);
173 }
174 
175 /*
176  * May only be called from a callback from iocom_core.
177  *
178  * Adjust state machine functions, set flags to guarantee that both
179  * the recevmsg_func and the sendmsg_func is called at least once.
180  */
181 void
182 dmsg_iocom_restate(dmsg_iocom_t *iocom,
183 		   void (*signal_func)(dmsg_iocom_t *),
184 		   void (*rcvmsg_func)(dmsg_msg_t *msg))
185 {
186 	pthread_mutex_lock(&iocom->mtx);
187 	iocom->signal_callback = signal_func;
188 	iocom->rcvmsg_callback = rcvmsg_func;
189 	if (signal_func)
190 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
191 	else
192 		atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
193 	pthread_mutex_unlock(&iocom->mtx);
194 }
195 
196 void
197 dmsg_iocom_signal(dmsg_iocom_t *iocom)
198 {
199 	pthread_mutex_lock(&iocom->mtx);
200 	if (iocom->signal_callback)
201 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
202 	pthread_mutex_unlock(&iocom->mtx);
203 }
204 
205 /*
206  * Cleanup a terminating iocom.
207  *
208  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
209  * from all possible references to it.
210  */
211 void
212 dmsg_iocom_done(dmsg_iocom_t *iocom)
213 {
214 	dmsg_msg_t *msg;
215 
216 	if (iocom->sock_fd >= 0) {
217 		close(iocom->sock_fd);
218 		iocom->sock_fd = -1;
219 	}
220 	if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
221 		close(iocom->alt_fd);
222 		iocom->alt_fd = -1;
223 	}
224 	dmsg_ioq_done(iocom, &iocom->ioq_rx);
225 	dmsg_ioq_done(iocom, &iocom->ioq_tx);
226 	while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
227 		TAILQ_REMOVE(&iocom->freeq, msg, qentry);
228 		free(msg);
229 	}
230 	while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
231 		TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
232 		free(msg->aux_data);
233 		msg->aux_data = NULL;
234 		free(msg);
235 	}
236 	if (iocom->wakeupfds[0] >= 0) {
237 		close(iocom->wakeupfds[0]);
238 		iocom->wakeupfds[0] = -1;
239 	}
240 	if (iocom->wakeupfds[1] >= 0) {
241 		close(iocom->wakeupfds[1]);
242 		iocom->wakeupfds[1] = -1;
243 	}
244 	pthread_mutex_destroy(&iocom->mtx);
245 }
246 
247 /*
248  * Allocate a new message using the specified transaction state.
249  *
250  * If CREATE is set a new transaction is allocated relative to the passed-in
251  * transaction.
252  *
253  * If CREATE is not set the message is associated with the passed-in
254  * transaction.
255  */
256 dmsg_msg_t *
257 dmsg_msg_alloc(dmsg_state_t *state,
258 	       size_t aux_size, uint32_t cmd,
259 	       void (*func)(dmsg_msg_t *), void *data)
260 {
261 	dmsg_iocom_t *iocom = state->iocom;
262 	dmsg_state_t *pstate;
263 	dmsg_msg_t *msg;
264 	int hbytes;
265 	size_t aligned_size;
266 
267 	pthread_mutex_lock(&iocom->mtx);
268 #if 0
269 	if (aux_size) {
270 		aligned_size = DMSG_DOALIGN(aux_size);
271 		if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
272 			TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
273 	} else {
274 		aligned_size = 0;
275 		if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
276 			TAILQ_REMOVE(&iocom->freeq, msg, qentry);
277 	}
278 #endif
279 	aligned_size = DMSG_DOALIGN(aux_size);
280 	msg = NULL;
281 	if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
282 		/*
283 		 * When CREATE is set without REPLY the caller is
284 		 * initiating a new transaction stacked under the specified
285 		 * circuit.
286 		 *
287 		 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
288 		 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
289 		 */
290 		pstate = state;
291 		state = malloc(sizeof(*state));
292 		bzero(state, sizeof(*state));
293 		TAILQ_INIT(&state->subq);
294 		state->parent = pstate;
295 		state->iocom = iocom;
296 		state->flags = DMSG_STATE_DYNAMIC;
297 		state->msgid = (uint64_t)(uintptr_t)state;
298 		state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
299 		state->rxcmd = DMSGF_REPLY;
300 		state->icmd = state->txcmd & DMSGF_BASECMDMASK;
301 		state->func = func;
302 		state->any.any = data;
303 		RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
304 		TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
305 		state->flags |= DMSG_STATE_INSERTED;
306 	} else {
307 		/*
308 		 * Otherwise the message is transmitted over the existing
309 		 * open transaction.
310 		 */
311 		pstate = state->parent;
312 	}
313 
314 	/* XXX SMP race for state */
315 	pthread_mutex_unlock(&iocom->mtx);
316 	hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
317 	if (msg == NULL) {
318 		msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
319 		bzero(msg, offsetof(struct dmsg_msg, any.head));
320 		*(int *)((char *)msg +
321 			 offsetof(struct dmsg_msg, any.head) + hbytes) =
322 				 0x71B2C3D4;
323 #if 0
324 		msg = malloc(sizeof(*msg));
325 		bzero(msg, sizeof(*msg));
326 #endif
327 	}
328 
329 	/*
330 	 * [re]allocate the auxillary data buffer.  The caller knows that
331 	 * a size-aligned buffer will be allocated but we do not want to
332 	 * force the caller to zero any tail piece, so we do that ourself.
333 	 */
334 	if (msg->aux_size != aux_size) {
335 		if (msg->aux_data) {
336 			free(msg->aux_data);
337 			msg->aux_data = NULL;
338 			msg->aux_size = 0;
339 		}
340 		if (aux_size) {
341 			msg->aux_data = malloc(aligned_size);
342 			msg->aux_size = aux_size;
343 			if (aux_size != aligned_size) {
344 				bzero(msg->aux_data + aux_size,
345 				      aligned_size - aux_size);
346 			}
347 		}
348 	}
349 
350 	/*
351 	 * Set REVTRANS if the transaction was remotely initiated
352 	 * Set REVCIRC if the circuit was remotely initiated
353 	 */
354 	if (state->flags & DMSG_STATE_OPPOSITE)
355 		cmd |= DMSGF_REVTRANS;
356 	if (pstate->flags & DMSG_STATE_OPPOSITE)
357 		cmd |= DMSGF_REVCIRC;
358 
359 	/*
360 	 * Finish filling out the header.
361 	 */
362 	if (hbytes)
363 		bzero(&msg->any.head, hbytes);
364 	msg->hdr_size = hbytes;
365 	msg->any.head.magic = DMSG_HDR_MAGIC;
366 	msg->any.head.cmd = cmd;
367 	msg->any.head.aux_descr = 0;
368 	msg->any.head.aux_crc = 0;
369 	msg->any.head.msgid = state->msgid;
370 	msg->any.head.circuit = pstate->msgid;
371 	msg->state = state;
372 
373 	return (msg);
374 }
375 
376 /*
377  * Free a message so it can be reused afresh.
378  *
379  * NOTE: aux_size can be 0 with a non-NULL aux_data.
380  */
381 static
382 void
383 dmsg_msg_free_locked(dmsg_msg_t *msg)
384 {
385 	/*dmsg_iocom_t *iocom = msg->iocom;*/
386 #if 1
387 	int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
388 	if (*(int *)((char *)msg +
389 		     offsetof(struct  dmsg_msg, any.head) + hbytes) !=
390 	     0x71B2C3D4) {
391 		fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
392 		assert(0);
393 	}
394 #endif
395 	msg->state = NULL;
396 	if (msg->aux_data) {
397 		free(msg->aux_data);
398 		msg->aux_data = NULL;
399 	}
400 	msg->aux_size = 0;
401 	free (msg);
402 #if 0
403 	if (msg->aux_data)
404 		TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
405 	else
406 		TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
407 #endif
408 }
409 
410 void
411 dmsg_msg_free(dmsg_msg_t *msg)
412 {
413 	dmsg_iocom_t *iocom = msg->state->iocom;
414 
415 	pthread_mutex_lock(&iocom->mtx);
416 	dmsg_msg_free_locked(msg);
417 	pthread_mutex_unlock(&iocom->mtx);
418 }
419 
420 /*
421  * I/O core loop for an iocom.
422  *
423  * Thread localized, iocom->mtx not held.
424  */
425 void
426 dmsg_iocom_core(dmsg_iocom_t *iocom)
427 {
428 	struct pollfd fds[3];
429 	char dummybuf[256];
430 	dmsg_msg_t *msg;
431 	int timeout;
432 	int count;
433 	int wi;	/* wakeup pipe */
434 	int si;	/* socket */
435 	int ai;	/* alt bulk path socket */
436 
437 	while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
438 		/*
439 		 * These iocom->flags are only manipulated within the
440 		 * context of the current thread.  However, modifications
441 		 * still require atomic ops.
442 		 */
443 		if ((iocom->flags & (DMSG_IOCOMF_RWORK |
444 				     DMSG_IOCOMF_WWORK |
445 				     DMSG_IOCOMF_PWORK |
446 				     DMSG_IOCOMF_SWORK |
447 				     DMSG_IOCOMF_ARWORK |
448 				     DMSG_IOCOMF_AWWORK)) == 0) {
449 			/*
450 			 * Only poll if no immediate work is pending.
451 			 * Otherwise we are just wasting our time calling
452 			 * poll.
453 			 */
454 			timeout = 5000;
455 
456 			count = 0;
457 			wi = -1;
458 			si = -1;
459 			ai = -1;
460 
461 			/*
462 			 * Always check the inter-thread pipe, e.g.
463 			 * for iocom->txmsgq work.
464 			 */
465 			wi = count++;
466 			fds[wi].fd = iocom->wakeupfds[0];
467 			fds[wi].events = POLLIN;
468 			fds[wi].revents = 0;
469 
470 			/*
471 			 * Check the socket input/output direction as
472 			 * requested
473 			 */
474 			if (iocom->flags & (DMSG_IOCOMF_RREQ |
475 					    DMSG_IOCOMF_WREQ)) {
476 				si = count++;
477 				fds[si].fd = iocom->sock_fd;
478 				fds[si].events = 0;
479 				fds[si].revents = 0;
480 
481 				if (iocom->flags & DMSG_IOCOMF_RREQ)
482 					fds[si].events |= POLLIN;
483 				if (iocom->flags & DMSG_IOCOMF_WREQ)
484 					fds[si].events |= POLLOUT;
485 			}
486 
487 			/*
488 			 * Check the alternative fd for work.
489 			 */
490 			if (iocom->alt_fd >= 0) {
491 				ai = count++;
492 				fds[ai].fd = iocom->alt_fd;
493 				fds[ai].events = POLLIN;
494 				fds[ai].revents = 0;
495 			}
496 			poll(fds, count, timeout);
497 
498 			if (wi >= 0 && (fds[wi].revents & POLLIN))
499 				atomic_set_int(&iocom->flags,
500 					       DMSG_IOCOMF_PWORK);
501 			if (si >= 0 && (fds[si].revents & POLLIN))
502 				atomic_set_int(&iocom->flags,
503 					       DMSG_IOCOMF_RWORK);
504 			if (si >= 0 && (fds[si].revents & POLLOUT))
505 				atomic_set_int(&iocom->flags,
506 					       DMSG_IOCOMF_WWORK);
507 			if (wi >= 0 && (fds[wi].revents & POLLOUT))
508 				atomic_set_int(&iocom->flags,
509 					       DMSG_IOCOMF_WWORK);
510 			if (ai >= 0 && (fds[ai].revents & POLLIN))
511 				atomic_set_int(&iocom->flags,
512 					       DMSG_IOCOMF_ARWORK);
513 		} else {
514 			/*
515 			 * Always check the pipe
516 			 */
517 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
518 		}
519 
520 		if (iocom->flags & DMSG_IOCOMF_SWORK) {
521 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
522 			iocom->signal_callback(iocom);
523 		}
524 
525 		/*
526 		 * Pending message queues from other threads wake us up
527 		 * with a write to the wakeupfds[] pipe.  We have to clear
528 		 * the pipe with a dummy read.
529 		 */
530 		if (iocom->flags & DMSG_IOCOMF_PWORK) {
531 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
532 			read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
533 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
534 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
535 			if (TAILQ_FIRST(&iocom->txmsgq))
536 				dmsg_iocom_flush1(iocom);
537 		}
538 
539 		/*
540 		 * Message write sequencing
541 		 */
542 		if (iocom->flags & DMSG_IOCOMF_WWORK)
543 			dmsg_iocom_flush1(iocom);
544 
545 		/*
546 		 * Message read sequencing.  Run this after the write
547 		 * sequencing in case the write sequencing allowed another
548 		 * auto-DELETE to occur on the read side.
549 		 */
550 		if (iocom->flags & DMSG_IOCOMF_RWORK) {
551 			while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
552 			       (msg = dmsg_ioq_read(iocom)) != NULL) {
553 				if (DMsgDebugOpt) {
554 					fprintf(stderr, "receive %s\n",
555 						dmsg_msg_str(msg));
556 				}
557 				iocom->rcvmsg_callback(msg);
558 				dmsg_state_cleanuprx(iocom, msg);
559 			}
560 		}
561 
562 		if (iocom->flags & DMSG_IOCOMF_ARWORK) {
563 			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
564 			iocom->altmsg_callback(iocom);
565 		}
566 	}
567 }
568 
569 /*
570  * Make sure there's enough room in the FIFO to hold the
571  * needed data.
572  *
573  * Assume worst case encrypted form is 2x the size of the
574  * plaintext equivalent.
575  */
576 static
577 size_t
578 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
579 {
580 	size_t bytes;
581 	size_t nmax;
582 
583 	bytes = ioq->fifo_cdx - ioq->fifo_beg;
584 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
585 	if (bytes + nmax / 2 < needed) {
586 		if (bytes) {
587 			bcopy(ioq->buf + ioq->fifo_beg,
588 			      ioq->buf,
589 			      bytes);
590 		}
591 		ioq->fifo_cdx -= ioq->fifo_beg;
592 		ioq->fifo_beg = 0;
593 		if (ioq->fifo_cdn < ioq->fifo_end) {
594 			bcopy(ioq->buf + ioq->fifo_cdn,
595 			      ioq->buf + ioq->fifo_cdx,
596 			      ioq->fifo_end - ioq->fifo_cdn);
597 		}
598 		ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
599 		ioq->fifo_cdn = ioq->fifo_cdx;
600 		nmax = sizeof(ioq->buf) - ioq->fifo_end;
601 	}
602 	return(nmax);
603 }
604 
605 /*
606  * Read the next ready message from the ioq, issuing I/O if needed.
607  * Caller should retry on a read-event when NULL is returned.
608  *
609  * If an error occurs during reception a DMSG_LNK_ERROR msg will
610  * be returned for each open transaction, then the ioq and iocom
611  * will be errored out and a non-transactional DMSG_LNK_ERROR
612  * msg will be returned as the final message.  The caller should not call
613  * us again after the final message is returned.
614  *
615  * Thread localized, iocom->mtx not held.
616  */
617 dmsg_msg_t *
618 dmsg_ioq_read(dmsg_iocom_t *iocom)
619 {
620 	dmsg_ioq_t *ioq = &iocom->ioq_rx;
621 	dmsg_msg_t *msg;
622 	dmsg_state_t *state;
623 	dmsg_hdr_t *head;
624 	ssize_t n;
625 	size_t bytes;
626 	size_t nmax;
627 	uint32_t aux_size;
628 	uint32_t xcrc32;
629 	int error;
630 
631 again:
632 	/*
633 	 * If a message is already pending we can just remove and
634 	 * return it.  Message state has already been processed.
635 	 * (currently not implemented)
636 	 */
637 	if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
638 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
639 		return (msg);
640 	}
641 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
642 
643 	/*
644 	 * If the stream is errored out we stop processing it.
645 	 */
646 	if (ioq->error)
647 		goto skip;
648 
649 	/*
650 	 * Message read in-progress (msg is NULL at the moment).  We don't
651 	 * allocate a msg until we have its core header.
652 	 */
653 	nmax = sizeof(ioq->buf) - ioq->fifo_end;
654 	bytes = ioq->fifo_cdx - ioq->fifo_beg;		/* already decrypted */
655 	msg = ioq->msg;
656 
657 	switch(ioq->state) {
658 	case DMSG_MSGQ_STATE_HEADER1:
659 		/*
660 		 * Load the primary header, fail on any non-trivial read
661 		 * error or on EOF.  Since the primary header is the same
662 		 * size is the message alignment it will never straddle
663 		 * the end of the buffer.
664 		 */
665 		nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
666 		if (bytes < sizeof(msg->any.head)) {
667 			n = read(iocom->sock_fd,
668 				 ioq->buf + ioq->fifo_end,
669 				 nmax);
670 			if (n <= 0) {
671 				if (n == 0) {
672 					ioq->error = DMSG_IOQ_ERROR_EOF;
673 					break;
674 				}
675 				if (errno != EINTR &&
676 				    errno != EINPROGRESS &&
677 				    errno != EAGAIN) {
678 					ioq->error = DMSG_IOQ_ERROR_SOCK;
679 					break;
680 				}
681 				n = 0;
682 				/* fall through */
683 			}
684 			ioq->fifo_end += (size_t)n;
685 			nmax -= (size_t)n;
686 		}
687 
688 		/*
689 		 * Decrypt data received so far.  Data will be decrypted
690 		 * in-place but might create gaps in the FIFO.  Partial
691 		 * blocks are not immediately decrypted.
692 		 *
693 		 * WARNING!  The header might be in the wrong endian, we
694 		 *	     do not fix it up until we get the entire
695 		 *	     extended header.
696 		 */
697 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
698 			dmsg_crypto_decrypt(iocom, ioq);
699 		} else {
700 			ioq->fifo_cdx = ioq->fifo_end;
701 			ioq->fifo_cdn = ioq->fifo_end;
702 		}
703 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
704 
705 		/*
706 		 * Insufficient data accumulated (msg is NULL, caller will
707 		 * retry on event).
708 		 */
709 		assert(msg == NULL);
710 		if (bytes < sizeof(msg->any.head))
711 			break;
712 
713 		/*
714 		 * Check and fixup the core header.  Note that the icrc
715 		 * has to be calculated before any fixups, but the crc
716 		 * fields in the msg may have to be swapped like everything
717 		 * else.
718 		 */
719 		head = (void *)(ioq->buf + ioq->fifo_beg);
720 		if (head->magic != DMSG_HDR_MAGIC &&
721 		    head->magic != DMSG_HDR_MAGIC_REV) {
722 			fprintf(stderr, "%s: head->magic is bad %02x\n",
723 				iocom->label, head->magic);
724 			if (iocom->flags & DMSG_IOCOMF_CRYPTED)
725 				fprintf(stderr, "(on encrypted link)\n");
726 			ioq->error = DMSG_IOQ_ERROR_SYNC;
727 			break;
728 		}
729 
730 		/*
731 		 * Calculate the full header size and aux data size
732 		 */
733 		if (head->magic == DMSG_HDR_MAGIC_REV) {
734 			ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
735 				      DMSG_ALIGN;
736 			aux_size = bswap32(head->aux_bytes);
737 		} else {
738 			ioq->hbytes = (head->cmd & DMSGF_SIZE) *
739 				      DMSG_ALIGN;
740 			aux_size = head->aux_bytes;
741 		}
742 		ioq->abytes = DMSG_DOALIGN(aux_size);
743 		ioq->unaligned_aux_size = aux_size;
744 		if (ioq->hbytes < sizeof(msg->any.head) ||
745 		    ioq->hbytes > sizeof(msg->any) ||
746 		    ioq->abytes > DMSG_AUX_MAX) {
747 			ioq->error = DMSG_IOQ_ERROR_FIELD;
748 			break;
749 		}
750 
751 		/*
752 		 * Allocate the message, the next state will fill it in.
753 		 *
754 		 * NOTE: The aux_data buffer will be sized to an aligned
755 		 *	 value and the aligned remainder zero'd for
756 		 *	 convenience.
757 		 *
758 		 * NOTE: Supply dummy state and a degenerate cmd without
759 		 *	 CREATE set.  The message will temporarily be
760 		 *	 associated with state0 until later post-processing.
761 		 */
762 		msg = dmsg_msg_alloc(&iocom->state0, aux_size,
763 				     ioq->hbytes / DMSG_ALIGN,
764 				     NULL, NULL);
765 		ioq->msg = msg;
766 
767 		/*
768 		 * Fall through to the next state.  Make sure that the
769 		 * extended header does not straddle the end of the buffer.
770 		 * We still want to issue larger reads into our buffer,
771 		 * book-keeping is easier if we don't bcopy() yet.
772 		 *
773 		 * Make sure there is enough room for bloated encrypt data.
774 		 */
775 		nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
776 		ioq->state = DMSG_MSGQ_STATE_HEADER2;
777 		/* fall through */
778 	case DMSG_MSGQ_STATE_HEADER2:
779 		/*
780 		 * Fill out the extended header.
781 		 */
782 		assert(msg != NULL);
783 		if (bytes < ioq->hbytes) {
784 			n = read(iocom->sock_fd,
785 				 ioq->buf + ioq->fifo_end,
786 				 nmax);
787 			if (n <= 0) {
788 				if (n == 0) {
789 					ioq->error = DMSG_IOQ_ERROR_EOF;
790 					break;
791 				}
792 				if (errno != EINTR &&
793 				    errno != EINPROGRESS &&
794 				    errno != EAGAIN) {
795 					ioq->error = DMSG_IOQ_ERROR_SOCK;
796 					break;
797 				}
798 				n = 0;
799 				/* fall through */
800 			}
801 			ioq->fifo_end += (size_t)n;
802 			nmax -= (size_t)n;
803 		}
804 
805 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
806 			dmsg_crypto_decrypt(iocom, ioq);
807 		} else {
808 			ioq->fifo_cdx = ioq->fifo_end;
809 			ioq->fifo_cdn = ioq->fifo_end;
810 		}
811 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
812 
813 		/*
814 		 * Insufficient data accumulated (set msg NULL so caller will
815 		 * retry on event).
816 		 */
817 		if (bytes < ioq->hbytes) {
818 			msg = NULL;
819 			break;
820 		}
821 
822 		/*
823 		 * Calculate the extended header, decrypt data received
824 		 * so far.  Handle endian-conversion for the entire extended
825 		 * header.
826 		 */
827 		head = (void *)(ioq->buf + ioq->fifo_beg);
828 
829 		/*
830 		 * Check the CRC.
831 		 */
832 		if (head->magic == DMSG_HDR_MAGIC_REV)
833 			xcrc32 = bswap32(head->hdr_crc);
834 		else
835 			xcrc32 = head->hdr_crc;
836 		head->hdr_crc = 0;
837 		if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
838 			ioq->error = DMSG_IOQ_ERROR_XCRC;
839 			fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
840 				xcrc32, dmsg_icrc32(head, ioq->hbytes),
841 				dmsg_msg_str(msg));
842 			assert(0);
843 			break;
844 		}
845 		head->hdr_crc = xcrc32;
846 
847 		if (head->magic == DMSG_HDR_MAGIC_REV) {
848 			dmsg_bswap_head(head);
849 		}
850 
851 		/*
852 		 * Copy the extended header into the msg and adjust the
853 		 * FIFO.
854 		 */
855 		bcopy(head, &msg->any, ioq->hbytes);
856 
857 		/*
858 		 * We are either done or we fall-through.
859 		 */
860 		if (ioq->abytes == 0) {
861 			ioq->fifo_beg += ioq->hbytes;
862 			break;
863 		}
864 
865 		/*
866 		 * Must adjust bytes (and the state) when falling through.
867 		 * nmax doesn't change.
868 		 */
869 		ioq->fifo_beg += ioq->hbytes;
870 		bytes -= ioq->hbytes;
871 		ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
872 		/* fall through */
873 	case DMSG_MSGQ_STATE_AUXDATA1:
874 		/*
875 		 * Copy the partial or complete [decrypted] payload from
876 		 * remaining bytes in the FIFO in order to optimize the
877 		 * makeroom call in the AUXDATA2 state.  We have to
878 		 * fall-through either way so we can check the crc.
879 		 *
880 		 * msg->aux_size tracks our aux data.
881 		 *
882 		 * (Lets not complicate matters if the data is encrypted,
883 		 *  since the data in-stream is not the same size as the
884 		 *  data decrypted).
885 		 */
886 		if (bytes >= ioq->abytes) {
887 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
888 			      ioq->abytes);
889 			msg->aux_size = ioq->abytes;
890 			ioq->fifo_beg += ioq->abytes;
891 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
892 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
893 			bytes -= ioq->abytes;
894 		} else if (bytes) {
895 			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
896 			      bytes);
897 			msg->aux_size = bytes;
898 			ioq->fifo_beg += bytes;
899 			if (ioq->fifo_cdx < ioq->fifo_beg)
900 				ioq->fifo_cdx = ioq->fifo_beg;
901 			assert(ioq->fifo_beg <= ioq->fifo_cdx);
902 			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
903 			bytes = 0;
904 		} else {
905 			msg->aux_size = 0;
906 		}
907 		ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
908 		/* fall through */
909 	case DMSG_MSGQ_STATE_AUXDATA2:
910 		/*
911 		 * Make sure there is enough room for more data.
912 		 */
913 		assert(msg);
914 		nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
915 
916 		/*
917 		 * Read and decrypt more of the payload.
918 		 */
919 		if (msg->aux_size < ioq->abytes) {
920 			assert(bytes == 0);
921 			n = read(iocom->sock_fd,
922 				 ioq->buf + ioq->fifo_end,
923 				 nmax);
924 			if (n <= 0) {
925 				if (n == 0) {
926 					ioq->error = DMSG_IOQ_ERROR_EOF;
927 					break;
928 				}
929 				if (errno != EINTR &&
930 				    errno != EINPROGRESS &&
931 				    errno != EAGAIN) {
932 					ioq->error = DMSG_IOQ_ERROR_SOCK;
933 					break;
934 				}
935 				n = 0;
936 				/* fall through */
937 			}
938 			ioq->fifo_end += (size_t)n;
939 			nmax -= (size_t)n;
940 		}
941 
942 		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
943 			dmsg_crypto_decrypt(iocom, ioq);
944 		} else {
945 			ioq->fifo_cdx = ioq->fifo_end;
946 			ioq->fifo_cdn = ioq->fifo_end;
947 		}
948 		bytes = ioq->fifo_cdx - ioq->fifo_beg;
949 
950 		if (bytes > ioq->abytes - msg->aux_size)
951 			bytes = ioq->abytes - msg->aux_size;
952 
953 		if (bytes) {
954 			bcopy(ioq->buf + ioq->fifo_beg,
955 			      msg->aux_data + msg->aux_size,
956 			      bytes);
957 			msg->aux_size += bytes;
958 			ioq->fifo_beg += bytes;
959 		}
960 
961 		/*
962 		 * Insufficient data accumulated (set msg NULL so caller will
963 		 * retry on event).
964 		 *
965 		 * Assert the auxillary data size is correct, then record the
966 		 * original unaligned size from the message header.
967 		 */
968 		if (msg->aux_size < ioq->abytes) {
969 			msg = NULL;
970 			break;
971 		}
972 		assert(msg->aux_size == ioq->abytes);
973 		msg->aux_size = ioq->unaligned_aux_size;
974 
975 		/*
976 		 * Check aux_crc, then we are done.  Note that the crc
977 		 * is calculated over the aligned size, not the actual
978 		 * size.
979 		 */
980 		xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
981 		if (xcrc32 != msg->any.head.aux_crc) {
982 			ioq->error = DMSG_IOQ_ERROR_ACRC;
983 			fprintf(stderr, "iocom: ACRC error %08x vs %08x msgid %016jx msgcmd %08x auxsize %d\n",
984 				xcrc32, msg->any.head.aux_crc, (intmax_t)msg->any.head.msgid, msg->any.head.cmd, msg->any.head.aux_bytes);
985 			break;
986 		}
987 		break;
988 	case DMSG_MSGQ_STATE_ERROR:
989 		/*
990 		 * Continued calls to drain recorded transactions (returning
991 		 * a LNK_ERROR for each one), before we return the final
992 		 * LNK_ERROR.
993 		 */
994 		assert(msg == NULL);
995 		break;
996 	default:
997 		/*
998 		 * We don't double-return errors, the caller should not
999 		 * have called us again after getting an error msg.
1000 		 */
1001 		assert(0);
1002 		break;
1003 	}
1004 
1005 	/*
1006 	 * Check the message sequence.  The iv[] should prevent any
1007 	 * possibility of a replay but we add this check anyway.
1008 	 */
1009 	if (msg && ioq->error == 0) {
1010 		if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1011 			ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1012 		} else {
1013 			++ioq->seq;
1014 		}
1015 	}
1016 
1017 	/*
1018 	 * Handle error, RREQ, or completion
1019 	 *
1020 	 * NOTE: nmax and bytes are invalid at this point, we don't bother
1021 	 *	 to update them when breaking out.
1022 	 */
1023 	if (ioq->error) {
1024 skip:
1025 		fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1026 		/*
1027 		 * An unrecoverable error causes all active receive
1028 		 * transactions to be terminated with a LNK_ERROR message.
1029 		 *
1030 		 * Once all active transactions are exhausted we set the
1031 		 * iocom ERROR flag and return a non-transactional LNK_ERROR
1032 		 * message, which should cause master processing loops to
1033 		 * terminate.
1034 		 */
1035 		assert(ioq->msg == msg);
1036 		if (msg) {
1037 			dmsg_msg_free(msg);
1038 			ioq->msg = NULL;
1039 		}
1040 
1041 		/*
1042 		 * No more I/O read processing
1043 		 */
1044 		ioq->state = DMSG_MSGQ_STATE_ERROR;
1045 
1046 		/*
1047 		 * Simulate a remote LNK_ERROR DELETE msg for any open
1048 		 * transactions, ending with a final non-transactional
1049 		 * LNK_ERROR (that the session can detect) when no
1050 		 * transactions remain.
1051 		 *
1052 		 * NOTE: Temporarily supply state0 and a degenerate cmd
1053 		 *	 without CREATE set.  The real state will be
1054 		 *	 assigned in the loop.
1055 		 *
1056 		 * NOTE: We are simulating a received message using our
1057 		 *	 side of the state, so the DMSGF_REV* bits have
1058 		 *	 to be reversed.
1059 		 */
1060 		msg = dmsg_msg_alloc(&iocom->state0, 0, DMSG_LNK_ERROR,
1061 				     NULL, NULL);
1062 		msg->any.head.error = ioq->error;
1063 
1064 		pthread_mutex_lock(&iocom->mtx);
1065 		dmsg_iocom_drain(iocom);
1066 
1067 		if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
1068 			/*
1069 			 * Active remote transactions are still present.
1070 			 * Simulate the other end sending us a DELETE.
1071 			 */
1072 			if (state->rxcmd & DMSGF_DELETE) {
1073 				dmsg_msg_free(msg);
1074 				fprintf(stderr,
1075 					"iocom: ioq error(rd) %d sleeping "
1076 					"state %p rxcmd %08x txcmd %08x "
1077 					"func %p\n",
1078 					ioq->error, state, state->rxcmd,
1079 					state->txcmd, state->func);
1080 				usleep(100000);	/* XXX */
1081 				atomic_set_int(&iocom->flags,
1082 					       DMSG_IOCOMF_RWORK);
1083 				msg = NULL;
1084 			} else {
1085 				fprintf(stderr, "SIMULATE ERROR1\n");
1086 				/*state->txcmd |= DMSGF_DELETE;*/
1087 				msg->state = state;
1088 				msg->any.head.msgid = state->msgid;
1089 				msg->any.head.circuit = state->parent->msgid;
1090 				msg->any.head.cmd |= DMSGF_ABORT |
1091 						     DMSGF_DELETE;
1092 				if ((state->parent->flags &
1093 				     DMSG_STATE_OPPOSITE) == 0) {
1094 					msg->any.head.cmd |= DMSGF_REVCIRC;
1095 				}
1096 			}
1097 		} else if ((state = RB_ROOT(&iocom->statewr_tree)) != NULL) {
1098 			/*
1099 			 * Active local transactions are still present.
1100 			 * Simulate the other end sending us a DELETE.
1101 			 */
1102 			if (state->rxcmd & DMSGF_DELETE) {
1103 				dmsg_msg_free(msg);
1104 				fprintf(stderr,
1105 					"iocom: ioq error(wr) %d sleeping "
1106 					"state %p rxcmd %08x txcmd %08x "
1107 					"func %p\n",
1108 					ioq->error, state, state->rxcmd,
1109 					state->txcmd, state->func);
1110 				usleep(100000);	/* XXX */
1111 				atomic_set_int(&iocom->flags,
1112 					       DMSG_IOCOMF_RWORK);
1113 				msg = NULL;
1114 			} else {
1115 				fprintf(stderr, "SIMULATE ERROR1\n");
1116 				msg->state = state;
1117 				msg->any.head.msgid = state->msgid;
1118 				msg->any.head.circuit = state->parent->msgid;
1119 				msg->any.head.cmd |= DMSGF_ABORT |
1120 						     DMSGF_DELETE |
1121 						     DMSGF_REVTRANS |
1122 						     DMSGF_REPLY;
1123 				if ((state->parent->flags &
1124 				     DMSG_STATE_OPPOSITE) == 0) {
1125 					msg->any.head.cmd |= DMSGF_REVCIRC;
1126 				}
1127 				if ((state->rxcmd & DMSGF_CREATE) == 0)
1128 					msg->any.head.cmd |= DMSGF_CREATE;
1129 			}
1130 		} else {
1131 			/*
1132 			 * No active local or remote transactions remain.
1133 			 * Generate a final LNK_ERROR and flag EOF.
1134 			 */
1135 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
1136 			fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1137 		}
1138 		pthread_mutex_unlock(&iocom->mtx);
1139 
1140 		/*
1141 		 * For the iocom error case we want to set RWORK to indicate
1142 		 * that more messages might be pending.
1143 		 *
1144 		 * It is possible to return NULL when there is more work to
1145 		 * do because each message has to be DELETEd in both
1146 		 * directions before we continue on with the next (though
1147 		 * this could be optimized).  The transmit direction will
1148 		 * re-set RWORK.
1149 		 */
1150 		if (msg)
1151 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1152 	} else if (msg == NULL) {
1153 		/*
1154 		 * Insufficient data received to finish building the message,
1155 		 * set RREQ and return NULL.
1156 		 *
1157 		 * Leave ioq->msg intact.
1158 		 * Leave the FIFO intact.
1159 		 */
1160 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1161 	} else {
1162 		/*
1163 		 * Continue processing msg.
1164 		 *
1165 		 * The fifo has already been advanced past the message.
1166 		 * Trivially reset the FIFO indices if possible.
1167 		 *
1168 		 * clear the FIFO if it is now empty and set RREQ to wait
1169 		 * for more from the socket.  If the FIFO is not empty set
1170 		 * TWORK to bypass the poll so we loop immediately.
1171 		 */
1172 		if (ioq->fifo_beg == ioq->fifo_cdx &&
1173 		    ioq->fifo_cdn == ioq->fifo_end) {
1174 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1175 			ioq->fifo_cdx = 0;
1176 			ioq->fifo_cdn = 0;
1177 			ioq->fifo_beg = 0;
1178 			ioq->fifo_end = 0;
1179 		} else {
1180 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1181 		}
1182 		ioq->state = DMSG_MSGQ_STATE_HEADER1;
1183 		ioq->msg = NULL;
1184 
1185 		/*
1186 		 * Handle message routing.  Validates non-zero sources
1187 		 * and routes message.  Error will be 0 if the message is
1188 		 * destined for us.
1189 		 *
1190 		 * State processing only occurs for messages destined for us.
1191 		 */
1192 		if (DMsgDebugOpt >= 5) {
1193 			fprintf(stderr,
1194 				"rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1195 				msg->any.head.cmd,
1196 				(intmax_t)msg->any.head.msgid,
1197 				(intmax_t)msg->any.head.circuit);
1198 		}
1199 		error = dmsg_state_msgrx(msg);
1200 
1201 		if (error) {
1202 			/*
1203 			 * Abort-after-closure, throw message away and
1204 			 * start reading another.
1205 			 */
1206 			if (error == DMSG_IOQ_ERROR_EALREADY) {
1207 				dmsg_msg_free(msg);
1208 				goto again;
1209 			}
1210 
1211 			/*
1212 			 * msg routed, msg pointer no longer owned by us.
1213 			 * Go to the top and start reading another.
1214 			 */
1215 			if (error == DMSG_IOQ_ERROR_ROUTED)
1216 				goto again;
1217 
1218 			/*
1219 			 * Process real error and throw away message.
1220 			 */
1221 			ioq->error = error;
1222 			goto skip;
1223 		}
1224 		/* no error, not routed.  Fall through and return msg */
1225 	}
1226 	return (msg);
1227 }
1228 
1229 /*
1230  * Calculate the header and data crc's and write a low-level message to
1231  * the connection.  If aux_crc is non-zero the aux_data crc is already
1232  * assumed to have been set.
1233  *
1234  * A non-NULL msg is added to the queue but not necessarily flushed.
1235  * Calling this function with msg == NULL will get a flush going.
1236  *
1237  * (called from iocom_core only)
1238  */
1239 void
1240 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1241 {
1242 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1243 	dmsg_msg_t *msg;
1244 	uint32_t xcrc32;
1245 	size_t hbytes;
1246 	size_t abytes;
1247 	dmsg_msg_queue_t tmpq;
1248 
1249 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1250 	TAILQ_INIT(&tmpq);
1251 	pthread_mutex_lock(&iocom->mtx);
1252 	while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1253 		TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1254 		TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1255 	}
1256 	pthread_mutex_unlock(&iocom->mtx);
1257 
1258 	while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1259 		/*
1260 		 * Process terminal connection errors.
1261 		 */
1262 		TAILQ_REMOVE(&tmpq, msg, qentry);
1263 		if (ioq->error) {
1264 			TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1265 			++ioq->msgcount;
1266 			continue;
1267 		}
1268 
1269 		/*
1270 		 * Finish populating the msg fields.  The salt ensures that
1271 		 * the iv[] array is ridiculously randomized and we also
1272 		 * re-seed our PRNG every 32768 messages just to be sure.
1273 		 */
1274 		msg->any.head.magic = DMSG_HDR_MAGIC;
1275 		msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1276 		++ioq->seq;
1277 		if ((ioq->seq & 32767) == 0)
1278 			srandomdev();
1279 
1280 		/*
1281 		 * Calculate aux_crc if 0, then calculate hdr_crc.
1282 		 */
1283 		if (msg->aux_size && msg->any.head.aux_crc == 0) {
1284 			abytes = DMSG_DOALIGN(msg->aux_size);
1285 			xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1286 			msg->any.head.aux_crc = xcrc32;
1287 		}
1288 		msg->any.head.aux_bytes = msg->aux_size;
1289 
1290 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1291 			 DMSG_ALIGN;
1292 		msg->any.head.hdr_crc = 0;
1293 		msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1294 
1295 		/*
1296 		 * Enqueue the message (the flush codes handles stream
1297 		 * encryption).
1298 		 */
1299 		TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1300 		++ioq->msgcount;
1301 	}
1302 	dmsg_iocom_flush2(iocom);
1303 }
1304 
1305 /*
1306  * Thread localized, iocom->mtx not held by caller.
1307  *
1308  * (called from iocom_core via iocom_flush1 only)
1309  */
1310 void
1311 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1312 {
1313 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1314 	dmsg_msg_t *msg;
1315 	ssize_t n;
1316 	struct iovec iov[DMSG_IOQ_MAXIOVEC];
1317 	size_t nact;
1318 	size_t hbytes;
1319 	size_t abytes;
1320 	size_t hoff;
1321 	size_t aoff;
1322 	int iovcnt;
1323 
1324 	if (ioq->error) {
1325 		dmsg_iocom_drain(iocom);
1326 		return;
1327 	}
1328 
1329 	/*
1330 	 * Pump messages out the connection by building an iovec.
1331 	 *
1332 	 * ioq->hbytes/ioq->abytes tracks how much of the first message
1333 	 * in the queue has been successfully written out, so we can
1334 	 * resume writing.
1335 	 */
1336 	iovcnt = 0;
1337 	nact = 0;
1338 	hoff = ioq->hbytes;
1339 	aoff = ioq->abytes;
1340 
1341 	TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1342 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1343 			 DMSG_ALIGN;
1344 		abytes = DMSG_DOALIGN(msg->aux_size);
1345 		assert(hoff <= hbytes && aoff <= abytes);
1346 
1347 		if (hoff < hbytes) {
1348 			iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1349 			iov[iovcnt].iov_len = hbytes - hoff;
1350 			nact += hbytes - hoff;
1351 			++iovcnt;
1352 			if (iovcnt == DMSG_IOQ_MAXIOVEC)
1353 				break;
1354 		}
1355 		if (aoff < abytes) {
1356 			assert(msg->aux_data != NULL);
1357 			iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1358 			iov[iovcnt].iov_len = abytes - aoff;
1359 			nact += abytes - aoff;
1360 			++iovcnt;
1361 			if (iovcnt == DMSG_IOQ_MAXIOVEC)
1362 				break;
1363 		}
1364 		hoff = 0;
1365 		aoff = 0;
1366 	}
1367 	if (iovcnt == 0)
1368 		return;
1369 
1370 	/*
1371 	 * Encrypt and write the data.  The crypto code will move the
1372 	 * data into the fifo and adjust the iov as necessary.  If
1373 	 * encryption is disabled the iov is left alone.
1374 	 *
1375 	 * May return a smaller iov (thus a smaller n), with aggregated
1376 	 * chunks.  May reduce nmax to what fits in the FIFO.
1377 	 *
1378 	 * This function sets nact to the number of original bytes now
1379 	 * encrypted, adding to the FIFO some number of bytes that might
1380 	 * be greater depending on the crypto mechanic.  iov[] is adjusted
1381 	 * to point at the FIFO if necessary.
1382 	 *
1383 	 * NOTE: The return value from the writev() is the post-encrypted
1384 	 *	 byte count, not the plaintext count.
1385 	 */
1386 	if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1387 		/*
1388 		 * Make sure the FIFO has a reasonable amount of space
1389 		 * left (if not completely full).
1390 		 *
1391 		 * In this situation we are staging the encrypted message
1392 		 * data in the FIFO.  (nact) represents how much plaintext
1393 		 * has been staged, (n) represents how much encrypted data
1394 		 * has been flushed.  The two are independent of each other.
1395 		 */
1396 		if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1397 		    sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1398 			bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1399 			      ioq->fifo_end - ioq->fifo_beg);
1400 			ioq->fifo_cdx -= ioq->fifo_beg;
1401 			ioq->fifo_cdn -= ioq->fifo_beg;
1402 			ioq->fifo_end -= ioq->fifo_beg;
1403 			ioq->fifo_beg = 0;
1404 		}
1405 
1406 		iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1407 		n = writev(iocom->sock_fd, iov, iovcnt);
1408 		if (n > 0) {
1409 			ioq->fifo_beg += n;
1410 			ioq->fifo_cdn += n;
1411 			ioq->fifo_cdx += n;
1412 			if (ioq->fifo_beg == ioq->fifo_end) {
1413 				ioq->fifo_beg = 0;
1414 				ioq->fifo_cdn = 0;
1415 				ioq->fifo_cdx = 0;
1416 				ioq->fifo_end = 0;
1417 			}
1418 		}
1419 		/*
1420 		 * We don't mess with the nact returned by the crypto_encrypt
1421 		 * call, which represents the filling of the FIFO.  (n) tells
1422 		 * us how much we were able to write from the FIFO.  The two
1423 		 * are different beasts when encrypting.
1424 		 */
1425 	} else {
1426 		/*
1427 		 * In this situation we are not staging the messages to the
1428 		 * FIFO but instead writing them directly from the msg
1429 		 * structure(s), so (nact) is basically (n).
1430 		 */
1431 		n = writev(iocom->sock_fd, iov, iovcnt);
1432 		if (n > 0)
1433 			nact = n;
1434 		else
1435 			nact = 0;
1436 	}
1437 
1438 	/*
1439 	 * Clean out the transmit queue based on what we successfully
1440 	 * sent (nact is the plaintext count).  ioq->hbytes/abytes
1441 	 * represents the portion of the first message previously sent.
1442 	 */
1443 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1444 		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1445 			 DMSG_ALIGN;
1446 		abytes = DMSG_DOALIGN(msg->aux_size);
1447 
1448 		if ((size_t)nact < hbytes - ioq->hbytes) {
1449 			ioq->hbytes += nact;
1450 			nact = 0;
1451 			break;
1452 		}
1453 		nact -= hbytes - ioq->hbytes;
1454 		ioq->hbytes = hbytes;
1455 		if ((size_t)nact < abytes - ioq->abytes) {
1456 			ioq->abytes += nact;
1457 			nact = 0;
1458 			break;
1459 		}
1460 		nact -= abytes - ioq->abytes;
1461 		/* ioq->abytes = abytes; optimized out */
1462 
1463 #if 0
1464 		fprintf(stderr,
1465 			"txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1466 			msg->any.head.cmd,
1467 			(intmax_t)msg->any.head.msgid,
1468 			(intmax_t)msg->any.head.circuit);
1469 #endif
1470 
1471 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1472 		--ioq->msgcount;
1473 		ioq->hbytes = 0;
1474 		ioq->abytes = 0;
1475 
1476 		dmsg_state_cleanuptx(iocom, msg);
1477 	}
1478 	assert(nact == 0);
1479 
1480 	/*
1481 	 * Process the return value from the write w/regards to blocking.
1482 	 */
1483 	if (n < 0) {
1484 		if (errno != EINTR &&
1485 		    errno != EINPROGRESS &&
1486 		    errno != EAGAIN) {
1487 			/*
1488 			 * Fatal write error
1489 			 */
1490 			ioq->error = DMSG_IOQ_ERROR_SOCK;
1491 			dmsg_iocom_drain(iocom);
1492 		} else {
1493 			/*
1494 			 * Wait for socket buffer space
1495 			 */
1496 			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1497 		}
1498 	} else {
1499 		atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1500 	}
1501 	if (ioq->error) {
1502 		dmsg_iocom_drain(iocom);
1503 	}
1504 }
1505 
1506 /*
1507  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1508  * write events will occur.  We don't kill read msgs because we want
1509  * the caller to pull off our contrived terminal error msg to detect
1510  * the connection failure.
1511  *
1512  * Localized to iocom_core thread, iocom->mtx not held by caller.
1513  */
1514 void
1515 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1516 {
1517 	dmsg_ioq_t *ioq = &iocom->ioq_tx;
1518 	dmsg_msg_t *msg;
1519 
1520 	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1521 	ioq->hbytes = 0;
1522 	ioq->abytes = 0;
1523 
1524 	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1525 		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1526 		--ioq->msgcount;
1527 		dmsg_state_cleanuptx(iocom, msg);
1528 	}
1529 }
1530 
1531 /*
1532  * Write a message to an iocom, with additional state processing.
1533  */
1534 void
1535 dmsg_msg_write(dmsg_msg_t *msg)
1536 {
1537 	dmsg_iocom_t *iocom = msg->state->iocom;
1538 	dmsg_state_t *state;
1539 	char dummy;
1540 
1541 	/*
1542 	 * Handle state processing, create state if necessary.
1543 	 */
1544 	pthread_mutex_lock(&iocom->mtx);
1545 	state = msg->state;
1546 	if (state != &state->iocom->state0) {
1547 		/*
1548 		 * Existing transaction (could be reply).  It is also
1549 		 * possible for this to be the first reply (CREATE is set),
1550 		 * in which case we populate state->txcmd.
1551 		 *
1552 		 * state->txcmd is adjusted to hold the final message cmd,
1553 		 * and we also be sure to set the CREATE bit here.  We did
1554 		 * not set it in dmsg_msg_alloc() because that would have
1555 		 * not been serialized (state could have gotten ripped out
1556 		 * from under the message prior to it being transmitted).
1557 		 */
1558 		if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1559 		    DMSGF_CREATE) {
1560 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1561 			state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1562 		}
1563 		msg->any.head.msgid = state->msgid;
1564 
1565 		if (msg->any.head.cmd & DMSGF_CREATE) {
1566 			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1567 		}
1568 	}
1569 
1570 #if 0
1571 	fprintf(stderr,
1572 		"MSGWRITE %016jx %08x\n",
1573 		msg->any.head.msgid, msg->any.head.cmd);
1574 #endif
1575 
1576 	/*
1577 	 * Queue it for output, wake up the I/O pthread.  Note that the
1578 	 * I/O thread is responsible for generating the CRCs and encryption.
1579 	 */
1580 	TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1581 	dummy = 0;
1582 	write(iocom->wakeupfds[1], &dummy, 1);	/* XXX optimize me */
1583 	pthread_mutex_unlock(&iocom->mtx);
1584 }
1585 
1586 /*
1587  * This is a shortcut to formulate a reply to msg with a simple error code,
1588  * It can reply to and terminate a transaction, or it can reply to a one-way
1589  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1590  * the error code (which can be 0).  Not all transactions are terminated
1591  * with DMSG_LNK_ERROR status (the low level only cares about the
1592  * MSGF_DELETE flag), but most are.
1593  *
1594  * Replies to one-way messages are a bit of an oxymoron but the feature
1595  * is used by the debug (DBG) protocol.
1596  *
1597  * The reply contains no extended data.
1598  */
1599 void
1600 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1601 {
1602 	dmsg_state_t *state = msg->state;
1603 	dmsg_msg_t *nmsg;
1604 	uint32_t cmd;
1605 
1606 
1607 	/*
1608 	 * Reply with a simple error code and terminate the transaction.
1609 	 */
1610 	cmd = DMSG_LNK_ERROR;
1611 
1612 	/*
1613 	 * Check if our direction has even been initiated yet, set CREATE.
1614 	 *
1615 	 * Check what direction this is (command or reply direction).  Note
1616 	 * that txcmd might not have been initiated yet.
1617 	 *
1618 	 * If our direction has already been closed we just return without
1619 	 * doing anything.
1620 	 */
1621 	if (state != &state->iocom->state0) {
1622 		if (state->txcmd & DMSGF_DELETE)
1623 			return;
1624 		if (state->txcmd & DMSGF_REPLY)
1625 			cmd |= DMSGF_REPLY;
1626 		cmd |= DMSGF_DELETE;
1627 	} else {
1628 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1629 			cmd |= DMSGF_REPLY;
1630 	}
1631 
1632 	/*
1633 	 * Allocate the message and associate it with the existing state.
1634 	 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1635 	 * allocate new state.  We have our state already.
1636 	 */
1637 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1638 	if (state != &state->iocom->state0) {
1639 		if ((state->txcmd & DMSGF_CREATE) == 0)
1640 			nmsg->any.head.cmd |= DMSGF_CREATE;
1641 	}
1642 	nmsg->any.head.error = error;
1643 
1644 	dmsg_msg_write(nmsg);
1645 }
1646 
1647 /*
1648  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1649  * we are generating a streaming reply or an intermediate acknowledgement
1650  * of some sort as part of the higher level protocol, with more to come
1651  * later.
1652  */
1653 void
1654 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1655 {
1656 	dmsg_state_t *state = msg->state;
1657 	dmsg_msg_t *nmsg;
1658 	uint32_t cmd;
1659 
1660 
1661 	/*
1662 	 * Reply with a simple error code and terminate the transaction.
1663 	 */
1664 	cmd = DMSG_LNK_ERROR;
1665 
1666 	/*
1667 	 * Check if our direction has even been initiated yet, set CREATE.
1668 	 *
1669 	 * Check what direction this is (command or reply direction).  Note
1670 	 * that txcmd might not have been initiated yet.
1671 	 *
1672 	 * If our direction has already been closed we just return without
1673 	 * doing anything.
1674 	 */
1675 	if (state != &state->iocom->state0) {
1676 		if (state->txcmd & DMSGF_DELETE)
1677 			return;
1678 		if (state->txcmd & DMSGF_REPLY)
1679 			cmd |= DMSGF_REPLY;
1680 		/* continuing transaction, do not set MSGF_DELETE */
1681 	} else {
1682 		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1683 			cmd |= DMSGF_REPLY;
1684 	}
1685 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1686 	if (state != &state->iocom->state0) {
1687 		if ((state->txcmd & DMSGF_CREATE) == 0)
1688 			nmsg->any.head.cmd |= DMSGF_CREATE;
1689 	}
1690 	nmsg->any.head.error = error;
1691 
1692 	dmsg_msg_write(nmsg);
1693 }
1694 
1695 /*
1696  * Terminate a transaction given a state structure by issuing a DELETE.
1697  * (the state structure must not be &iocom->state0)
1698  */
1699 void
1700 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1701 {
1702 	dmsg_msg_t *nmsg;
1703 	uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1704 
1705 	/*
1706 	 * Nothing to do if we already transmitted a delete
1707 	 */
1708 	if (state->txcmd & DMSGF_DELETE)
1709 		return;
1710 
1711 	/*
1712 	 * Set REPLY if the other end initiated the command.  Otherwise
1713 	 * we are the command direction.
1714 	 */
1715 	if (state->txcmd & DMSGF_REPLY)
1716 		cmd |= DMSGF_REPLY;
1717 
1718 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1719 	if (state != &state->iocom->state0) {
1720 		if ((state->txcmd & DMSGF_CREATE) == 0)
1721 			nmsg->any.head.cmd |= DMSGF_CREATE;
1722 	}
1723 	nmsg->any.head.error = error;
1724 	dmsg_msg_write(nmsg);
1725 }
1726 
1727 /*
1728  * Terminate a transaction given a state structure by issuing a DELETE.
1729  * (the state structure must not be &iocom->state0)
1730  */
1731 void
1732 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1733 {
1734 	dmsg_msg_t *nmsg;
1735 	uint32_t cmd = DMSG_LNK_ERROR;
1736 
1737 	/*
1738 	 * Nothing to do if we already transmitted a delete
1739 	 */
1740 	if (state->txcmd & DMSGF_DELETE)
1741 		return;
1742 
1743 	/*
1744 	 * Set REPLY if the other end initiated the command.  Otherwise
1745 	 * we are the command direction.
1746 	 */
1747 	if (state->txcmd & DMSGF_REPLY)
1748 		cmd |= DMSGF_REPLY;
1749 
1750 	nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1751 	if (state != &state->iocom->state0) {
1752 		if ((state->txcmd & DMSGF_CREATE) == 0)
1753 			nmsg->any.head.cmd |= DMSGF_CREATE;
1754 	}
1755 	nmsg->any.head.error = error;
1756 	dmsg_msg_write(nmsg);
1757 }
1758 
1759 /************************************************************************
1760  *			TRANSACTION STATE HANDLING			*
1761  ************************************************************************
1762  *
1763  */
1764 
1765 /*
1766  * Process circuit and state tracking for a message after reception, prior
1767  * to execution.
1768  *
1769  * Called with msglk held and the msg dequeued.
1770  *
1771  * All messages are called with dummy state and return actual state.
1772  * (One-off messages often just return the same dummy state).
1773  *
1774  * May request that caller discard the message by setting *discardp to 1.
1775  * The returned state is not used in this case and is allowed to be NULL.
1776  *
1777  * --
1778  *
1779  * These routines handle persistent and command/reply message state via the
1780  * CREATE and DELETE flags.  The first message in a command or reply sequence
1781  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1782  *
1783  * There can be any number of intermediate messages belonging to the same
1784  * sequence sent inbetween the CREATE message and the DELETE message,
1785  * which set neither flag.  This represents a streaming command or reply.
1786  *
1787  * Any command message received with CREATE set expects a reply sequence to
1788  * be returned.  Reply sequences work the same as command sequences except the
1789  * REPLY bit is also sent.  Both the command side and reply side can
1790  * degenerate into a single message with both CREATE and DELETE set.  Note
1791  * that one side can be streaming and the other side not, or neither, or both.
1792  *
1793  * The msgid is unique for the initiator.  That is, two sides sending a new
1794  * message can use the same msgid without colliding.
1795  *
1796  * --
1797  *
1798  * ABORT sequences work by setting the ABORT flag along with normal message
1799  * state.  However, ABORTs can also be sent on half-closed messages, that is
1800  * even if the command or reply side has already sent a DELETE, as long as
1801  * the message has not been fully closed it can still send an ABORT+DELETE
1802  * to terminate the half-closed message state.
1803  *
1804  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1805  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1806  * also race, and in this situation the other side might have already
1807  * initiated a new unrelated command with the same message id.  Since
1808  * the abort has not set the CREATE flag the situation can be detected
1809  * and the message will also be discarded.
1810  *
1811  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1812  * The ABORT request is essentially integrated into the command instead
1813  * of being sent later on.  In this situation the command implementation
1814  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1815  * special-case non-blocking operation for the command.
1816  *
1817  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1818  *	  to be mid-stream aborts for command/reply sequences.  ABORTs on
1819  *	  one-way messages are not supported.
1820  *
1821  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1822  *	  simply ignored.
1823  *
1824  * --
1825  *
1826  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
1827  * set.  One-off messages cannot be aborted and typically aren't processed
1828  * by these routines.  The REPLY bit can be used to distinguish whether a
1829  * one-off message is a command or reply.  For example, one-off replies
1830  * will typically just contain status updates.
1831  */
1832 static int
1833 dmsg_state_msgrx(dmsg_msg_t *msg)
1834 {
1835 	dmsg_iocom_t *iocom = msg->state->iocom;
1836 	dmsg_state_t *state;
1837 	dmsg_state_t *pstate;
1838 	dmsg_state_t sdummy;
1839 	int error;
1840 
1841 #if 0
1842 	fprintf(stderr,
1843 		"MSGREAD  %016jx %08x\n",
1844 		msg->any.head.msgid, msg->any.head.cmd);
1845 #endif
1846 
1847 	pthread_mutex_lock(&iocom->mtx);
1848 
1849 	/*
1850 	 * XXX handle circuit accounting
1851 	 */
1852 
1853 	/*
1854 	 * If received msg is a command state is on staterd_tree.
1855 	 * If received msg is a reply state is on statewr_tree.
1856 	 * Otherwise there is no state (retain &iocom->state0)
1857 	 */
1858 	sdummy.msgid = msg->any.head.msgid;
1859 	if (msg->any.head.cmd & DMSGF_REVTRANS)
1860 		state = RB_FIND(dmsg_state_tree, &iocom->statewr_tree, &sdummy);
1861 	else
1862 		state = RB_FIND(dmsg_state_tree, &iocom->staterd_tree, &sdummy);
1863 	if (state)
1864 		msg->state = state;	/* found an open transaction */
1865 	else
1866 		state = msg->state;	/* retain &iocom->state0 */
1867 
1868 	pthread_mutex_unlock(&iocom->mtx);
1869 
1870 	/*
1871 	 * Short-cut one-off or mid-stream messages (state may be NULL).
1872 	 */
1873 	if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1874 				  DMSGF_ABORT)) == 0) {
1875 		error = 0;
1876 		goto done;
1877 	}
1878 
1879 	/*
1880 	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1881 	 * inside the case statements.
1882 	 */
1883 	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1884 				    DMSGF_REPLY)) {
1885 	case DMSGF_CREATE:
1886 	case DMSGF_CREATE | DMSGF_DELETE:
1887 		/*
1888 		 * New persistant command received.
1889 		 */
1890 		if (state != &state->iocom->state0) {
1891 			fprintf(stderr,
1892 				"duplicate transaction %s\n",
1893 				dmsg_msg_str(msg));
1894 			error = DMSG_IOQ_ERROR_TRANS;
1895 			assert(0);
1896 			break;
1897 		}
1898 
1899 		/*
1900 		 * Lookup the circuit.  The circuit is an open transaction.
1901 		 * the REVCIRC bit in the message tells us which side
1902 		 * initiated the transaction representing the circuit.
1903 		 */
1904 		if (msg->any.head.circuit) {
1905 			pthread_mutex_lock(&iocom->mtx);
1906 			sdummy.msgid = msg->any.head.circuit;
1907 
1908 			if (msg->any.head.cmd & DMSGF_REVCIRC) {
1909 				pstate = RB_FIND(dmsg_state_tree,
1910 						 &iocom->statewr_tree,
1911 						 &sdummy);
1912 			} else {
1913 				pstate = RB_FIND(dmsg_state_tree,
1914 						 &iocom->staterd_tree,
1915 						 &sdummy);
1916 			}
1917 			if (pstate == NULL) {
1918 				fprintf(stderr,
1919 					"missing parent in stacked trans %s\n",
1920 					dmsg_msg_str(msg));
1921 				error = DMSG_IOQ_ERROR_TRANS;
1922 				pthread_mutex_unlock(&iocom->mtx);
1923 				assert(0);
1924 				break;
1925 			}
1926 			pthread_mutex_unlock(&iocom->mtx);
1927 		} else {
1928 			pstate = &iocom->state0;
1929 		}
1930 
1931 		/*
1932 		 * Allocate new state
1933 		 */
1934 		state = malloc(sizeof(*state));
1935 		bzero(state, sizeof(*state));
1936 		TAILQ_INIT(&state->subq);
1937 		state->parent = pstate;
1938 		state->iocom = iocom;
1939 		state->flags = DMSG_STATE_DYNAMIC |
1940 			       DMSG_STATE_OPPOSITE |
1941 			       (pstate->flags & DMSG_STATE_ROUTED);
1942 		state->msgid = msg->any.head.msgid;
1943 		state->txcmd = DMSGF_REPLY;
1944 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1945 		state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
1946 		msg->state = state;
1947 
1948 		pthread_mutex_lock(&iocom->mtx);
1949 		RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
1950 		TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
1951 		state->flags |= DMSG_STATE_INSERTED;
1952 		pthread_mutex_unlock(&iocom->mtx);
1953 		error = 0;
1954 		if (DMsgDebugOpt) {
1955 			fprintf(stderr,
1956 				"create state %p id=%08x on iocom staterd %p\n",
1957 				state, (uint32_t)state->msgid, iocom);
1958 		}
1959 		break;
1960 	case DMSGF_DELETE:
1961 		/*
1962 		 * Persistent state is expected but might not exist if an
1963 		 * ABORT+DELETE races the close.
1964 		 */
1965 		if (state == &state->iocom->state0) {
1966 			if (msg->any.head.cmd & DMSGF_ABORT) {
1967 				error = DMSG_IOQ_ERROR_EALREADY;
1968 			} else {
1969 				fprintf(stderr, "missing-state %s\n",
1970 					dmsg_msg_str(msg));
1971 				error = DMSG_IOQ_ERROR_TRANS;
1972 			assert(0);
1973 			}
1974 			break;
1975 		}
1976 
1977 		/*
1978 		 * Handle another ABORT+DELETE case if the msgid has already
1979 		 * been reused.
1980 		 */
1981 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
1982 			if (msg->any.head.cmd & DMSGF_ABORT) {
1983 				error = DMSG_IOQ_ERROR_EALREADY;
1984 			} else {
1985 				fprintf(stderr, "reused-state %s\n",
1986 					dmsg_msg_str(msg));
1987 				error = DMSG_IOQ_ERROR_TRANS;
1988 			assert(0);
1989 			}
1990 			break;
1991 		}
1992 		error = 0;
1993 		break;
1994 	default:
1995 		/*
1996 		 * Check for mid-stream ABORT command received, otherwise
1997 		 * allow.
1998 		 */
1999 		if (msg->any.head.cmd & DMSGF_ABORT) {
2000 			if (state == &state->iocom->state0 ||
2001 			    (state->rxcmd & DMSGF_CREATE) == 0) {
2002 				error = DMSG_IOQ_ERROR_EALREADY;
2003 				break;
2004 			}
2005 		}
2006 		error = 0;
2007 		break;
2008 	case DMSGF_REPLY | DMSGF_CREATE:
2009 	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
2010 		/*
2011 		 * When receiving a reply with CREATE set the original
2012 		 * persistent state message should already exist.
2013 		 */
2014 		if (state == &state->iocom->state0) {
2015 			fprintf(stderr, "no-state(r) %s\n",
2016 				dmsg_msg_str(msg));
2017 			error = DMSG_IOQ_ERROR_TRANS;
2018 			assert(0);
2019 			break;
2020 		}
2021 		assert(((state->rxcmd ^ msg->any.head.cmd) &
2022 			DMSGF_REPLY) == 0);
2023 		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2024 		error = 0;
2025 		break;
2026 	case DMSGF_REPLY | DMSGF_DELETE:
2027 		/*
2028 		 * Received REPLY+ABORT+DELETE in case where msgid has
2029 		 * already been fully closed, ignore the message.
2030 		 */
2031 		if (state == &state->iocom->state0) {
2032 			if (msg->any.head.cmd & DMSGF_ABORT) {
2033 				error = DMSG_IOQ_ERROR_EALREADY;
2034 			} else {
2035 				fprintf(stderr, "no-state(r,d) %s\n",
2036 					dmsg_msg_str(msg));
2037 				error = DMSG_IOQ_ERROR_TRANS;
2038 			assert(0);
2039 			}
2040 			break;
2041 		}
2042 
2043 		/*
2044 		 * Received REPLY+ABORT+DELETE in case where msgid has
2045 		 * already been reused for an unrelated message,
2046 		 * ignore the message.
2047 		 */
2048 		if ((state->rxcmd & DMSGF_CREATE) == 0) {
2049 			if (msg->any.head.cmd & DMSGF_ABORT) {
2050 				error = DMSG_IOQ_ERROR_EALREADY;
2051 			} else {
2052 				fprintf(stderr, "reused-state(r,d) %s\n",
2053 					dmsg_msg_str(msg));
2054 				error = DMSG_IOQ_ERROR_TRANS;
2055 			assert(0);
2056 			}
2057 			break;
2058 		}
2059 		error = 0;
2060 		break;
2061 	case DMSGF_REPLY:
2062 		/*
2063 		 * Check for mid-stream ABORT reply received to sent command.
2064 		 */
2065 		if (msg->any.head.cmd & DMSGF_ABORT) {
2066 			if (state == &state->iocom->state0 ||
2067 			    (state->rxcmd & DMSGF_CREATE) == 0) {
2068 				error = DMSG_IOQ_ERROR_EALREADY;
2069 				break;
2070 			}
2071 		}
2072 		error = 0;
2073 		break;
2074 	}
2075 
2076 	/*
2077 	 * Calculate the easy-switch() transactional command.  Represents
2078 	 * the outer-transaction command for any transaction-create or
2079 	 * transaction-delete, and the inner message command for any
2080 	 * non-transaction or inside-transaction command.  tcmd will be
2081 	 * set to 0 for any messaging error condition.
2082 	 *
2083 	 * The two can be told apart because outer-transaction commands
2084 	 * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
2085 	 */
2086 done:
2087 	if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
2088 		if (state != &state->iocom->state0) {
2089 			msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
2090 				    (msg->any.head.cmd & (DMSGF_CREATE |
2091 							  DMSGF_DELETE |
2092 							  DMSGF_REPLY));
2093 		} else {
2094 			msg->tcmd = 0;
2095 		}
2096 	} else {
2097 		msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
2098 	}
2099 
2100 	/*
2101 	 * Possibly route the message if the state inherited the ROUTED
2102 	 * flag.
2103 	 */
2104 	if (state->flags & DMSG_STATE_ROUTED)
2105 		error = dmsg_state_routedrx(state, msg);
2106 
2107 	return (error);
2108 }
2109 
2110 /*
2111  * Routed messages still do state-tracking
2112  */
2113 static int
2114 dmsg_state_routedrx(dmsg_state_t *state, dmsg_msg_t *msg)
2115 {
2116 	/*
2117 	 * If this message is a CREATE or DELETE on the LNK_SPAN transaction
2118 	 * itself we process it normally rather than route it.
2119 	 */
2120 	if (state->parent == &state->iocom->state0 &&
2121 	    (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE))) {
2122 		assert(state->icmd == DMSG_LNK_SPAN);
2123 		return 0;
2124 	}
2125 
2126 	/*
2127 	 * When routing the msgid must be translated to our representation
2128 	 * of the transaction. XXX
2129 	 */
2130 	fprintf(stderr, "ROUTING MESSAGE\n");
2131 
2132 	if (state->parent == &state->iocom->state0 &&
2133 	    state->icmd == DMSG_LNK_SPAN) {
2134 		/*
2135 		 * Route a non-transactional command through the SPAN.
2136 		 */
2137 	} else {
2138 		/*
2139 		 * Route a transactional message stacked under the LNK_SPAN.
2140 		 */
2141 	}
2142 	return DMSG_IOQ_ERROR_ROUTED;
2143 }
2144 
2145 void
2146 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2147 {
2148 	dmsg_state_t *state;
2149 	dmsg_state_t *pstate;
2150 
2151 	assert(msg->state->iocom == iocom);
2152 	state = msg->state;
2153 	if (state == &iocom->state0) {
2154 		/*
2155 		 * Free a non-transactional message, there is no state
2156 		 * to worry about.
2157 		 */
2158 		dmsg_msg_free(msg);
2159 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
2160 		/*
2161 		 * Message terminating transaction, destroy the related
2162 		 * state, the original message, and this message (if it
2163 		 * isn't the original message due to a CREATE|DELETE).
2164 		 */
2165 		pthread_mutex_lock(&iocom->mtx);
2166 		state->rxcmd |= DMSGF_DELETE;
2167 		if (state->txcmd & DMSGF_DELETE) {
2168 			assert(state->flags & DMSG_STATE_INSERTED);
2169 			assert(TAILQ_EMPTY(&state->subq));
2170 			if (state->rxcmd & DMSGF_REPLY) {
2171 				assert(msg->any.head.cmd & DMSGF_REPLY);
2172 				RB_REMOVE(dmsg_state_tree,
2173 					  &iocom->statewr_tree, state);
2174 			} else {
2175 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2176 				RB_REMOVE(dmsg_state_tree,
2177 					  &iocom->staterd_tree, state);
2178 			}
2179 			pstate = state->parent;
2180 			TAILQ_REMOVE(&pstate->subq, state, entry);
2181 			if (pstate != &pstate->iocom->state0 &&
2182 			    TAILQ_EMPTY(&pstate->subq) &&
2183 			    (pstate->flags & DMSG_STATE_INSERTED) == 0) {
2184 				dmsg_state_free(pstate);
2185 			}
2186 			state->flags &= ~DMSG_STATE_INSERTED;
2187 			state->parent = NULL;
2188 			dmsg_msg_free(msg);
2189 			if (TAILQ_EMPTY(&state->subq))
2190 				dmsg_state_free(state);
2191 		} else {
2192 			dmsg_msg_free(msg);
2193 		}
2194 		pthread_mutex_unlock(&iocom->mtx);
2195 	} else {
2196 		/*
2197 		 * Message not terminating transaction, leave state intact
2198 		 * and free message if it isn't the CREATE message.
2199 		 */
2200 		dmsg_msg_free(msg);
2201 	}
2202 }
2203 
2204 static void
2205 dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2206 {
2207 	dmsg_state_t *state;
2208 	dmsg_state_t *pstate;
2209 
2210 	assert(iocom == msg->state->iocom);
2211 	state = msg->state;
2212 	if (state == &state->iocom->state0) {
2213 		dmsg_msg_free(msg);
2214 	} else if (msg->any.head.cmd & DMSGF_DELETE) {
2215 		pthread_mutex_lock(&iocom->mtx);
2216 		assert((state->txcmd & DMSGF_DELETE) == 0);
2217 		state->txcmd |= DMSGF_DELETE;
2218 		if (state->rxcmd & DMSGF_DELETE) {
2219 			assert(state->flags & DMSG_STATE_INSERTED);
2220 			assert(TAILQ_EMPTY(&state->subq));
2221 			if (state->txcmd & DMSGF_REPLY) {
2222 				assert(msg->any.head.cmd & DMSGF_REPLY);
2223 				RB_REMOVE(dmsg_state_tree,
2224 					  &iocom->staterd_tree, state);
2225 			} else {
2226 				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2227 				RB_REMOVE(dmsg_state_tree,
2228 					  &iocom->statewr_tree, state);
2229 			}
2230 			pstate = state->parent;
2231 			TAILQ_REMOVE(&pstate->subq, state, entry);
2232 			if (pstate != &pstate->iocom->state0 &&
2233 			    TAILQ_EMPTY(&pstate->subq) &&
2234 			    (pstate->flags & DMSG_STATE_INSERTED) == 0) {
2235 				dmsg_state_free(pstate);
2236 			}
2237 			state->flags &= ~DMSG_STATE_INSERTED;
2238 			state->parent = NULL;
2239 			dmsg_msg_free(msg);
2240 			if (TAILQ_EMPTY(&state->subq))
2241 				dmsg_state_free(state);
2242 		} else {
2243 			dmsg_msg_free(msg);
2244 		}
2245 		pthread_mutex_unlock(&iocom->mtx);
2246 	} else {
2247 		dmsg_msg_free(msg);
2248 	}
2249 }
2250 
2251 /*
2252  * Called with iocom locked
2253  */
2254 void
2255 dmsg_state_free(dmsg_state_t *state)
2256 {
2257 	if (DMsgDebugOpt) {
2258 		fprintf(stderr, "terminate state %p id=%08x\n",
2259 			state, (uint32_t)state->msgid);
2260 	}
2261 	if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2262 		closefrom(3);
2263 	assert(state->any.any == NULL);
2264 	free(state);
2265 }
2266 
2267 /*
2268  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2269  * header is not adjusted, just the core header.
2270  */
2271 void
2272 dmsg_bswap_head(dmsg_hdr_t *head)
2273 {
2274 	head->magic	= bswap16(head->magic);
2275 	head->reserved02 = bswap16(head->reserved02);
2276 	head->salt	= bswap32(head->salt);
2277 
2278 	head->msgid	= bswap64(head->msgid);
2279 	head->circuit	= bswap64(head->circuit);
2280 	head->reserved18= bswap64(head->reserved18);
2281 
2282 	head->cmd	= bswap32(head->cmd);
2283 	head->aux_crc	= bswap32(head->aux_crc);
2284 	head->aux_bytes	= bswap32(head->aux_bytes);
2285 	head->error	= bswap32(head->error);
2286 	head->aux_descr = bswap64(head->aux_descr);
2287 	head->reserved38= bswap32(head->reserved38);
2288 	head->hdr_crc	= bswap32(head->hdr_crc);
2289 }
2290