1 /* 2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@dragonflybsd.org> 6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org> 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * 3. Neither the name of The DragonFly Project nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific, prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33 * SUCH DAMAGE. 
34 */ 35 36 #include "dmsg_local.h" 37 38 int DMsgDebugOpt; 39 40 static int dmsg_state_msgrx(dmsg_msg_t *msg); 41 static void dmsg_state_cleanuptx(dmsg_msg_t *msg); 42 static void dmsg_msg_free_locked(dmsg_msg_t *msg); 43 44 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp); 45 RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp); 46 47 /* 48 * STATE TREE - Represents open transactions which are indexed by their 49 * { msgid } relative to the governing iocom. 50 */ 51 int 52 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2) 53 { 54 if (state1->msgid < state2->msgid) 55 return(-1); 56 if (state1->msgid > state2->msgid) 57 return(1); 58 return(0); 59 } 60 61 /* 62 * CIRCUIT TREE - Represents open circuits which are indexed by their 63 * { msgid } relative to the governing iocom. 64 */ 65 int 66 dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2) 67 { 68 if (circuit1->msgid < circuit2->msgid) 69 return(-1); 70 if (circuit1->msgid > circuit2->msgid) 71 return(1); 72 return(0); 73 } 74 75 /* 76 * Initialize a low-level ioq 77 */ 78 void 79 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq) 80 { 81 bzero(ioq, sizeof(*ioq)); 82 ioq->state = DMSG_MSGQ_STATE_HEADER1; 83 TAILQ_INIT(&ioq->msgq); 84 } 85 86 /* 87 * Cleanup queue. 88 * 89 * caller holds iocom->mtx. 90 */ 91 void 92 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq) 93 { 94 dmsg_msg_t *msg; 95 96 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { 97 assert(0); /* shouldn't happen */ 98 TAILQ_REMOVE(&ioq->msgq, msg, qentry); 99 dmsg_msg_free(msg); 100 } 101 if ((msg = ioq->msg) != NULL) { 102 ioq->msg = NULL; 103 dmsg_msg_free(msg); 104 } 105 } 106 107 /* 108 * Initialize a low-level communications channel. 109 * 110 * NOTE: The signal_func() is called at least once from the loop and can be 111 * re-armed via dmsg_iocom_restate(). 
 */
void
dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
		void (*signal_func)(dmsg_iocom_t *iocom),
		void (*rcvmsg_func)(dmsg_msg_t *msg),
		void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
		void (*altmsg_func)(dmsg_iocom_t *iocom))
{
	struct stat st;

	bzero(iocom, sizeof(*iocom));

	/*
	 * Default label (may later be replaced via dmsg_iocom_label()),
	 * and the four callback hooks driven from dmsg_iocom_core().
	 */
	asprintf(&iocom->label, "iocom-%p", iocom);
	iocom->signal_callback = signal_func;
	iocom->rcvmsg_callback = rcvmsg_func;
	iocom->altmsg_callback = altmsg_func;
	iocom->usrmsg_callback = usrmsg_func;

	pthread_mutex_init(&iocom->mtx, NULL);
	RB_INIT(&iocom->circuit_tree);
	TAILQ_INIT(&iocom->freeq);
	TAILQ_INIT(&iocom->freeq_aux);
	TAILQ_INIT(&iocom->txmsgq);
	iocom->sock_fd = sock_fd;
	iocom->alt_fd = alt_fd;
	iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
	if (signal_func)
		iocom->flags |= DMSG_IOCOMF_SWORK;
	dmsg_ioq_init(iocom, &iocom->ioq_rx);
	dmsg_ioq_init(iocom, &iocom->ioq_tx);

	/*
	 * Inter-thread wakeup pipe, both ends non-blocking.  Other
	 * threads write to wakeupfds[1] to kick the core loop's poll().
	 */
	if (pipe(iocom->wakeupfds) < 0)
		assert(0);
	fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
	fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);

	dmsg_circuit_init(iocom, &iocom->circuit0);

	/*
	 * Negotiate session crypto synchronously.  This will mark the
	 * connection as error'd if it fails.  If this is a pipe it's
	 * a linkage that we set up ourselves to the filesystem and there
	 * is no crypto.
	 */
	if (fstat(sock_fd, &st) < 0)
		assert(0);
	if (S_ISSOCK(st.st_mode))
		dmsg_crypto_negotiate(iocom);

	/*
	 * Make sure our fds are set to non-blocking for the iocom core.
	 */
	if (sock_fd >= 0)
		fcntl(sock_fd, F_SETFL, O_NONBLOCK);
#if 0
	/* if line buffered our single fgets() should be fine */
	if (alt_fd >= 0)
		fcntl(alt_fd, F_SETFL, O_NONBLOCK);
#endif
}

/*
 * Replace the iocom's debugging label (printf-style format).
 */
void
dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
{
	va_list va;
	char *optr;

	/*
	 * Format the new label before freeing the old one so the caller's
	 * format arguments may still reference the previous label memory.
	 */
	va_start(va, ctl);
	optr = iocom->label;
	vasprintf(&iocom->label, ctl, va);
	va_end(va);
	if (optr)
		free(optr);
}

/*
 * May only be called from a callback from iocom_core.
 *
 * Adjust state machine functions, set flags to guarantee that both
 * the rcvmsg_func and the signal_func is called at least once.
 */
void
dmsg_iocom_restate(dmsg_iocom_t *iocom,
		   void (*signal_func)(dmsg_iocom_t *),
		   void (*rcvmsg_func)(dmsg_msg_t *msg))
{
	pthread_mutex_lock(&iocom->mtx);
	iocom->signal_callback = signal_func;
	iocom->rcvmsg_callback = rcvmsg_func;
	/* SWORK arms the signal callback in the core loop */
	if (signal_func)
		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
	else
		atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
	pthread_mutex_unlock(&iocom->mtx);
}

/*
 * Request that the core loop re-run the signal callback (if one is
 * installed) on its next pass.
 */
void
dmsg_iocom_signal(dmsg_iocom_t *iocom)
{
	pthread_mutex_lock(&iocom->mtx);
	if (iocom->signal_callback)
		atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
	pthread_mutex_unlock(&iocom->mtx);
}

/*
 * Cleanup a terminating iocom.
 *
 * Caller should not hold iocom->mtx.  The iocom has already been disconnected
 * from all possible references to it.
 */
void
dmsg_iocom_done(dmsg_iocom_t *iocom)
{
	dmsg_msg_t *msg;

	/* close the primary socket */
	if (iocom->sock_fd >= 0) {
		close(iocom->sock_fd);
		iocom->sock_fd = -1;
	}
	/* alt fd is only closed if we own it (CLOSEALT set at init) */
	if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
		close(iocom->alt_fd);
		iocom->alt_fd = -1;
	}
	dmsg_ioq_done(iocom, &iocom->ioq_rx);
	dmsg_ioq_done(iocom, &iocom->ioq_tx);

	/*
	 * Drain the message free-lists.  Messages on freeq_aux may still
	 * own an aux_data buffer which must be released separately.
	 */
	while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
		TAILQ_REMOVE(&iocom->freeq, msg, qentry);
		free(msg);
	}
	while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
		TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
		free(msg->aux_data);
		msg->aux_data = NULL;
		free(msg);
	}
	/* tear down the inter-thread wakeup pipe */
	if (iocom->wakeupfds[0] >= 0) {
		close(iocom->wakeupfds[0]);
		iocom->wakeupfds[0] = -1;
	}
	if (iocom->wakeupfds[1] >= 0) {
		close(iocom->wakeupfds[1]);
		iocom->wakeupfds[1] = -1;
	}
	pthread_mutex_destroy(&iocom->mtx);
}

/*
 * Basic initialization of a circuit structure.
 *
 * The circuit structure is initialized with one ref.
 */
void
dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
{
	circuit->refs = 1;
	circuit->iocom = iocom;
	RB_INIT(&circuit->staterd_tree);
	RB_INIT(&circuit->statewr_tree);
}

/*
 * Allocate a new one-way message.
 */
dmsg_msg_t *
dmsg_msg_alloc(dmsg_circuit_t *circuit,
	       size_t aux_size, uint32_t cmd,
	       void (*func)(dmsg_msg_t *), void *data)
{
	dmsg_iocom_t *iocom = circuit->iocom;
	dmsg_state_t *state = NULL;
	dmsg_msg_t *msg;
	int hbytes;
	size_t aligned_size;

	pthread_mutex_lock(&iocom->mtx);
#if 0
	if (aux_size) {
		aligned_size = DMSG_DOALIGN(aux_size);
		if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
			TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
	} else {
		aligned_size = 0;
		if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
			TAILQ_REMOVE(&iocom->freeq, msg, qentry);
	}
#endif
	aligned_size = DMSG_DOALIGN(aux_size);
	msg = NULL;
	if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
		/*
		 * Create state when CREATE is set without REPLY.
		 * Assign a unique msgid, in this case simply using
		 * the pointer value for 'state'.
		 *
		 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
		 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
		 *
		 * NOTE: state initiated by us and state initiated by
		 *	 a remote create are placed in different RB trees.
		 *	 The msgid for SPAN state is used in source/target
		 *	 for message routing as appropriate.
		 */
		state = malloc(sizeof(*state));
		bzero(state, sizeof(*state));
		state->iocom = iocom;
		state->circuit = circuit;
		state->flags = DMSG_STATE_DYNAMIC;
		state->msgid = (uint64_t)(uintptr_t)state;
		state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
		state->rxcmd = DMSGF_REPLY;
		state->icmd = state->txcmd & DMSGF_BASECMDMASK;
		state->func = func;
		state->any.any = data;
		RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
		state->flags |= DMSG_STATE_INSERTED;
	}
	/* XXX SMP race for state */
	pthread_mutex_unlock(&iocom->mtx);

	/* header size in bytes is encoded in the low bits of cmd */
	hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
	if (msg == NULL) {
		/*
		 * Allocate only as much header as this command needs,
		 * plus 4 bytes for a trailing guard word which
		 * dmsg_msg_free_locked() checks for overruns.
		 */
		msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
		bzero(msg, offsetof(struct dmsg_msg, any.head));
		*(int *)((char *)msg +
			 offsetof(struct dmsg_msg, any.head) + hbytes) =
			0x71B2C3D4;
#if 0
		msg = malloc(sizeof(*msg));
		bzero(msg, sizeof(*msg));
#endif
	}

	/*
	 * [re]allocate the auxiliary data buffer.  The caller knows that
	 * a size-aligned buffer will be allocated but we do not want to
	 * force the caller to zero any tail piece, so we do that ourself.
	 */
	if (msg->aux_size != aux_size) {
		if (msg->aux_data) {
			free(msg->aux_data);
			msg->aux_data = NULL;
			msg->aux_size = 0;
		}
		if (aux_size) {
			msg->aux_data = malloc(aligned_size);
			msg->aux_size = aux_size;
			/* zero the alignment padding past aux_size */
			if (aux_size != aligned_size) {
				bzero(msg->aux_data + aux_size,
				      aligned_size - aux_size);
			}
		}
	}

	/*
	 * Populate the header and link the message to its circuit
	 * (and transaction state, if one was created above).
	 */
	if (hbytes)
		bzero(&msg->any.head, hbytes);
	msg->hdr_size = hbytes;
	msg->any.head.magic = DMSG_HDR_MAGIC;
	msg->any.head.cmd = cmd;
	msg->any.head.aux_descr = 0;
	msg->any.head.aux_crc = 0;
	msg->any.head.circuit = 0;
	msg->circuit = circuit;
	msg->iocom = iocom;
	dmsg_circuit_hold(circuit);
	if (state) {
		msg->state = state;
		state->msg = msg;
		msg->any.head.msgid = state->msgid;
	} else {
		msg->any.head.msgid = 0;
	}
	return (msg);
}

/*
 * Free a message so it can be reused afresh.
 *
 * NOTE: aux_size can be 0 with a non-NULL aux_data.
 */
static
void
dmsg_msg_free_locked(dmsg_msg_t *msg)
{
	/*dmsg_iocom_t *iocom = msg->iocom;*/
#if 1
	/*
	 * Verify the trailing guard word written by dmsg_msg_alloc();
	 * a mismatch indicates a header buffer overrun.
	 */
	int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
	if (*(int *)((char *)msg +
		     offsetof(struct dmsg_msg, any.head) + hbytes) !=
	    0x71B2C3D4) {
		fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
		assert(0);
	}
#endif
	if (msg->circuit) {
		dmsg_circuit_drop_locked(msg->circuit);
		msg->circuit = NULL;
	}
	msg->state = NULL;
	if (msg->aux_data) {
		free(msg->aux_data);
		msg->aux_data = NULL;
	}
	msg->aux_size = 0;
	free (msg);
#if 0
	if (msg->aux_data)
		TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
	else
		TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
#endif
}

/*
 * Free a message (locked wrapper around dmsg_msg_free_locked()).
 */
void
dmsg_msg_free(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->iocom;

	pthread_mutex_lock(&iocom->mtx);
	dmsg_msg_free_locked(msg);
	pthread_mutex_unlock(&iocom->mtx);
}

/*
 * I/O core loop for an iocom.
 *
 * Thread localized, iocom->mtx not held.
 */
void
dmsg_iocom_core(dmsg_iocom_t *iocom)
{
	struct pollfd fds[3];
	char dummybuf[256];
	dmsg_msg_t *msg;
	int timeout;
	int count;
	int wi;	/* wakeup pipe */
	int si;	/* socket */
	int ai;	/* alt bulk path socket */

	while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
		/*
		 * These iocom->flags are only manipulated within the
		 * context of the current thread.  However, modifications
		 * still require atomic ops.
		 */
		if ((iocom->flags & (DMSG_IOCOMF_RWORK |
				     DMSG_IOCOMF_WWORK |
				     DMSG_IOCOMF_PWORK |
				     DMSG_IOCOMF_SWORK |
				     DMSG_IOCOMF_ARWORK |
				     DMSG_IOCOMF_AWWORK)) == 0) {
			/*
			 * Only poll if no immediate work is pending.
			 * Otherwise we are just wasting our time calling
			 * poll.
			 */
			timeout = 5000;

			count = 0;
			wi = -1;
			si = -1;
			ai = -1;

			/*
			 * Always check the inter-thread pipe, e.g.
			 * for iocom->txmsgq work.
			 */
			wi = count++;
			fds[wi].fd = iocom->wakeupfds[0];
			fds[wi].events = POLLIN;
			fds[wi].revents = 0;

			/*
			 * Check the socket input/output direction as
			 * requested
			 */
			if (iocom->flags & (DMSG_IOCOMF_RREQ |
					    DMSG_IOCOMF_WREQ)) {
				si = count++;
				fds[si].fd = iocom->sock_fd;
				fds[si].events = 0;
				fds[si].revents = 0;

				if (iocom->flags & DMSG_IOCOMF_RREQ)
					fds[si].events |= POLLIN;
				if (iocom->flags & DMSG_IOCOMF_WREQ)
					fds[si].events |= POLLOUT;
			}

			/*
			 * Check the alternative fd for work.
			 */
			if (iocom->alt_fd >= 0) {
				ai = count++;
				fds[ai].fd = iocom->alt_fd;
				fds[ai].events = POLLIN;
				fds[ai].revents = 0;
			}
			poll(fds, count, timeout);

			/* translate poll results into *WORK flags */
			if (wi >= 0 && (fds[wi].revents & POLLIN))
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_PWORK);
			if (si >= 0 && (fds[si].revents & POLLIN))
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_RWORK);
			if (si >= 0 && (fds[si].revents & POLLOUT))
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_WWORK);
			if (wi >= 0 && (fds[wi].revents & POLLOUT))
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_WWORK);
			if (ai >= 0 && (fds[ai].revents & POLLIN))
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_ARWORK);
		} else {
			/*
			 * Always check the pipe
			 */
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
		}

		if (iocom->flags & DMSG_IOCOMF_SWORK) {
			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
			iocom->signal_callback(iocom);
		}

		/*
		 * Pending message queues from other threads wake us up
		 * with a write to the wakeupfds[] pipe.  We have to clear
		 * the pipe with a dummy read.
		 */
		if (iocom->flags & DMSG_IOCOMF_PWORK) {
			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
			read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
			if (TAILQ_FIRST(&iocom->txmsgq))
				dmsg_iocom_flush1(iocom);
		}

		/*
		 * Message write sequencing
		 */
		if (iocom->flags & DMSG_IOCOMF_WWORK)
			dmsg_iocom_flush1(iocom);

		/*
		 * Message read sequencing.  Run this after the write
		 * sequencing in case the write sequencing allowed another
		 * auto-DELETE to occur on the read side.
		 */
		if (iocom->flags & DMSG_IOCOMF_RWORK) {
			while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
			       (msg = dmsg_ioq_read(iocom)) != NULL) {
				if (DMsgDebugOpt) {
					fprintf(stderr, "receive %s\n",
						dmsg_msg_str(msg));
				}
				iocom->rcvmsg_callback(msg);
				dmsg_state_cleanuprx(iocom, msg);
			}
		}

		if (iocom->flags & DMSG_IOCOMF_ARWORK) {
			atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
			iocom->altmsg_callback(iocom);
		}
	}
}

/*
 * Make sure there's enough room in the FIFO to hold the
 * needed data.
 *
 * Assume worst case encrypted form is 2x the size of the
 * plaintext equivalent.
 */
static
size_t
dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
{
	size_t bytes;
	size_t nmax;

	bytes = ioq->fifo_cdx - ioq->fifo_beg;
	nmax = sizeof(ioq->buf) - ioq->fifo_end;
	if (bytes + nmax / 2 < needed) {
		/*
		 * Slide the decrypted region [fifo_beg, fifo_cdx) to the
		 * front of the buffer, then slide the not-yet-decrypted
		 * region [fifo_cdn, fifo_end) down behind it.
		 */
		if (bytes) {
			bcopy(ioq->buf + ioq->fifo_beg,
			      ioq->buf,
			      bytes);
		}
		ioq->fifo_cdx -= ioq->fifo_beg;
		ioq->fifo_beg = 0;
		if (ioq->fifo_cdn < ioq->fifo_end) {
			bcopy(ioq->buf + ioq->fifo_cdn,
			      ioq->buf + ioq->fifo_cdx,
			      ioq->fifo_end - ioq->fifo_cdn);
		}
		ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
		ioq->fifo_cdn = ioq->fifo_cdx;
		nmax = sizeof(ioq->buf) - ioq->fifo_end;
	}
	return(nmax);
}

/*
 * Read the next ready message from the ioq, issuing I/O if needed.
 * Caller should retry on a read-event when NULL is returned.
 *
 * If an error occurs during reception a DMSG_LNK_ERROR msg will
 * be returned for each open transaction, then the ioq and iocom
 * will be errored out and a non-transactional DMSG_LNK_ERROR
 * msg will be returned as the final message.  The caller should not call
 * us again after the final message is returned.
 *
 * Thread localized, iocom->mtx not held.
 */
dmsg_msg_t *
dmsg_ioq_read(dmsg_iocom_t *iocom)
{
	dmsg_ioq_t *ioq = &iocom->ioq_rx;
	dmsg_msg_t *msg;
	dmsg_state_t *state;
	dmsg_circuit_t *circuit0;
	dmsg_hdr_t *head;
	ssize_t n;
	size_t bytes;
	size_t nmax;
	uint32_t aux_size;
	uint32_t xcrc32;
	int error;

again:
	/*
	 * If a message is already pending we can just remove and
	 * return it.  Message state has already been processed.
	 * (currently not implemented)
	 */
	if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
		return (msg);
	}
	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);

	/*
	 * If the stream is errored out we stop processing it.
	 */
	if (ioq->error)
		goto skip;

	/*
	 * Message read in-progress (msg is NULL at the moment).  We don't
	 * allocate a msg until we have its core header.
	 */
	nmax = sizeof(ioq->buf) - ioq->fifo_end;
	bytes = ioq->fifo_cdx - ioq->fifo_beg;	/* already decrypted */
	msg = ioq->msg;

	switch(ioq->state) {
	case DMSG_MSGQ_STATE_HEADER1:
		/*
		 * Load the primary header, fail on any non-trivial read
		 * error or on EOF.  Since the primary header is the same
		 * size as the message alignment it will never straddle
		 * the end of the buffer.
		 */
		nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
		if (bytes < sizeof(msg->any.head)) {
			n = read(iocom->sock_fd,
				 ioq->buf + ioq->fifo_end,
				 nmax);
			if (n <= 0) {
				if (n == 0) {
					ioq->error = DMSG_IOQ_ERROR_EOF;
					break;
				}
				if (errno != EINTR &&
				    errno != EINPROGRESS &&
				    errno != EAGAIN) {
					ioq->error = DMSG_IOQ_ERROR_SOCK;
					break;
				}
				n = 0;
				/* fall through */
			}
			ioq->fifo_end += (size_t)n;
			nmax -= (size_t)n;
		}

		/*
		 * Decrypt data received so far.  Data will be decrypted
		 * in-place but might create gaps in the FIFO.  Partial
		 * blocks are not immediately decrypted.
		 *
		 * WARNING!  The header might be in the wrong endian, we
		 *	     do not fix it up until we get the entire
		 *	     extended header.
		 */
		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
			dmsg_crypto_decrypt(iocom, ioq);
		} else {
			ioq->fifo_cdx = ioq->fifo_end;
			ioq->fifo_cdn = ioq->fifo_end;
		}
		bytes = ioq->fifo_cdx - ioq->fifo_beg;

		/*
		 * Insufficient data accumulated (msg is NULL, caller will
		 * retry on event).
		 */
		assert(msg == NULL);
		if (bytes < sizeof(msg->any.head))
			break;

		/*
		 * Check and fixup the core header.  Note that the icrc
		 * has to be calculated before any fixups, but the crc
		 * fields in the msg may have to be swapped like everything
		 * else.
		 */
		head = (void *)(ioq->buf + ioq->fifo_beg);
		if (head->magic != DMSG_HDR_MAGIC &&
		    head->magic != DMSG_HDR_MAGIC_REV) {
			fprintf(stderr, "%s: head->magic is bad %02x\n",
				iocom->label, head->magic);
			if (iocom->flags & DMSG_IOCOMF_CRYPTED)
				fprintf(stderr, "(on encrypted link)\n");
			ioq->error = DMSG_IOQ_ERROR_SYNC;
			break;
		}

		/*
		 * Calculate the full header size and aux data size,
		 * byte-swapping if the peer has the reversed endian.
		 */
		if (head->magic == DMSG_HDR_MAGIC_REV) {
			ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
				      DMSG_ALIGN;
			aux_size = bswap32(head->aux_bytes);
		} else {
			ioq->hbytes = (head->cmd & DMSGF_SIZE) *
				      DMSG_ALIGN;
			aux_size = head->aux_bytes;
		}
		ioq->abytes = DMSG_DOALIGN(aux_size);
		ioq->unaligned_aux_size = aux_size;
		/* sanity-check the advertised sizes before trusting them */
		if (ioq->hbytes < sizeof(msg->any.head) ||
		    ioq->hbytes > sizeof(msg->any) ||
		    ioq->abytes > DMSG_AUX_MAX) {
			ioq->error = DMSG_IOQ_ERROR_FIELD;
			break;
		}

		/*
		 * Allocate the message, the next state will fill it in.
		 * Note that the aux_data buffer will be sized to an aligned
		 * value and the aligned remainder zero'd for convenience.
		 */
		msg = dmsg_msg_alloc(&iocom->circuit0, aux_size,
				     ioq->hbytes / DMSG_ALIGN,
				     NULL, NULL);
		ioq->msg = msg;

		/*
		 * Fall through to the next state.  Make sure that the
		 * extended header does not straddle the end of the buffer.
		 * We still want to issue larger reads into our buffer,
		 * book-keeping is easier if we don't bcopy() yet.
		 *
		 * Make sure there is enough room for bloated encrypt data.
		 */
		nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
		ioq->state = DMSG_MSGQ_STATE_HEADER2;
		/* fall through */
	case DMSG_MSGQ_STATE_HEADER2:
		/*
		 * Fill out the extended header.
		 */
		assert(msg != NULL);
		if (bytes < ioq->hbytes) {
			n = read(iocom->sock_fd,
				 ioq->buf + ioq->fifo_end,
				 nmax);
			if (n <= 0) {
				if (n == 0) {
					ioq->error = DMSG_IOQ_ERROR_EOF;
					break;
				}
				if (errno != EINTR &&
				    errno != EINPROGRESS &&
				    errno != EAGAIN) {
					ioq->error = DMSG_IOQ_ERROR_SOCK;
					break;
				}
				n = 0;
				/* fall through */
			}
			ioq->fifo_end += (size_t)n;
			nmax -= (size_t)n;
		}

		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
			dmsg_crypto_decrypt(iocom, ioq);
		} else {
			ioq->fifo_cdx = ioq->fifo_end;
			ioq->fifo_cdn = ioq->fifo_end;
		}
		bytes = ioq->fifo_cdx - ioq->fifo_beg;

		/*
		 * Insufficient data accumulated (set msg NULL so caller will
		 * retry on event).
		 */
		if (bytes < ioq->hbytes) {
			msg = NULL;
			break;
		}

		/*
		 * Calculate the extended header, decrypt data received
		 * so far.  Handle endian-conversion for the entire extended
		 * header.
		 */
		head = (void *)(ioq->buf + ioq->fifo_beg);

		/*
		 * Check the CRC.  hdr_crc is zeroed for the calculation
		 * and restored afterwards (it is part of the crc'd area).
		 */
		if (head->magic == DMSG_HDR_MAGIC_REV)
			xcrc32 = bswap32(head->hdr_crc);
		else
			xcrc32 = head->hdr_crc;
		head->hdr_crc = 0;
		if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
			ioq->error = DMSG_IOQ_ERROR_XCRC;
			fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
				xcrc32, dmsg_icrc32(head, ioq->hbytes),
				dmsg_msg_str(msg));
			assert(0);
			break;
		}
		head->hdr_crc = xcrc32;

		if (head->magic == DMSG_HDR_MAGIC_REV) {
			dmsg_bswap_head(head);
		}

		/*
		 * Copy the extended header into the msg and adjust the
		 * FIFO.
		 */
		bcopy(head, &msg->any, ioq->hbytes);

		/*
		 * We are either done or we fall-through.
		 */
		if (ioq->abytes == 0) {
			ioq->fifo_beg += ioq->hbytes;
			break;
		}

		/*
		 * Must adjust bytes (and the state) when falling through.
		 * nmax doesn't change.
		 */
		ioq->fifo_beg += ioq->hbytes;
		bytes -= ioq->hbytes;
		ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
		/* fall through */
	case DMSG_MSGQ_STATE_AUXDATA1:
		/*
		 * Copy the partial or complete [decrypted] payload from
		 * remaining bytes in the FIFO in order to optimize the
		 * makeroom call in the AUXDATA2 state.  We have to
		 * fall-through either way so we can check the crc.
		 *
		 * msg->aux_size tracks our aux data.
		 *
		 * (Lets not complicate matters if the data is encrypted,
		 * since the data in-stream is not the same size as the
		 * data decrypted).
		 */
		if (bytes >= ioq->abytes) {
			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
			      ioq->abytes);
			msg->aux_size = ioq->abytes;
			ioq->fifo_beg += ioq->abytes;
			assert(ioq->fifo_beg <= ioq->fifo_cdx);
			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
			bytes -= ioq->abytes;
		} else if (bytes) {
			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
			      bytes);
			msg->aux_size = bytes;
			ioq->fifo_beg += bytes;
			if (ioq->fifo_cdx < ioq->fifo_beg)
				ioq->fifo_cdx = ioq->fifo_beg;
			assert(ioq->fifo_beg <= ioq->fifo_cdx);
			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
			bytes = 0;
		} else {
			msg->aux_size = 0;
		}
		ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
		/* fall through */
	case DMSG_MSGQ_STATE_AUXDATA2:
		/*
		 * Make sure there is enough room for more data.
		 */
		assert(msg);
		nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);

		/*
		 * Read and decrypt more of the payload.
		 */
		if (msg->aux_size < ioq->abytes) {
			assert(bytes == 0);
			n = read(iocom->sock_fd,
				 ioq->buf + ioq->fifo_end,
				 nmax);
			if (n <= 0) {
				if (n == 0) {
					ioq->error = DMSG_IOQ_ERROR_EOF;
					break;
				}
				if (errno != EINTR &&
				    errno != EINPROGRESS &&
				    errno != EAGAIN) {
					ioq->error = DMSG_IOQ_ERROR_SOCK;
					break;
				}
				n = 0;
				/* fall through */
			}
			ioq->fifo_end += (size_t)n;
			nmax -= (size_t)n;
		}

		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
			dmsg_crypto_decrypt(iocom, ioq);
		} else {
			ioq->fifo_cdx = ioq->fifo_end;
			ioq->fifo_cdn = ioq->fifo_end;
		}
		bytes = ioq->fifo_cdx - ioq->fifo_beg;

		/* clamp to the amount of payload still needed */
		if (bytes > ioq->abytes - msg->aux_size)
			bytes = ioq->abytes - msg->aux_size;

		if (bytes) {
			bcopy(ioq->buf + ioq->fifo_beg,
			      msg->aux_data + msg->aux_size,
			      bytes);
			msg->aux_size += bytes;
			ioq->fifo_beg += bytes;
		}

		/*
		 * Insufficient data accumulated (set msg NULL so caller will
		 * retry on event).
		 *
		 * Assert the auxiliary data size is correct, then record the
		 * original unaligned size from the message header.
		 */
		if (msg->aux_size < ioq->abytes) {
			msg = NULL;
			break;
		}
		assert(msg->aux_size == ioq->abytes);
		msg->aux_size = ioq->unaligned_aux_size;

		/*
		 * Check aux_crc, then we are done.  Note that the crc
		 * is calculated over the aligned size, not the actual
		 * size.
		 */
		xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
		if (xcrc32 != msg->any.head.aux_crc) {
			ioq->error = DMSG_IOQ_ERROR_ACRC;
			fprintf(stderr, "iocom: ACRC error %08x vs %08x msgid %016jx msgcmd %08x auxsize %d\n",
				xcrc32, msg->any.head.aux_crc, (intmax_t)msg->any.head.msgid, msg->any.head.cmd, msg->any.head.aux_bytes);
			break;
		}
		break;
	case DMSG_MSGQ_STATE_ERROR:
		/*
		 * Continued calls to drain recorded transactions (returning
		 * a LNK_ERROR for each one), before we return the final
		 * LNK_ERROR.
		 */
		assert(msg == NULL);
		break;
	default:
		/*
		 * We don't double-return errors, the caller should not
		 * have called us again after getting an error msg.
		 */
		assert(0);
		break;
	}

	/*
	 * Check the message sequence.  The iv[] should prevent any
	 * possibility of a replay but we add this check anyway.
	 */
	if (msg && ioq->error == 0) {
		if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
			ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
		} else {
			++ioq->seq;
		}
	}

	/*
	 * Handle error, RREQ, or completion
	 *
	 * NOTE: nmax and bytes are invalid at this point, we don't bother
	 *	 to update them when breaking out.
	 */
	if (ioq->error) {
skip:
		fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
		/*
		 * An unrecoverable error causes all active receive
		 * transactions to be terminated with a LNK_ERROR message.
		 *
		 * Once all active transactions are exhausted we set the
		 * iocom ERROR flag and return a non-transactional LNK_ERROR
		 * message, which should cause master processing loops to
		 * terminate.
		 */
		assert(ioq->msg == msg);
		if (msg) {
			dmsg_msg_free(msg);
			ioq->msg = NULL;
		}

		/*
		 * No more I/O read processing
		 */
		ioq->state = DMSG_MSGQ_STATE_ERROR;

		/*
		 * Simulate a remote LNK_ERROR DELETE msg for any open
		 * transactions, ending with a final non-transactional
		 * LNK_ERROR (that the session can detect) when no
		 * transactions remain.
		 *
		 * We only need to scan transactions on circuit0 as these
		 * will contain all circuit forges, and terminating circuit
		 * forges will automatically terminate the transactions on
		 * any other circuits as well as those circuits.
		 */
		circuit0 = &iocom->circuit0;
		msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);
		msg->any.head.error = ioq->error;

		pthread_mutex_lock(&iocom->mtx);
		dmsg_iocom_drain(iocom);

		if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
			/*
			 * Active remote transactions are still present.
			 * Simulate the other end sending us a DELETE.
			 */
			if (state->rxcmd & DMSGF_DELETE) {
				dmsg_msg_free(msg);
				fprintf(stderr,
					"iocom: ioq error(rd) %d sleeping "
					"state %p rxcmd %08x txcmd %08x "
					"func %p\n",
					ioq->error, state, state->rxcmd,
					state->txcmd, state->func);
				usleep(100000);	/* XXX */
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_RWORK);
				msg = NULL;
			} else {
				/*state->txcmd |= DMSGF_DELETE;*/
				msg->state = state;
				msg->iocom = iocom;
				msg->any.head.msgid = state->msgid;
				msg->any.head.cmd |= DMSGF_ABORT |
						     DMSGF_DELETE;
			}
		} else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
			/*
			 * Active local transactions are still present.
			 * Simulate the other end sending us a DELETE.
			 */
			if (state->rxcmd & DMSGF_DELETE) {
				dmsg_msg_free(msg);
				fprintf(stderr,
					"iocom: ioq error(wr) %d sleeping "
					"state %p rxcmd %08x txcmd %08x "
					"func %p\n",
					ioq->error, state, state->rxcmd,
					state->txcmd, state->func);
				usleep(100000);	/* XXX */
				atomic_set_int(&iocom->flags,
					       DMSG_IOCOMF_RWORK);
				msg = NULL;
			} else {
				msg->state = state;
				msg->iocom = iocom;
				msg->any.head.msgid = state->msgid;
				msg->any.head.cmd |= DMSGF_ABORT |
						     DMSGF_DELETE |
						     DMSGF_REPLY;
				if ((state->rxcmd & DMSGF_CREATE) == 0) {
					msg->any.head.cmd |=
						DMSGF_CREATE;
				}
			}
		} else {
			/*
			 * No active local or remote transactions remain.
			 * Generate a final LNK_ERROR and flag EOF.
			 */
			msg->state = NULL;
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
			fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
		}
		pthread_mutex_unlock(&iocom->mtx);

		/*
		 * For the iocom error case we want to set RWORK to indicate
		 * that more messages might be pending.
		 *
		 * It is possible to return NULL when there is more work to
		 * do because each message has to be DELETEd in both
		 * directions before we continue on with the next (though
		 * this could be optimized).  The transmit direction will
		 * re-set RWORK.
		 */
		if (msg)
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
	} else if (msg == NULL) {
		/*
		 * Insufficient data received to finish building the message,
		 * set RREQ and return NULL.
		 *
		 * Leave ioq->msg intact.
		 * Leave the FIFO intact.
		 */
		atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
	} else {
		/*
		 * Continue processing msg.
		 *
		 * The fifo has already been advanced past the message.
		 * Trivially reset the FIFO indices if possible.
		 *
		 * clear the FIFO if it is now empty and set RREQ to wait
		 * for more from the socket.  If the FIFO is not empty set
		 * TWORK to bypass the poll so we loop immediately.
		 */
		if (ioq->fifo_beg == ioq->fifo_cdx &&
		    ioq->fifo_cdn == ioq->fifo_end) {
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
			ioq->fifo_cdx = 0;
			ioq->fifo_cdn = 0;
			ioq->fifo_beg = 0;
			ioq->fifo_end = 0;
		} else {
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
		}
		ioq->state = DMSG_MSGQ_STATE_HEADER1;
		ioq->msg = NULL;

		/*
		 * Handle message routing.  Validates non-zero sources
		 * and routes message.  Error will be 0 if the message is
		 * destined for us.
		 *
		 * State processing only occurs for messages destined for us.
		 */
		if (DMsgDebugOpt >= 5) {
			fprintf(stderr,
				"rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
				msg->any.head.cmd,
				(intmax_t)msg->any.head.msgid,
				(intmax_t)msg->any.head.circuit);
		}
		if (msg->any.head.circuit)
			error = dmsg_circuit_route(msg);
		else
			error = dmsg_state_msgrx(msg);

		if (error) {
			/*
			 * Abort-after-closure, throw message away and
			 * start reading another.
			 */
			if (error == DMSG_IOQ_ERROR_EALREADY) {
				dmsg_msg_free(msg);
				goto again;
			}

			/*
			 * msg routed, msg pointer no longer owned by us.
			 * Go to the top and start reading another.
			 */
			if (error == DMSG_IOQ_ERROR_ROUTED)
				goto again;

			/*
			 * Process real error and throw away message.
			 */
			ioq->error = error;
			goto skip;
		}
		/* no error, not routed.  Fall through and return msg */
	}
	return (msg);
}

/*
 * Calculate the header and data crc's and write a low-level message to
 * the connection.  If aux_crc is non-zero the aux_data crc is already
 * assumed to have been set.
 *
 * A non-NULL msg is added to the queue but not necessarily flushed.
 * Calling this function with msg == NULL will get a flush going.
 *
 * NOTE(review): dmsg_iocom_flush1() takes no msg argument; the two
 *		 sentences above appear to describe an older interface.
 *		 Confirm and update.
 *
 * (called from iocom_core only)
 */
void
dmsg_iocom_flush1(dmsg_iocom_t *iocom)
{
	dmsg_ioq_t *ioq = &iocom->ioq_tx;
	dmsg_msg_t *msg;
	uint32_t xcrc32;
	size_t hbytes;
	size_t abytes;
	dmsg_msg_queue_t tmpq;

	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
	TAILQ_INIT(&tmpq);

	/*
	 * Atomically take the entire application transmit queue so the
	 * bulk of the work below can run without holding iocom->mtx.
	 */
	pthread_mutex_lock(&iocom->mtx);
	while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
		TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
		TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
	}
	pthread_mutex_unlock(&iocom->mtx);

	while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
		/*
		 * Process terminal connection errors.  The message is
		 * queued on the ioq anyway so dmsg_iocom_drain() can
		 * dispose of it via dmsg_state_cleanuptx().
		 */
		TAILQ_REMOVE(&tmpq, msg, qentry);
		if (ioq->error) {
			TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
			++ioq->msgcount;
			continue;
		}

		/*
		 * Finish populating the msg fields.  The salt ensures that
		 * the iv[] array is ridiculously randomized and we also
		 * re-seed our PRNG every 32768 messages just to be sure.
		 */
		msg->any.head.magic = DMSG_HDR_MAGIC;
		msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
		++ioq->seq;
		if ((ioq->seq & 32767) == 0)
			srandomdev();

		/*
		 * Calculate aux_crc if 0, then calculate hdr_crc.
		 */
		if (msg->aux_size && msg->any.head.aux_crc == 0) {
			abytes = DMSG_DOALIGN(msg->aux_size);
			xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
			msg->any.head.aux_crc = xcrc32;
		}
		msg->any.head.aux_bytes = msg->aux_size;

		/* header size is encoded in the DMSGF_SIZE field of cmd */
		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
			 DMSG_ALIGN;
		msg->any.head.hdr_crc = 0;
		msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);

		/*
		 * Enqueue the message (the flush code handles stream
		 * encryption).
		 */
		TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
		++ioq->msgcount;
	}
	dmsg_iocom_flush2(iocom);
}

/*
 * Thread localized, iocom->mtx not held by caller.
 *
 * (called from iocom_core via iocom_flush1 only)
 */
void
dmsg_iocom_flush2(dmsg_iocom_t *iocom)
{
	dmsg_ioq_t *ioq = &iocom->ioq_tx;
	dmsg_msg_t *msg;
	ssize_t n;
	struct iovec iov[DMSG_IOQ_MAXIOVEC];
	size_t nact;
	size_t hbytes;
	size_t abytes;
	size_t hoff;
	size_t aoff;
	int iovcnt;

	if (ioq->error) {
		dmsg_iocom_drain(iocom);
		return;
	}

	/*
	 * Pump messages out the connection by building an iovec.
	 *
	 * ioq->hbytes/ioq->abytes tracks how much of the first message
	 * in the queue has been successfully written out, so we can
	 * resume writing.
	 */
	iovcnt = 0;
	nact = 0;
	hoff = ioq->hbytes;
	aoff = ioq->abytes;

	TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
			 DMSG_ALIGN;
		abytes = DMSG_DOALIGN(msg->aux_size);
		assert(hoff <= hbytes && aoff <= abytes);

		/* unsent portion of the header, if any */
		if (hoff < hbytes) {
			iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
			iov[iovcnt].iov_len = hbytes - hoff;
			nact += hbytes - hoff;
			++iovcnt;
			if (iovcnt == DMSG_IOQ_MAXIOVEC)
				break;
		}
		/* unsent portion of the aux data, if any */
		if (aoff < abytes) {
			assert(msg->aux_data != NULL);
			iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
			iov[iovcnt].iov_len = abytes - aoff;
			nact += abytes - aoff;
			++iovcnt;
			if (iovcnt == DMSG_IOQ_MAXIOVEC)
				break;
		}
		/* only the first message can have a partial-send offset */
		hoff = 0;
		aoff = 0;
	}
	if (iovcnt == 0)
		return;

	/*
	 * Encrypt and write the data.  The crypto code will move the
	 * data into the fifo and adjust the iov as necessary.  If
	 * encryption is disabled the iov is left alone.
	 *
	 * May return a smaller iov (thus a smaller n), with aggregated
	 * chunks.  May reduce nmax to what fits in the FIFO.
	 *
	 * This function sets nact to the number of original bytes now
	 * encrypted, adding to the FIFO some number of bytes that might
	 * be greater depending on the crypto mechanic.  iov[] is adjusted
	 * to point at the FIFO if necessary.
	 *
	 * NOTE: The return value from the writev() is the post-encrypted
	 *	 byte count, not the plaintext count.
	 */
	if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
		/*
		 * Make sure the FIFO has a reasonable amount of space
		 * left (if not completely full).
		 *
		 * In this situation we are staging the encrypted message
		 * data in the FIFO.  (nact) represents how much plaintext
		 * has been staged, (n) represents how much encrypted data
		 * has been flushed.  The two are independent of each other.
		 */
		if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
		    sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
			bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
			      ioq->fifo_end - ioq->fifo_beg);
			ioq->fifo_cdx -= ioq->fifo_beg;
			ioq->fifo_cdn -= ioq->fifo_beg;
			ioq->fifo_end -= ioq->fifo_beg;
			ioq->fifo_beg = 0;
		}

		iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
		n = writev(iocom->sock_fd, iov, iovcnt);
		if (n > 0) {
			ioq->fifo_beg += n;
			ioq->fifo_cdn += n;
			ioq->fifo_cdx += n;
			/* FIFO fully drained; reset all indices to 0 */
			if (ioq->fifo_beg == ioq->fifo_end) {
				ioq->fifo_beg = 0;
				ioq->fifo_cdn = 0;
				ioq->fifo_cdx = 0;
				ioq->fifo_end = 0;
			}
		}
		/*
		 * We don't mess with the nact returned by the crypto_encrypt
		 * call, which represents the filling of the FIFO.  (n) tells
		 * us how much we were able to write from the FIFO.  The two
		 * are different beasts when encrypting.
		 */
	} else {
		/*
		 * In this situation we are not staging the messages to the
		 * FIFO but instead writing them directly from the msg
		 * structure(s), so (nact) is basically (n).
		 */
		n = writev(iocom->sock_fd, iov, iovcnt);
		if (n > 0)
			nact = n;
		else
			nact = 0;
	}

	/*
	 * Clean out the transmit queue based on what we successfully
	 * sent (nact is the plaintext count).  ioq->hbytes/abytes
	 * represents the portion of the first message previously sent.
	 */
	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
			 DMSG_ALIGN;
		abytes = DMSG_DOALIGN(msg->aux_size);

		/* header only partially sent; record progress and stop */
		if ((size_t)nact < hbytes - ioq->hbytes) {
			ioq->hbytes += nact;
			nact = 0;
			break;
		}
		nact -= hbytes - ioq->hbytes;
		ioq->hbytes = hbytes;
		/* aux data only partially sent; record progress and stop */
		if ((size_t)nact < abytes - ioq->abytes) {
			ioq->abytes += nact;
			nact = 0;
			break;
		}
		nact -= abytes - ioq->abytes;
		/* ioq->abytes = abytes; optimized out */

		if (DMsgDebugOpt >= 5) {
			fprintf(stderr,
				"txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
				msg->any.head.cmd,
				(intmax_t)msg->any.head.msgid,
				(intmax_t)msg->any.head.circuit);
		}

		/* message fully sent; dequeue and run tx state cleanup */
		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
		--ioq->msgcount;
		ioq->hbytes = 0;
		ioq->abytes = 0;

		dmsg_state_cleanuptx(msg);
	}
	assert(nact == 0);

	/*
	 * Process the return value from the write w/regards to blocking.
	 */
	if (n < 0) {
		if (errno != EINTR &&
		    errno != EINPROGRESS &&
		    errno != EAGAIN) {
			/*
			 * Fatal write error
			 */
			ioq->error = DMSG_IOQ_ERROR_SOCK;
			dmsg_iocom_drain(iocom);
		} else {
			/*
			 * Wait for socket buffer space
			 */
			atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
		}
	} else {
		/*
		 * NOTE(review): WREQ is also set on a successful write --
		 * presumably to re-arm the write event for any remaining
		 * queued data; confirm.
		 */
		atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
	}
	if (ioq->error) {
		dmsg_iocom_drain(iocom);
	}
}

/*
 * Kill pending msgs on ioq_tx and adjust the flags such that no more
 * write events will occur.  We don't kill read msgs because we want
 * the caller to pull off our contrived terminal error msg to detect
 * the connection failure.
 *
 * Localized to iocom_core thread, iocom->mtx not held by caller.
 */
void
dmsg_iocom_drain(dmsg_iocom_t *iocom)
{
	dmsg_ioq_t *ioq = &iocom->ioq_tx;
	dmsg_msg_t *msg;

	atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
	ioq->hbytes = 0;
	ioq->abytes = 0;

	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
		--ioq->msgcount;
		dmsg_state_cleanuptx(msg);
	}
}

/*
 * Write a message to an iocom, with additional state processing.
 */
void
dmsg_msg_write(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->iocom;
	dmsg_state_t *state;
	char dummy;

	/*
	 * Handle state processing, create state if necessary.
	 */
	pthread_mutex_lock(&iocom->mtx);
	if ((state = msg->state) != NULL) {
		/*
		 * Existing transaction (could be reply).  It is also
		 * possible for this to be the first reply (CREATE is set),
		 * in which case we populate state->txcmd.
		 *
		 * state->txcmd is adjusted to hold the final message cmd,
		 * and we also be sure to set the CREATE bit here.  We did
		 * not set it in dmsg_msg_alloc() because that would have
		 * not been serialized (state could have gotten ripped out
		 * from under the message prior to it being transmitted).
		 */
		if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
		    DMSGF_CREATE) {
			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
			state->icmd = state->txcmd & DMSGF_BASECMDMASK;
		}
		msg->any.head.msgid = state->msgid;
		assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
		if (msg->any.head.cmd & DMSGF_CREATE) {
			state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		}
	}

	/*
	 * Queue it for output, wake up the I/O pthread.  Note that the
	 * I/O thread is responsible for generating the CRCs and encryption.
	 */
	TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
	dummy = 0;
	write(iocom->wakeupfds[1], &dummy, 1);	/* XXX optimize me */
	pthread_mutex_unlock(&iocom->mtx);
}

/*
 * This is a shortcut to formulate a reply to msg with a simple error code.
 * It can reply to and terminate a transaction, or it can reply to one-way
 * messages.  A DMSG_LNK_ERROR command code is utilized to encode
 * the error code (which can be 0).  Not all transactions are terminated
 * with DMSG_LNK_ERROR status (the low level only cares about the
 * MSGF_DELETE flag), but most are.
 *
 * Replies to one-way messages are a bit of an oxymoron but the feature
 * is used by the debug (DBG) protocol.
 *
 * The reply contains no extended data.
 */
void
dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
{
	dmsg_state_t *state = msg->state;
	dmsg_msg_t *nmsg;
	uint32_t cmd;


	/*
	 * Reply with a simple error code and terminate the transaction.
	 */
	cmd = DMSG_LNK_ERROR;

	/*
	 * Check if our direction has even been initiated yet, set CREATE.
	 *
	 * Check what direction this is (command or reply direction).  Note
	 * that txcmd might not have been initiated yet.
	 *
	 * If our direction has already been closed we just return without
	 * doing anything.
	 */
	if (state) {
		if (state->txcmd & DMSGF_DELETE)
			return;
		if (state->txcmd & DMSGF_REPLY)
			cmd |= DMSGF_REPLY;
		cmd |= DMSGF_DELETE;
	} else {
		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
			cmd |= DMSGF_REPLY;
	}

	/*
	 * Allocate the message and associate it with the existing state.
	 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
	 * allocate new state.  We have our state already.
	 */
	nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
	if (state) {
		if ((state->txcmd & DMSGF_CREATE) == 0)
			nmsg->any.head.cmd |= DMSGF_CREATE;
	}
	nmsg->any.head.error = error;
	nmsg->any.head.msgid = msg->any.head.msgid;
	nmsg->any.head.circuit = msg->any.head.circuit;
	nmsg->state = state;
	dmsg_msg_write(nmsg);
}

/*
 * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
 * we are generating a streaming reply or an intermediate acknowledgement
 * of some sort as part of the higher level protocol, with more to come
 * later.
 */
void
dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
{
	dmsg_state_t *state = msg->state;
	dmsg_msg_t *nmsg;
	uint32_t cmd;


	/*
	 * Reply with a simple error code; the transaction stays open.
	 */
	cmd = DMSG_LNK_ERROR;

	/*
	 * Check if our direction has even been initiated yet, set CREATE.
	 *
	 * Check what direction this is (command or reply direction).  Note
	 * that txcmd might not have been initiated yet.
	 *
	 * If our direction has already been closed we just return without
	 * doing anything.
	 */
	if (state) {
		if (state->txcmd & DMSGF_DELETE)
			return;
		if (state->txcmd & DMSGF_REPLY)
			cmd |= DMSGF_REPLY;
		/* continuing transaction, do not set MSGF_DELETE */
	} else {
		if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
			cmd |= DMSGF_REPLY;
	}

	nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
	if (state) {
		if ((state->txcmd & DMSGF_CREATE) == 0)
			nmsg->any.head.cmd |= DMSGF_CREATE;
	}
	nmsg->any.head.error = error;
	nmsg->any.head.msgid = msg->any.head.msgid;
	nmsg->any.head.circuit = msg->any.head.circuit;
	nmsg->state = state;
	dmsg_msg_write(nmsg);
}

/*
 * Terminate a transaction given a state structure by issuing a DELETE.
 */
void
dmsg_state_reply(dmsg_state_t *state, uint32_t error)
{
	dmsg_msg_t *nmsg;
	uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;

	/*
	 * Nothing to do if we already transmitted a delete
	 */
	if (state->txcmd & DMSGF_DELETE)
		return;

	/*
	 * Set REPLY if the other end initiated the command.  Otherwise
	 * we are the command direction.
	 */
	if (state->txcmd & DMSGF_REPLY)
		cmd |= DMSGF_REPLY;

	nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
	/*
	 * NOTE(review): state was already dereferenced above, so this
	 * NULL check is dead code -- confirm and remove.
	 */
	if (state) {
		if ((state->txcmd & DMSGF_CREATE) == 0)
			nmsg->any.head.cmd |= DMSGF_CREATE;
	}
	nmsg->any.head.error = error;
	nmsg->any.head.msgid = state->msgid;
	nmsg->any.head.circuit = state->msg->any.head.circuit;
	nmsg->state = state;
	dmsg_msg_write(nmsg);
}

/*
 * Similar to dmsg_state_reply() but do NOT set DELETE, leaving the
 * transaction open (an intermediate/streaming result).
 */
void
dmsg_state_result(dmsg_state_t *state, uint32_t error)
{
	dmsg_msg_t *nmsg;
	uint32_t cmd = DMSG_LNK_ERROR;

	/*
	 * Nothing to do if we already transmitted a delete
	 */
	if (state->txcmd & DMSGF_DELETE)
		return;

	/*
	 * Set REPLY if the other end initiated the command.  Otherwise
	 * we are the command direction.
	 */
	if (state->txcmd & DMSGF_REPLY)
		cmd |= DMSGF_REPLY;

	nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
	/*
	 * NOTE(review): state was already dereferenced above, so this
	 * NULL check is dead code -- confirm and remove.
	 */
	if (state) {
		if ((state->txcmd & DMSGF_CREATE) == 0)
			nmsg->any.head.cmd |= DMSGF_CREATE;
	}
	nmsg->any.head.error = error;
	nmsg->any.head.msgid = state->msgid;
	nmsg->any.head.circuit = state->msg->any.head.circuit;
	nmsg->state = state;
	dmsg_msg_write(nmsg);
}

/************************************************************************
 *			TRANSACTION STATE HANDLING			*
 ************************************************************************
 *
 */

/*
 * Process circuit and state tracking for a message after reception, prior
 * to execution.
 *
 * Called with msglk held and the msg dequeued.
 *
 * All messages are called with dummy state and return actual state.
 * (One-off messages often just return the same dummy state).
 *
 * May request that caller discard the message by setting *discardp to 1.
 * The returned state is not used in this case and is allowed to be NULL.
 *
 * --
 *
 * These routines handle persistent and command/reply message state via the
 * CREATE and DELETE flags.  The first message in a command or reply sequence
 * sets CREATE, the last message in a command or reply sequence sets DELETE.
 *
 * There can be any number of intermediate messages belonging to the same
 * sequence sent in between the CREATE message and the DELETE message,
 * which set neither flag.  This represents a streaming command or reply.
 *
 * Any command message received with CREATE set expects a reply sequence to
 * be returned.  Reply sequences work the same as command sequences except the
 * REPLY bit is also sent.  Both the command side and reply side can
 * degenerate into a single message with both CREATE and DELETE set.  Note
 * that one side can be streaming and the other side not, or neither, or both.
 *
 * The msgid is unique for the initiator.  That is, two sides sending a new
 * message can use the same msgid without colliding.
 *
 * --
 *
 * ABORT sequences work by setting the ABORT flag along with normal message
 * state.  However, ABORTs can also be sent on half-closed messages, that is
 * even if the command or reply side has already sent a DELETE, as long as
 * the message has not been fully closed it can still send an ABORT+DELETE
 * to terminate the half-closed message state.
 *
 * Since ABORT+DELETEs can race we silently discard ABORTs for message
 * state which has already been fully closed.  REPLY+ABORT+DELETEs can
 * also race, and in this situation the other side might have already
 * initiated a new unrelated command with the same message id.  Since
 * the abort has not set the CREATE flag the situation can be detected
 * and the message will also be discarded.
 *
 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
 * The ABORT request is essentially integrated into the command instead
 * of being sent later on.  In this situation the command implementation
 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
 * special-case non-blocking operation for the command.
 *
 * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
 *	  to be mid-stream aborts for command/reply sequences.  ABORTs on
 *	  one-way messages are not supported.
 *
 * NOTE!  If a command sequence does not support aborts the ABORT flag is
 *	  simply ignored.
 *
 * --
 *
 * One-off messages (no reply expected) are sent with neither CREATE or DELETE
 * set.  One-off messages cannot be aborted and typically aren't processed
 * by these routines.  The REPLY bit can be used to distinguish whether a
 * one-off message is a command or reply.  For example, one-off replies
 * will typically just contain status updates.
 */
static int
dmsg_state_msgrx(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->iocom;
	dmsg_circuit_t *circuit;
	dmsg_circuit_t *ocircuit;
	dmsg_state_t *state;
	dmsg_state_t sdummy;		/* RB_FIND key (msgid only) */
	dmsg_circuit_t cdummy;		/* RB_FIND key (msgid only) */
	int error;

	pthread_mutex_lock(&iocom->mtx);

	/*
	 * Locate existing persistent circuit and state, if any.
	 */
	if (msg->any.head.circuit == 0) {
		circuit = &iocom->circuit0;
	} else {
		cdummy.msgid = msg->any.head.circuit;
		circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
				  &cdummy);
		if (circuit == NULL) {
			pthread_mutex_unlock(&iocom->mtx);
			return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
		}
	}

	/*
	 * Replace circuit0 with actual
	 */
	dmsg_circuit_hold(circuit);
	ocircuit = msg->circuit;
	msg->circuit = circuit;

	/*
	 * If received msg is a command state is on staterd_tree.
	 * If received msg is a reply state is on statewr_tree.
	 */
	sdummy.msgid = msg->any.head.msgid;
	if (msg->any.head.cmd & DMSGF_REPLY) {
		state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
				&sdummy);
	} else {
		state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
				&sdummy);
	}
	msg->state = state;

	pthread_mutex_unlock(&iocom->mtx);
	if (ocircuit)
		dmsg_circuit_drop(ocircuit);

	/*
	 * Short-cut one-off or mid-stream messages (state may be NULL).
	 */
	if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
				  DMSGF_ABORT)) == 0) {
		return(0);
	}

	/*
	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
	 * inside the case statements.
	 *
	 * NOTE: the default: case sits mid-switch; it handles the only
	 * remaining masked combination -- no CREATE/DELETE/REPLY, which
	 * at this point implies a mid-stream command with ABORT set
	 * (the pure mid-stream case returned above).
	 */
	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
				    DMSGF_REPLY)) {
	case DMSGF_CREATE:
	case DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * New persistent command received.
		 */
		if (state) {
			fprintf(stderr, "duplicate-trans %s\n",
				dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}
		/*
		 * NOTE(review): malloc() return is not checked; bzero()
		 * would fault on allocation failure -- confirm policy.
		 */
		state = malloc(sizeof(*state));
		bzero(state, sizeof(*state));
		state->iocom = iocom;
		state->circuit = circuit;
		state->flags = DMSG_STATE_DYNAMIC;
		state->msg = msg;
		state->txcmd = DMSGF_REPLY;
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
		state->flags |= DMSG_STATE_INSERTED;
		state->msgid = msg->any.head.msgid;
		msg->state = state;
		pthread_mutex_lock(&iocom->mtx);
		RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);
		pthread_mutex_unlock(&iocom->mtx);
		error = 0;
		if (DMsgDebugOpt) {
			fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
				state, (uint32_t)state->msgid, iocom);
		}
		break;
	case DMSGF_DELETE:
		/*
		 * Persistent state is expected but might not exist if an
		 * ABORT+DELETE races the close.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "missing-state %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * Handle another ABORT+DELETE case if the msgid has already
		 * been reused.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "reused-state %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	default:
		/*
		 * Check for mid-stream ABORT command received, otherwise
		 * allow.
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if (state == NULL ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_CREATE:
	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * When receiving a reply with CREATE set the original
		 * persistent state message should already exist.
		 */
		if (state == NULL) {
			fprintf(stderr, "no-state(r) %s\n",
				dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}
		assert(((state->rxcmd ^ msg->any.head.cmd) &
			DMSGF_REPLY) == 0);
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_DELETE:
		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been fully closed, ignore the message.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "no-state(r,d) %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been reused for an unrelated message,
		 * ignore the message.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "reused-state(r,d) %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	case DMSGF_REPLY:
		/*
		 * Check for mid-stream ABORT reply received to sent command.
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if (state == NULL ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	}
	return (error);
}

/*
 * Receive-side cleanup for a message after it has been processed.
 * Frees the message and, when the message carries the final DELETE in
 * both directions, removes and frees the related transaction state.
 */
void
dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
{
	dmsg_state_t *state;

	if ((state = msg->state) == NULL) {
		/*
		 * Free a non-transactional message, there is no state
		 * to worry about.
		 */
		dmsg_msg_free(msg);
	} else if (msg->any.head.cmd & DMSGF_DELETE) {
		/*
		 * Message terminating transaction, destroy the related
		 * state, the original message, and this message (if it
		 * isn't the original message due to a CREATE|DELETE).
		 */
		pthread_mutex_lock(&iocom->mtx);
		state->rxcmd |= DMSGF_DELETE;
		if (state->txcmd & DMSGF_DELETE) {
			/* both directions closed; tear down the state */
			if (state->msg == msg)
				state->msg = NULL;
			assert(state->flags & DMSG_STATE_INSERTED);
			if (state->rxcmd & DMSGF_REPLY) {
				assert(msg->any.head.cmd & DMSGF_REPLY);
				RB_REMOVE(dmsg_state_tree,
					  &msg->circuit->statewr_tree, state);
			} else {
				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
				RB_REMOVE(dmsg_state_tree,
					  &msg->circuit->staterd_tree, state);
			}
			state->flags &= ~DMSG_STATE_INSERTED;
			dmsg_state_free(state);
		} else {
			/* tx direction still open; leave state intact */
			;
		}
		pthread_mutex_unlock(&iocom->mtx);
		dmsg_msg_free(msg);
	} else if (state->msg != msg) {
		/*
		 * Message not terminating transaction, leave state intact
		 * and free message if it isn't the CREATE message.
		 */
		dmsg_msg_free(msg);
	}
}

/*
 * Transmit-side counterpart to dmsg_state_cleanuprx(), called from
 * dmsg_iocom_flush2() and dmsg_iocom_drain() once a message has been
 * fully sent (or discarded).  Tears down state when both directions
 * have sent DELETE.
 */
static void
dmsg_state_cleanuptx(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->iocom;
	dmsg_state_t *state;

	if ((state = msg->state) == NULL) {
		/* non-transactional message */
		dmsg_msg_free(msg);
	} else if (msg->any.head.cmd & DMSGF_DELETE) {
		pthread_mutex_lock(&iocom->mtx);
		assert((state->txcmd & DMSGF_DELETE) == 0);
		state->txcmd |= DMSGF_DELETE;
		if (state->rxcmd & DMSGF_DELETE) {
			/* both directions closed; tear down the state */
			if (state->msg == msg)
				state->msg = NULL;
			assert(state->flags & DMSG_STATE_INSERTED);
			if (state->txcmd & DMSGF_REPLY) {
				assert(msg->any.head.cmd & DMSGF_REPLY);
				RB_REMOVE(dmsg_state_tree,
					  &msg->circuit->staterd_tree, state);
			} else {
				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
				RB_REMOVE(dmsg_state_tree,
					  &msg->circuit->statewr_tree, state);
			}
			state->flags &= ~DMSG_STATE_INSERTED;
			dmsg_state_free(state);
		} else {
			/* rx direction still open; leave state intact */
			;
		}
		pthread_mutex_unlock(&iocom->mtx);
		dmsg_msg_free(msg);
	} else if (state->msg != msg) {
		/* not the CREATE message and not terminating; just free */
		dmsg_msg_free(msg);
	}
}

/*
 * Called with iocom locked
 */
void
dmsg_state_free(dmsg_state_t *state)
{
	dmsg_msg_t *msg;

	if (DMsgDebugOpt) {
		fprintf(stderr, "terminate state %p id=%08x\n",
			state, (uint32_t)state->msgid);
	}
	if (state->any.any != NULL)	/* XXX avoid deadlock w/exit & kernel */
		closefrom(3);
	assert(state->any.any == NULL);
	msg = state->msg;
	state->msg = NULL;
	if (msg)
		dmsg_msg_free_locked(msg);
	free(state);
}

/*
 * Called with iocom locked
 */
void
dmsg_circuit_hold(dmsg_circuit_t *circuit)
{
	assert(circuit->refs > 0);		/* caller must hold ref */
	atomic_add_int(&circuit->refs, 1);	/* to safely add more */
}

/*
 * Drop a circuit reference, destroying the circuit when the last
 * reference goes away.
 *
 * NOTE(review): the original header said "Called with iocom locked",
 * but the destruction path below acquires iocom->mtx itself, so the
 * caller must NOT hold it (see dmsg_circuit_drop_locked() for the
 * locked variant) -- confirm.
 */
void
dmsg_circuit_drop(dmsg_circuit_t *circuit)
{
	dmsg_iocom_t *iocom = circuit->iocom;
	char dummy;

	assert(circuit->refs > 0);
	assert(iocom);

	/*
	 * Decrement circuit refs, destroy circuit when refs drops to 0.
	 */
	if (atomic_fetchadd_int(&circuit->refs, -1) != 1)
		return;
	assert(circuit != &iocom->circuit0);

	assert(RB_EMPTY(&circuit->staterd_tree));
	assert(RB_EMPTY(&circuit->statewr_tree));
	pthread_mutex_lock(&iocom->mtx);
	RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
	circuit->iocom = NULL;
	pthread_mutex_unlock(&iocom->mtx);
	dmsg_free(circuit);

	/*
	 * When an iocom error is present the rx code will terminate the
	 * receive side for all transactions and (indirectly) all circuits
	 * by simulating DELETE messages.  The state and related circuits
	 * don't disappear until the related states are closed in both
	 * directions
	 *
	 * Detect the case where the last circuit is now gone (and thus all
	 * states for all circuits are gone), and wakeup the rx thread to
	 * complete the termination.
	 */
	if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
		dummy = 0;
		write(iocom->wakeupfds[1], &dummy, 1);
	}
}

/*
 * Same as dmsg_circuit_drop() but the caller already holds iocom->mtx,
 * so the circuit is removed from the tree without re-acquiring the lock.
 */
void
dmsg_circuit_drop_locked(dmsg_circuit_t *circuit)
{
	dmsg_iocom_t *iocom;

	iocom = circuit->iocom;
	assert(circuit->refs > 0);
	assert(iocom);

	if (atomic_fetchadd_int(&circuit->refs, -1) == 1) {
		assert(circuit != &iocom->circuit0);
		assert(RB_EMPTY(&circuit->staterd_tree));
		assert(RB_EMPTY(&circuit->statewr_tree));
		RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
		circuit->iocom = NULL;
		dmsg_free(circuit);
		/* last circuit gone on an errored iocom; wake the rx thread */
		if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
			char dummy = 0;
			write(iocom->wakeupfds[1], &dummy, 1);
		}
	}
}

/*
 * This swaps endian for a hammer2_msg_hdr.  Note that the extended
 * header is not adjusted, just the core header.
 */
void
dmsg_bswap_head(dmsg_hdr_t *head)
{
	head->magic	= bswap16(head->magic);
	head->reserved02 = bswap16(head->reserved02);
	head->salt	= bswap32(head->salt);

	head->msgid	= bswap64(head->msgid);
	head->circuit	= bswap64(head->circuit);
	head->reserved18= bswap64(head->reserved18);

	head->cmd	= bswap32(head->cmd);
	head->aux_crc	= bswap32(head->aux_crc);
	head->aux_bytes	= bswap32(head->aux_bytes);
	head->error	= bswap32(head->error);
	head->aux_descr = bswap64(head->aux_descr);
	head->reserved38= bswap32(head->reserved38);
	head->hdr_crc	= bswap32(head->hdr_crc);
}