1 /* 2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@dragonflybsd.org> 6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org> 7 * 8 * Redistribution and use in source and binary forms, with or without 9 * modification, are permitted provided that the following conditions 10 * are met: 11 * 12 * 1. Redistributions of source code must retain the above copyright 13 * notice, this list of conditions and the following disclaimer. 14 * 2. Redistributions in binary form must reproduce the above copyright 15 * notice, this list of conditions and the following disclaimer in 16 * the documentation and/or other materials provided with the 17 * distribution. 18 * 3. Neither the name of The DragonFly Project nor the names of its 19 * contributors may be used to endorse or promote products derived 20 * from this software without specific, prior written permission. 21 * 22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 33 * SUCH DAMAGE. 
 */

#include "dmsg_local.h"

/* Global debug verbosity for the dmsg subsystem (set by library consumers). */
int DMsgDebugOpt;

static int dmsg_state_msgrx(dmsg_msg_t *msg);
static void dmsg_state_cleanuptx(dmsg_msg_t *msg);

/*
 * ROUTER TREE - Represents available routes for message routing, indexed
 *		 by their span transaction id.  The router structure is
 *		 embedded in either an iocom, h2span_link (incoming),
 *		 or h2span_relay (outgoing) (see msg_lnk.c).
 */

/*
 * RB-tree comparator for routers: orders strictly by target id.
 * Returns -1/0/+1 in the usual comparator convention.
 */
int
dmsg_router_cmp(dmsg_router_t *router1, dmsg_router_t *router2)
{
	if (router1->target < router2->target)
		return(-1);
	if (router1->target > router2->target)
		return(1);
	return(0);
}

RB_GENERATE(dmsg_router_tree, dmsg_router, rbnode, dmsg_router_cmp);

/*
 * NOTE(review): router_mtx presumably serializes access to the two route
 * trees below; their users are outside this chunk — confirm in msg_lnk.c.
 */
static pthread_mutex_t router_mtx;
static struct dmsg_router_tree router_ltree = RB_INITIALIZER(router_ltree);
static struct dmsg_router_tree router_rtree = RB_INITIALIZER(router_rtree);

/*
 * STATE TREE - Represents open transactions which are indexed by their
 *		{router,msgid} relative to the governing iocom.
 *
 *		router is usually iocom->router since state isn't stored
 *		for relayed messages.
 */

/*
 * RB-tree comparator for transaction states: orders by msgid only.
 * (The router comparison is currently compiled out.)
 */
int
dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
{
#if 0
	if (state1->router < state2->router)
		return(-1);
	if (state1->router > state2->router)
		return(1);
#endif
	if (state1->msgid < state2->msgid)
		return(-1);
	if (state1->msgid > state2->msgid)
		return(1);
	return(0);
}

RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);

/*
 * Initialize a low-level ioq: zero the structure, arm the receive/transmit
 * state machine to expect a primary message header, and init the queue.
 */
void
dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
{
	bzero(ioq, sizeof(*ioq));
	ioq->state = DMSG_MSGQ_STATE_HEADER1;
	TAILQ_INIT(&ioq->msgq);
}

/*
 * Cleanup queue.
 *
 * caller holds iocom->mtx.
 */
void
dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
{
	dmsg_msg_t *msg;

	/* The queue is expected to already be empty when we are called. */
	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
		assert(0);	/* shouldn't happen */
		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
		dmsg_msg_free(msg);
	}
	/* Release any partially-assembled inbound message. */
	if ((msg = ioq->msg) != NULL) {
		ioq->msg = NULL;
		dmsg_msg_free(msg);
	}
}

/*
 * Initialize a low-level communications channel.
 *
 * NOTE: The signal_func() is called at least once from the loop and can be
 *	 re-armed via dmsg_router_restate().
 */
void
dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
		void (*signal_func)(dmsg_router_t *),
		void (*rcvmsg_func)(dmsg_msg_t *),
		void (*dbgmsg_func)(dmsg_msg_t *),
		void (*altmsg_func)(dmsg_iocom_t *))
{
	struct stat st;

	bzero(iocom, sizeof(*iocom));

	iocom->router = dmsg_router_alloc();
	iocom->router->signal_callback = signal_func;
	iocom->router->rcvmsg_callback = rcvmsg_func;
	iocom->router->altmsg_callback = altmsg_func;
	iocom->router->dbgmsg_callback = dbgmsg_func;
	/* we do not call dmsg_router_connect() for iocom routers */

	pthread_mutex_init(&iocom->mtx, NULL);
	RB_INIT(&iocom->router->staterd_tree);
	RB_INIT(&iocom->router->statewr_tree);
	TAILQ_INIT(&iocom->freeq);
	TAILQ_INIT(&iocom->freeq_aux);
	TAILQ_INIT(&iocom->router->txmsgq);
	iocom->router->iocom = iocom;
	iocom->sock_fd = sock_fd;
	iocom->alt_fd = alt_fd;
	iocom->flags = DMSG_IOCOMF_RREQ;
	if (signal_func)
		iocom->flags |= DMSG_IOCOMF_SWORK;
	dmsg_ioq_init(iocom, &iocom->ioq_rx);
	dmsg_ioq_init(iocom, &iocom->ioq_tx);
	/* Inter-thread wakeup pipe; both ends are made non-blocking. */
	if (pipe(iocom->wakeupfds) < 0)
		assert(0);
	fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
	fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);

	/*
	 * Negotiate session crypto synchronously.  This will mark the
	 * connection as error'd if it fails.  If this is a pipe it's
	 * a linkage that we set up ourselves to the filesystem and there
	 * is no crypto.
	 */
	if (fstat(sock_fd, &st) < 0)
		assert(0);
	if (S_ISSOCK(st.st_mode))
		dmsg_crypto_negotiate(iocom);

	/*
	 * Make sure our fds are set to non-blocking for the iocom core.
	 */
	if (sock_fd >= 0)
		fcntl(sock_fd, F_SETFL, O_NONBLOCK);
#if 0
	/* if line buffered our single fgets() should be fine */
	if (alt_fd >= 0)
		fcntl(alt_fd, F_SETFL, O_NONBLOCK);
#endif
}

/*
 * May only be called from a callback from iocom_core.
 *
 * Adjust state machine functions, set flags to guarantee that both
 * the recvmsg_func and the sendmsg_func is called at least once.
 */
void
dmsg_router_restate(dmsg_router_t *router,
		    void (*signal_func)(dmsg_router_t *),
		    void (*rcvmsg_func)(dmsg_msg_t *msg),
		    void (*altmsg_func)(dmsg_iocom_t *))
{
	router->signal_callback = signal_func;
	router->rcvmsg_callback = rcvmsg_func;
	router->altmsg_callback = altmsg_func;
	/* Arm or disarm the signal-work flag to match the new callback. */
	if (signal_func)
		router->iocom->flags |= DMSG_IOCOMF_SWORK;
	else
		router->iocom->flags &= ~DMSG_IOCOMF_SWORK;
}

/*
 * Request that the signal callback be run on the next core-loop pass.
 */
void
dmsg_router_signal(dmsg_router_t *router)
{
	if (router->signal_callback)
		router->iocom->flags |= DMSG_IOCOMF_SWORK;
}

/*
 * Cleanup a terminating iocom.
 *
 * Caller should not hold iocom->mtx.  The iocom has already been disconnected
 * from all possible references to it.
221 */ 222 void 223 dmsg_iocom_done(dmsg_iocom_t *iocom) 224 { 225 dmsg_msg_t *msg; 226 227 if (iocom->sock_fd >= 0) { 228 close(iocom->sock_fd); 229 iocom->sock_fd = -1; 230 } 231 if (iocom->alt_fd >= 0) { 232 close(iocom->alt_fd); 233 iocom->alt_fd = -1; 234 } 235 dmsg_ioq_done(iocom, &iocom->ioq_rx); 236 dmsg_ioq_done(iocom, &iocom->ioq_tx); 237 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) { 238 TAILQ_REMOVE(&iocom->freeq, msg, qentry); 239 free(msg); 240 } 241 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) { 242 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry); 243 free(msg->aux_data); 244 msg->aux_data = NULL; 245 free(msg); 246 } 247 if (iocom->wakeupfds[0] >= 0) { 248 close(iocom->wakeupfds[0]); 249 iocom->wakeupfds[0] = -1; 250 } 251 if (iocom->wakeupfds[1] >= 0) { 252 close(iocom->wakeupfds[1]); 253 iocom->wakeupfds[1] = -1; 254 } 255 pthread_mutex_destroy(&iocom->mtx); 256 } 257 258 /* 259 * Allocate a new one-way message. 260 */ 261 dmsg_msg_t * 262 dmsg_msg_alloc(dmsg_router_t *router, size_t aux_size, uint32_t cmd, 263 void (*func)(dmsg_msg_t *), void *data) 264 { 265 dmsg_state_t *state = NULL; 266 dmsg_iocom_t *iocom = router->iocom; 267 dmsg_msg_t *msg; 268 int hbytes; 269 270 pthread_mutex_lock(&iocom->mtx); 271 if (aux_size) { 272 aux_size = (aux_size + DMSG_ALIGNMASK) & 273 ~DMSG_ALIGNMASK; 274 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) 275 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry); 276 } else { 277 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) 278 TAILQ_REMOVE(&iocom->freeq, msg, qentry); 279 } 280 if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) { 281 /* 282 * Create state when CREATE is set without REPLY. 
283 * 284 * NOTE: CREATE in txcmd handled by dmsg_msg_write() 285 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx() 286 */ 287 state = malloc(sizeof(*state)); 288 bzero(state, sizeof(*state)); 289 state->iocom = iocom; 290 state->flags = DMSG_STATE_DYNAMIC; 291 state->msgid = (uint64_t)(uintptr_t)state; 292 state->router = router; 293 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE); 294 state->rxcmd = DMSGF_REPLY; 295 state->func = func; 296 state->any.any = data; 297 pthread_mutex_lock(&iocom->mtx); 298 RB_INSERT(dmsg_state_tree, 299 &iocom->router->statewr_tree, 300 state); 301 pthread_mutex_unlock(&iocom->mtx); 302 state->flags |= DMSG_STATE_INSERTED; 303 } 304 pthread_mutex_unlock(&iocom->mtx); 305 if (msg == NULL) { 306 msg = malloc(sizeof(*msg)); 307 bzero(msg, sizeof(*msg)); 308 msg->aux_data = NULL; 309 msg->aux_size = 0; 310 } 311 if (msg->aux_size != aux_size) { 312 if (msg->aux_data) { 313 free(msg->aux_data); 314 msg->aux_data = NULL; 315 msg->aux_size = 0; 316 } 317 if (aux_size) { 318 msg->aux_data = malloc(aux_size); 319 msg->aux_size = aux_size; 320 } 321 } 322 hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN; 323 if (hbytes) 324 bzero(&msg->any.head, hbytes); 325 msg->hdr_size = hbytes; 326 msg->any.head.cmd = cmd; 327 msg->any.head.aux_descr = 0; 328 msg->any.head.aux_crc = 0; 329 msg->router = router; 330 if (state) { 331 msg->state = state; 332 state->msg = msg; 333 msg->any.head.msgid = state->msgid; 334 } 335 return (msg); 336 } 337 338 /* 339 * Free a message so it can be reused afresh. 340 * 341 * NOTE: aux_size can be 0 with a non-NULL aux_data. 
 */
static
void
dmsg_msg_free_locked(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->router->iocom;

	/* Caller holds iocom->mtx (see dmsg_msg_free()). */
	msg->state = NULL;
	/* Messages carrying an aux buffer are cached on a separate queue. */
	if (msg->aux_data)
		TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
	else
		TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
}

/*
 * Locked wrapper around dmsg_msg_free_locked().
 */
void
dmsg_msg_free(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->router->iocom;

	pthread_mutex_lock(&iocom->mtx);
	dmsg_msg_free_locked(msg);
	pthread_mutex_unlock(&iocom->mtx);
}

/*
 * I/O core loop for an iocom.
 *
 * Thread localized, iocom->mtx not held.
 */
void
dmsg_iocom_core(dmsg_iocom_t *iocom)
{
	struct pollfd fds[3];
	char dummybuf[256];
	dmsg_msg_t *msg;
	int timeout;
	int count;
	int wi;	/* wakeup pipe */
	int si;	/* socket */
	int ai;	/* alt bulk path socket */

	while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
		if ((iocom->flags & (DMSG_IOCOMF_RWORK |
				     DMSG_IOCOMF_WWORK |
				     DMSG_IOCOMF_PWORK |
				     DMSG_IOCOMF_SWORK |
				     DMSG_IOCOMF_ARWORK |
				     DMSG_IOCOMF_AWWORK)) == 0) {
			/*
			 * Only poll if no immediate work is pending.
			 * Otherwise we are just wasting our time calling
			 * poll.
			 */
			timeout = 5000;

			count = 0;
			wi = -1;
			si = -1;
			ai = -1;

			/*
			 * Always check the inter-thread pipe, e.g.
			 * for iocom->txmsgq work.
			 */
			wi = count++;
			fds[wi].fd = iocom->wakeupfds[0];
			fds[wi].events = POLLIN;
			fds[wi].revents = 0;

			/*
			 * Check the socket input/output direction as
			 * requested
			 */
			if (iocom->flags & (DMSG_IOCOMF_RREQ |
					    DMSG_IOCOMF_WREQ)) {
				si = count++;
				fds[si].fd = iocom->sock_fd;
				fds[si].events = 0;
				fds[si].revents = 0;

				if (iocom->flags & DMSG_IOCOMF_RREQ)
					fds[si].events |= POLLIN;
				if (iocom->flags & DMSG_IOCOMF_WREQ)
					fds[si].events |= POLLOUT;
			}

			/*
			 * Check the alternative fd for work.
			 */
			if (iocom->alt_fd >= 0) {
				ai = count++;
				fds[ai].fd = iocom->alt_fd;
				fds[ai].events = POLLIN;
				fds[ai].revents = 0;
			}
			/*
			 * NOTE(review): the poll() return value is
			 * deliberately ignored; a timeout or EINTR simply
			 * re-runs the loop with no revents set.
			 */
			poll(fds, count, timeout);

			/* Convert poll results back into iocom work flags. */
			if (wi >= 0 && (fds[wi].revents & POLLIN))
				iocom->flags |= DMSG_IOCOMF_PWORK;
			if (si >= 0 && (fds[si].revents & POLLIN))
				iocom->flags |= DMSG_IOCOMF_RWORK;
			if (si >= 0 && (fds[si].revents & POLLOUT))
				iocom->flags |= DMSG_IOCOMF_WWORK;
			if (wi >= 0 && (fds[wi].revents & POLLOUT))
				iocom->flags |= DMSG_IOCOMF_WWORK;
			if (ai >= 0 && (fds[ai].revents & POLLIN))
				iocom->flags |= DMSG_IOCOMF_ARWORK;
		} else {
			/*
			 * Always check the pipe
			 */
			iocom->flags |= DMSG_IOCOMF_PWORK;
		}

		if (iocom->flags & DMSG_IOCOMF_SWORK) {
			iocom->flags &= ~DMSG_IOCOMF_SWORK;
			iocom->router->signal_callback(iocom->router);
		}

		/*
		 * Pending message queues from other threads wake us up
		 * with a write to the wakeupfds[] pipe.  We have to clear
		 * the pipe with a dummy read.
		 */
		if (iocom->flags & DMSG_IOCOMF_PWORK) {
			iocom->flags &= ~DMSG_IOCOMF_PWORK;
			read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
			iocom->flags |= DMSG_IOCOMF_RWORK;
			iocom->flags |= DMSG_IOCOMF_WWORK;
			if (TAILQ_FIRST(&iocom->router->txmsgq))
				dmsg_iocom_flush1(iocom);
		}

		/*
		 * Message write sequencing
		 */
		if (iocom->flags & DMSG_IOCOMF_WWORK)
			dmsg_iocom_flush1(iocom);

		/*
		 * Message read sequencing.  Run this after the write
		 * sequencing in case the write sequencing allowed another
		 * auto-DELETE to occur on the read side.
		 */
		if (iocom->flags & DMSG_IOCOMF_RWORK) {
			while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
			       (msg = dmsg_ioq_read(iocom)) != NULL) {
				if (DMsgDebugOpt) {
					fprintf(stderr, "receive %s\n",
						dmsg_msg_str(msg));
				}
				iocom->router->rcvmsg_callback(msg);
				dmsg_state_cleanuprx(iocom, msg);
			}
		}

		if (iocom->flags & DMSG_IOCOMF_ARWORK) {
			iocom->flags &= ~DMSG_IOCOMF_ARWORK;
			iocom->router->altmsg_callback(iocom);
		}
	}
}

/*
 * Make sure there's enough room in the FIFO to hold the
 * needed data.
 *
 * Assume worst case encrypted form is 2x the size of the
 * plaintext equivalent.
 *
 * Returns the number of bytes of contiguous space now available at
 * fifo_end.  May compact the FIFO (sliding both the decrypted region
 * [fifo_beg, fifo_cdx) and the still-encrypted region [fifo_cdn,
 * fifo_end) down to the front of the buffer).
 */
static
size_t
dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
{
	size_t bytes;
	size_t nmax;

	bytes = ioq->fifo_cdx - ioq->fifo_beg;
	nmax = sizeof(ioq->buf) - ioq->fifo_end;
	if (bytes + nmax / 2 < needed) {
		/* Slide the decrypted data to the front of the buffer. */
		if (bytes) {
			bcopy(ioq->buf + ioq->fifo_beg,
			      ioq->buf,
			      bytes);
		}
		ioq->fifo_cdx -= ioq->fifo_beg;
		ioq->fifo_beg = 0;
		/* Slide the not-yet-decrypted tail down behind it. */
		if (ioq->fifo_cdn < ioq->fifo_end) {
			bcopy(ioq->buf + ioq->fifo_cdn,
			      ioq->buf + ioq->fifo_cdx,
			      ioq->fifo_end - ioq->fifo_cdn);
		}
		ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
		ioq->fifo_cdn = ioq->fifo_cdx;
		nmax = sizeof(ioq->buf) - ioq->fifo_end;
	}
	return(nmax);
}

/*
 * Read the next ready message from the ioq, issuing I/O if needed.
 * Caller should retry on a read-event when NULL is returned.
 *
 * If an error occurs during reception a DMSG_LNK_ERROR msg will
 * be returned for each open transaction, then the ioq and iocom
 * will be errored out and a non-transactional DMSG_LNK_ERROR
 * msg will be returned as the final message.  The caller should not call
 * us again after the final message is returned.
 *
 * Thread localized, iocom->mtx not held.
 */
dmsg_msg_t *
dmsg_ioq_read(dmsg_iocom_t *iocom)
{
	dmsg_ioq_t *ioq = &iocom->ioq_rx;
	dmsg_msg_t *msg;
	dmsg_state_t *state;
	dmsg_hdr_t *head;
	ssize_t n;
	size_t bytes;
	size_t nmax;
	uint32_t xcrc32;
	int error;

again:
	iocom->flags &= ~(DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);

	/*
	 * If a message is already pending we can just remove and
	 * return it.  Message state has already been processed.
	 * (currently not implemented)
	 */
	if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
		return (msg);
	}

	/*
	 * If the stream is errored out we stop processing it.
	 */
	if (ioq->error)
		goto skip;

	/*
	 * Message read in-progress (msg is NULL at the moment).  We don't
	 * allocate a msg until we have its core header.
	 */
	nmax = sizeof(ioq->buf) - ioq->fifo_end;
	bytes = ioq->fifo_cdx - ioq->fifo_beg;	/* already decrypted */
	msg = ioq->msg;

	switch(ioq->state) {
	case DMSG_MSGQ_STATE_HEADER1:
		/*
		 * Load the primary header, fail on any non-trivial read
		 * error or on EOF.  Since the primary header is the same
		 * size is the message alignment it will never straddle
		 * the end of the buffer.
		 *
		 * NOTE: msg is NULL here; sizeof(msg->any.head) is a
		 *	 compile-time constant and does not dereference it.
		 */
		nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
		if (bytes < sizeof(msg->any.head)) {
			n = read(iocom->sock_fd,
				 ioq->buf + ioq->fifo_end,
				 nmax);
			if (n <= 0) {
				if (n == 0) {
					ioq->error = DMSG_IOQ_ERROR_EOF;
					break;
				}
				if (errno != EINTR &&
				    errno != EINPROGRESS &&
				    errno != EAGAIN) {
					ioq->error = DMSG_IOQ_ERROR_SOCK;
					break;
				}
				n = 0;
				/* fall through */
			}
			ioq->fifo_end += (size_t)n;
			nmax -= (size_t)n;
		}

		/*
		 * Decrypt data received so far.  Data will be decrypted
		 * in-place but might create gaps in the FIFO.  Partial
		 * blocks are not immediately decrypted.
		 *
		 * WARNING!  The header might be in the wrong endian, we
		 *	     do not fix it up until we get the entire
		 *	     extended header.
		 */
		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
			dmsg_crypto_decrypt(iocom, ioq);
		} else {
			ioq->fifo_cdx = ioq->fifo_end;
			ioq->fifo_cdn = ioq->fifo_end;
		}
		bytes = ioq->fifo_cdx - ioq->fifo_beg;

		/*
		 * Insufficient data accumulated (msg is NULL, caller will
		 * retry on event).
		 */
		assert(msg == NULL);
		if (bytes < sizeof(msg->any.head))
			break;

		/*
		 * Check and fixup the core header.  Note that the icrc
		 * has to be calculated before any fixups, but the crc
		 * fields in the msg may have to be swapped like everything
		 * else.
		 */
		head = (void *)(ioq->buf + ioq->fifo_beg);
		if (head->magic != DMSG_HDR_MAGIC &&
		    head->magic != DMSG_HDR_MAGIC_REV) {
			ioq->error = DMSG_IOQ_ERROR_SYNC;
			break;
		}

		/*
		 * Calculate the full header size and aux data size
		 */
		if (head->magic == DMSG_HDR_MAGIC_REV) {
			ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
				      DMSG_ALIGN;
			ioq->abytes = bswap32(head->aux_bytes) *
				      DMSG_ALIGN;
		} else {
			ioq->hbytes = (head->cmd & DMSGF_SIZE) *
				      DMSG_ALIGN;
			ioq->abytes = head->aux_bytes * DMSG_ALIGN;
		}
		/* Sanity-bound the advertised sizes before trusting them. */
		if (ioq->hbytes < sizeof(msg->any.head) ||
		    ioq->hbytes > sizeof(msg->any) ||
		    ioq->abytes > DMSG_AUX_MAX) {
			ioq->error = DMSG_IOQ_ERROR_FIELD;
			break;
		}

		/*
		 * Allocate the message, the next state will fill it in.
		 */
		msg = dmsg_msg_alloc(iocom->router, ioq->abytes, 0,
				     NULL, NULL);
		ioq->msg = msg;

		/*
		 * Fall through to the next state.  Make sure that the
		 * extended header does not straddle the end of the buffer.
		 * We still want to issue larger reads into our buffer,
		 * book-keeping is easier if we don't bcopy() yet.
		 *
		 * Make sure there is enough room for bloated encrypt data.
		 */
		nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
		ioq->state = DMSG_MSGQ_STATE_HEADER2;
		/* fall through */
	case DMSG_MSGQ_STATE_HEADER2:
		/*
		 * Fill out the extended header.
		 */
		assert(msg != NULL);
		if (bytes < ioq->hbytes) {
			n = read(iocom->sock_fd,
				 ioq->buf + ioq->fifo_end,
				 nmax);
			if (n <= 0) {
				if (n == 0) {
					ioq->error = DMSG_IOQ_ERROR_EOF;
					break;
				}
				if (errno != EINTR &&
				    errno != EINPROGRESS &&
				    errno != EAGAIN) {
					ioq->error = DMSG_IOQ_ERROR_SOCK;
					break;
				}
				n = 0;
				/* fall through */
			}
			ioq->fifo_end += (size_t)n;
			nmax -= (size_t)n;
		}

		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
			dmsg_crypto_decrypt(iocom, ioq);
		} else {
			ioq->fifo_cdx = ioq->fifo_end;
			ioq->fifo_cdn = ioq->fifo_end;
		}
		bytes = ioq->fifo_cdx - ioq->fifo_beg;

		/*
		 * Insufficient data accumulated (set msg NULL so caller will
		 * retry on event).
		 */
		if (bytes < ioq->hbytes) {
			msg = NULL;
			break;
		}

		/*
		 * Calculate the extended header, decrypt data received
		 * so far.  Handle endian-conversion for the entire extended
		 * header.
		 */
		head = (void *)(ioq->buf + ioq->fifo_beg);

		/*
		 * Check the CRC.
		 */
		if (head->magic == DMSG_HDR_MAGIC_REV)
			xcrc32 = bswap32(head->hdr_crc);
		else
			xcrc32 = head->hdr_crc;
		/* hdr_crc is computed over the header with the field zeroed */
		head->hdr_crc = 0;
		if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
			ioq->error = DMSG_IOQ_ERROR_XCRC;
			fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
				xcrc32, dmsg_icrc32(head, ioq->hbytes),
				dmsg_msg_str(msg));
			assert(0);
			break;
		}
		head->hdr_crc = xcrc32;

		if (head->magic == DMSG_HDR_MAGIC_REV) {
			dmsg_bswap_head(head);
		}

		/*
		 * Copy the extended header into the msg and adjust the
		 * FIFO.
		 */
		bcopy(head, &msg->any, ioq->hbytes);

		/*
		 * We are either done or we fall-through.
		 */
		if (ioq->abytes == 0) {
			ioq->fifo_beg += ioq->hbytes;
			break;
		}

		/*
		 * Must adjust bytes (and the state) when falling through.
		 * nmax doesn't change.
		 */
		ioq->fifo_beg += ioq->hbytes;
		bytes -= ioq->hbytes;
		ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
		/* fall through */
	case DMSG_MSGQ_STATE_AUXDATA1:
		/*
		 * Copy the partial or complete payload from remaining
		 * bytes in the FIFO in order to optimize the makeroom call
		 * in the AUXDATA2 state.  We have to fall-through either
		 * way so we can check the crc.
		 *
		 * msg->aux_size tracks our aux data.
		 */
		if (bytes >= ioq->abytes) {
			/* Entire payload already decrypted in the FIFO. */
			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
			      ioq->abytes);
			msg->aux_size = ioq->abytes;
			ioq->fifo_beg += ioq->abytes;
			assert(ioq->fifo_beg <= ioq->fifo_cdx);
			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
			bytes -= ioq->abytes;
		} else if (bytes) {
			/* Partial payload; take what we have. */
			bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
			      bytes);
			msg->aux_size = bytes;
			ioq->fifo_beg += bytes;
			if (ioq->fifo_cdx < ioq->fifo_beg)
				ioq->fifo_cdx = ioq->fifo_beg;
			assert(ioq->fifo_beg <= ioq->fifo_cdx);
			assert(ioq->fifo_cdx <= ioq->fifo_cdn);
			bytes = 0;
		} else {
			msg->aux_size = 0;
		}
		ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
		/* fall through */
	case DMSG_MSGQ_STATE_AUXDATA2:
		/*
		 * Make sure there is enough room for more data.
		 */
		assert(msg);
		nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);

		/*
		 * Read and decrypt more of the payload.
		 */
		if (msg->aux_size < ioq->abytes) {
			assert(bytes == 0);
			n = read(iocom->sock_fd,
				 ioq->buf + ioq->fifo_end,
				 nmax);
			if (n <= 0) {
				if (n == 0) {
					ioq->error = DMSG_IOQ_ERROR_EOF;
					break;
				}
				if (errno != EINTR &&
				    errno != EINPROGRESS &&
				    errno != EAGAIN) {
					ioq->error = DMSG_IOQ_ERROR_SOCK;
					break;
				}
				n = 0;
				/* fall through */
			}
			ioq->fifo_end += (size_t)n;
			nmax -= (size_t)n;
		}

		if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
			dmsg_crypto_decrypt(iocom, ioq);
		} else {
			ioq->fifo_cdx = ioq->fifo_end;
			ioq->fifo_cdn = ioq->fifo_end;
		}
		bytes = ioq->fifo_cdx - ioq->fifo_beg;

		if (bytes > ioq->abytes - msg->aux_size)
			bytes = ioq->abytes - msg->aux_size;

		if (bytes) {
			bcopy(ioq->buf + ioq->fifo_beg,
			      msg->aux_data + msg->aux_size,
			      bytes);
			msg->aux_size += bytes;
			ioq->fifo_beg += bytes;
		}

		/*
		 * Insufficient data accumulated (set msg NULL so caller will
		 * retry on event).
		 */
		if (msg->aux_size < ioq->abytes) {
			msg = NULL;
			break;
		}
		assert(msg->aux_size == ioq->abytes);

		/*
		 * Check aux_crc, then we are done.
		 */
		xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size);
		if (xcrc32 != msg->any.head.aux_crc) {
			ioq->error = DMSG_IOQ_ERROR_ACRC;
			break;
		}
		break;
	case DMSG_MSGQ_STATE_ERROR:
		/*
		 * Continued calls to drain recorded transactions (returning
		 * a LNK_ERROR for each one), before we return the final
		 * LNK_ERROR.
		 */
		assert(msg == NULL);
		break;
	default:
		/*
		 * We don't double-return errors, the caller should not
		 * have called us again after getting an error msg.
		 */
		assert(0);
		break;
	}

	/*
	 * Check the message sequence.  The iv[] should prevent any
	 * possibility of a replay but we add this check anyway.
	 */
	if (msg && ioq->error == 0) {
		if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
			ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
		} else {
			++ioq->seq;
		}
	}

	/*
	 * Handle relaying.  Transactional state is not recorded XXX
	 */

	/*
	 * Process transactional state for the message.
	 */
	if (msg && ioq->error == 0) {
		error = dmsg_state_msgrx(msg);
		if (error) {
			if (error == DMSG_IOQ_ERROR_EALREADY) {
				dmsg_msg_free(msg);
				goto again;
			}
			ioq->error = error;
		}
	}

	/*
	 * Handle error, RREQ, or completion
	 *
	 * NOTE: nmax and bytes are invalid at this point, we don't bother
	 *	 to update them when breaking out.
	 */
	if (ioq->error) {
skip:
		/*
		 * An unrecoverable error causes all active receive
		 * transactions to be terminated with a LNK_ERROR message.
		 *
		 * Once all active transactions are exhausted we set the
		 * iocom ERROR flag and return a non-transactional LNK_ERROR
		 * message, which should cause master processing loops to
		 * terminate.
		 */
		assert(ioq->msg == msg);
		if (msg) {
			dmsg_msg_free(msg);
			ioq->msg = NULL;
		}

		/*
		 * No more I/O read processing
		 */
		ioq->state = DMSG_MSGQ_STATE_ERROR;

		/*
		 * Simulate a remote LNK_ERROR DELETE msg for any open
		 * transactions, ending with a final non-transactional
		 * LNK_ERROR (that the session can detect) when no
		 * transactions remain.
		 */
		msg = dmsg_msg_alloc(iocom->router, 0, 0, NULL, NULL);
		bzero(&msg->any.head, sizeof(msg->any.head));
		msg->any.head.magic = DMSG_HDR_MAGIC;
		msg->any.head.cmd = DMSG_LNK_ERROR;
		msg->any.head.error = ioq->error;

		pthread_mutex_lock(&iocom->mtx);
		dmsg_iocom_drain(iocom);
		if ((state = RB_ROOT(&iocom->router->staterd_tree)) != NULL) {
			/*
			 * Active remote transactions are still present.
			 * Simulate the other end sending us a DELETE.
			 */
			if (state->rxcmd & DMSGF_DELETE) {
				dmsg_msg_free(msg);
				msg = NULL;
			} else {
				/*state->txcmd |= DMSGF_DELETE;*/
				msg->state = state;
				msg->router = state->router;
				msg->any.head.msgid = state->msgid;
				msg->any.head.cmd |= DMSGF_ABORT |
						     DMSGF_DELETE;
			}
		} else if ((state = RB_ROOT(&iocom->router->statewr_tree)) !=
			   NULL) {
			/*
			 * Active local transactions are still present.
			 * Simulate the other end sending us a DELETE.
			 */
			if (state->rxcmd & DMSGF_DELETE) {
				dmsg_msg_free(msg);
				msg = NULL;
			} else {
				msg->state = state;
				msg->router = state->router;
				msg->any.head.msgid = state->msgid;
				msg->any.head.cmd |= DMSGF_ABORT |
						     DMSGF_DELETE |
						     DMSGF_REPLY;
				if ((state->rxcmd & DMSGF_CREATE) == 0) {
					msg->any.head.cmd |=
						     DMSGF_CREATE;
				}
			}
		} else {
			/*
			 * No active local or remote transactions remain.
			 * Generate a final LNK_ERROR and flag EOF.
			 */
			msg->state = NULL;
			iocom->flags |= DMSG_IOCOMF_EOF;
			fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
		}
		pthread_mutex_unlock(&iocom->mtx);

		/*
		 * For the iocom error case we want to set RWORK to indicate
		 * that more messages might be pending.
		 *
		 * It is possible to return NULL when there is more work to
		 * do because each message has to be DELETEd in both
		 * directions before we continue on with the next (though
		 * this could be optimized).  The transmit direction will
		 * re-set RWORK.
		 */
		if (msg)
			iocom->flags |= DMSG_IOCOMF_RWORK;
	} else if (msg == NULL) {
		/*
		 * Insufficient data received to finish building the message,
		 * set RREQ and return NULL.
		 *
		 * Leave ioq->msg intact.
		 * Leave the FIFO intact.
		 */
		iocom->flags |= DMSG_IOCOMF_RREQ;
	} else {
		/*
		 * Return msg.
		 *
		 * The fifo has already been advanced past the message.
		 * Trivially reset the FIFO indices if possible.
		 *
		 * clear the FIFO if it is now empty and set RREQ to wait
		 * for more from the socket.  If the FIFO is not empty set
		 * TWORK to bypass the poll so we loop immediately.
		 */
		if (ioq->fifo_beg == ioq->fifo_cdx &&
		    ioq->fifo_cdn == ioq->fifo_end) {
			iocom->flags |= DMSG_IOCOMF_RREQ;
			ioq->fifo_cdx = 0;
			ioq->fifo_cdn = 0;
			ioq->fifo_beg = 0;
			ioq->fifo_end = 0;
		} else {
			iocom->flags |= DMSG_IOCOMF_RWORK;
		}
		ioq->state = DMSG_MSGQ_STATE_HEADER1;
		ioq->msg = NULL;
	}
	return (msg);
}

/*
 * Calculate the header and data crc's and write a low-level message to
 * the connection.  If aux_crc is non-zero the aux_data crc is already
 * assumed to have been set.
 *
 * A non-NULL msg is added to the queue but not necessarily flushed.
 * Calling this function with msg == NULL will get a flush going.
 *
 * NOTE(review): the original comment here said "Caller must hold
 * iocom->mtx", but the function acquires iocom->mtx itself below, so a
 * holding caller would deadlock.  Callers must NOT hold iocom->mtx —
 * confirm against call sites.
 */
void
dmsg_iocom_flush1(dmsg_iocom_t *iocom)
{
	dmsg_ioq_t *ioq = &iocom->ioq_tx;
	dmsg_msg_t *msg;
	uint32_t xcrc32;
	int hbytes;
	dmsg_msg_queue_t tmpq;

	iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
	TAILQ_INIT(&tmpq);
	/* Snapshot the pending tx queue under the mutex, then work unlocked. */
	pthread_mutex_lock(&iocom->mtx);
	while ((msg = TAILQ_FIRST(&iocom->router->txmsgq)) != NULL) {
		TAILQ_REMOVE(&iocom->router->txmsgq, msg, qentry);
		TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
	}
	pthread_mutex_unlock(&iocom->mtx);

	while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
		/*
		 * Process terminal connection errors.
		 */
		TAILQ_REMOVE(&tmpq, msg, qentry);
		if (ioq->error) {
			TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
			++ioq->msgcount;
			continue;
		}

		/*
		 * Finish populating the msg fields.
The salt ensures that
		 * the iv[] array is ridiculously randomized and we also
		 * re-seed our PRNG every 32768 messages just to be sure.
		 */
		msg->any.head.magic = DMSG_HDR_MAGIC;
		msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
		++ioq->seq;
		if ((ioq->seq & 32767) == 0)
			srandomdev();

		/*
		 * Calculate aux_crc if 0, then calculate hdr_crc.
		 */
		if (msg->aux_size && msg->any.head.aux_crc == 0) {
			assert((msg->aux_size & DMSG_ALIGNMASK) == 0);
			xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size);
			msg->any.head.aux_crc = xcrc32;
		}
		msg->any.head.aux_bytes = msg->aux_size / DMSG_ALIGN;
		assert((msg->aux_size & DMSG_ALIGNMASK) == 0);

		/* hdr_crc is computed with the crc field itself zeroed. */
		hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
			 DMSG_ALIGN;
		msg->any.head.hdr_crc = 0;
		msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);

		/*
		 * Enqueue the message (the flush codes handles stream
		 * encryption).
		 */
		TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
		++ioq->msgcount;
	}
	dmsg_iocom_flush2(iocom);
}

/*
 * Flush queued, CRC'd messages out the socket.
 *
 * Thread localized, iocom->mtx not held by caller.
 */
void
dmsg_iocom_flush2(dmsg_iocom_t *iocom)
{
	dmsg_ioq_t *ioq = &iocom->ioq_tx;
	dmsg_msg_t *msg;
	ssize_t n;
	struct iovec iov[DMSG_IOQ_MAXIOVEC];
	size_t nact;
	size_t hbytes;
	size_t abytes;
	size_t hoff;
	size_t aoff;
	int iovcnt;

	if (ioq->error) {
		dmsg_iocom_drain(iocom);
		return;
	}

	/*
	 * Pump messages out the connection by building an iovec.
	 *
	 * ioq->hbytes/ioq->abytes tracks how much of the first message
	 * in the queue has been successfully written out, so we can
	 * resume writing.
1187 */ 1188 iovcnt = 0; 1189 nact = 0; 1190 hoff = ioq->hbytes; 1191 aoff = ioq->abytes; 1192 1193 TAILQ_FOREACH(msg, &ioq->msgq, qentry) { 1194 hbytes = (msg->any.head.cmd & DMSGF_SIZE) * 1195 DMSG_ALIGN; 1196 abytes = msg->aux_size; 1197 assert(hoff <= hbytes && aoff <= abytes); 1198 1199 if (hoff < hbytes) { 1200 iov[iovcnt].iov_base = (char *)&msg->any.head + hoff; 1201 iov[iovcnt].iov_len = hbytes - hoff; 1202 nact += hbytes - hoff; 1203 ++iovcnt; 1204 if (iovcnt == DMSG_IOQ_MAXIOVEC) 1205 break; 1206 } 1207 if (aoff < abytes) { 1208 assert(msg->aux_data != NULL); 1209 iov[iovcnt].iov_base = (char *)msg->aux_data + aoff; 1210 iov[iovcnt].iov_len = abytes - aoff; 1211 nact += abytes - aoff; 1212 ++iovcnt; 1213 if (iovcnt == DMSG_IOQ_MAXIOVEC) 1214 break; 1215 } 1216 hoff = 0; 1217 aoff = 0; 1218 } 1219 if (iovcnt == 0) 1220 return; 1221 1222 /* 1223 * Encrypt and write the data. The crypto code will move the 1224 * data into the fifo and adjust the iov as necessary. If 1225 * encryption is disabled the iov is left alone. 1226 * 1227 * May return a smaller iov (thus a smaller n), with aggregated 1228 * chunks. May reduce nmax to what fits in the FIFO. 1229 * 1230 * This function sets nact to the number of original bytes now 1231 * encrypted, adding to the FIFO some number of bytes that might 1232 * be greater depending on the crypto mechanic. iov[] is adjusted 1233 * to point at the FIFO if necessary. 1234 * 1235 * NOTE: The return value from the writev() is the post-encrypted 1236 * byte count, not the plaintext count. 1237 */ 1238 if (iocom->flags & DMSG_IOCOMF_CRYPTED) { 1239 /* 1240 * Make sure the FIFO has a reasonable amount of space 1241 * left (if not completely full). 
1242 */ 1243 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 && 1244 sizeof(ioq->buf) - ioq->fifo_end >= DMSG_ALIGN * 2) { 1245 bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, 1246 ioq->fifo_end - ioq->fifo_beg); 1247 ioq->fifo_cdx -= ioq->fifo_beg; 1248 ioq->fifo_cdn -= ioq->fifo_beg; 1249 ioq->fifo_end -= ioq->fifo_beg; 1250 ioq->fifo_beg = 0; 1251 } 1252 1253 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact); 1254 n = writev(iocom->sock_fd, iov, iovcnt); 1255 if (n > 0) { 1256 ioq->fifo_beg += n; 1257 ioq->fifo_cdn += n; 1258 ioq->fifo_cdx += n; 1259 if (ioq->fifo_beg == ioq->fifo_end) { 1260 ioq->fifo_beg = 0; 1261 ioq->fifo_cdn = 0; 1262 ioq->fifo_cdx = 0; 1263 ioq->fifo_end = 0; 1264 } 1265 } 1266 } else { 1267 n = writev(iocom->sock_fd, iov, iovcnt); 1268 if (n > 0) 1269 nact = n; 1270 else 1271 nact = 0; 1272 } 1273 1274 /* 1275 * Clean out the transmit queue based on what we successfully 1276 * sent (nact is the plaintext count). ioq->hbytes/abytes 1277 * represents the portion of the first message previously sent. 1278 */ 1279 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { 1280 hbytes = (msg->any.head.cmd & DMSGF_SIZE) * 1281 DMSG_ALIGN; 1282 abytes = msg->aux_size; 1283 1284 if ((size_t)nact < hbytes - ioq->hbytes) { 1285 ioq->hbytes += nact; 1286 nact = 0; 1287 break; 1288 } 1289 nact -= hbytes - ioq->hbytes; 1290 ioq->hbytes = hbytes; 1291 if ((size_t)nact < abytes - ioq->abytes) { 1292 ioq->abytes += nact; 1293 nact = 0; 1294 break; 1295 } 1296 nact -= abytes - ioq->abytes; 1297 1298 TAILQ_REMOVE(&ioq->msgq, msg, qentry); 1299 --ioq->msgcount; 1300 ioq->hbytes = 0; 1301 ioq->abytes = 0; 1302 1303 dmsg_state_cleanuptx(msg); 1304 } 1305 assert(nact == 0); 1306 1307 /* 1308 * Process the return value from the write w/regards to blocking. 
1309 */ 1310 if (n < 0) { 1311 if (errno != EINTR && 1312 errno != EINPROGRESS && 1313 errno != EAGAIN) { 1314 /* 1315 * Fatal write error 1316 */ 1317 ioq->error = DMSG_IOQ_ERROR_SOCK; 1318 dmsg_iocom_drain(iocom); 1319 } else { 1320 /* 1321 * Wait for socket buffer space 1322 */ 1323 iocom->flags |= DMSG_IOCOMF_WREQ; 1324 } 1325 } else { 1326 iocom->flags |= DMSG_IOCOMF_WREQ; 1327 } 1328 if (ioq->error) { 1329 dmsg_iocom_drain(iocom); 1330 } 1331 } 1332 1333 /* 1334 * Kill pending msgs on ioq_tx and adjust the flags such that no more 1335 * write events will occur. We don't kill read msgs because we want 1336 * the caller to pull off our contrived terminal error msg to detect 1337 * the connection failure. 1338 * 1339 * Thread localized, iocom->mtx not held by caller. 1340 */ 1341 void 1342 dmsg_iocom_drain(dmsg_iocom_t *iocom) 1343 { 1344 dmsg_ioq_t *ioq = &iocom->ioq_tx; 1345 dmsg_msg_t *msg; 1346 1347 iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK); 1348 ioq->hbytes = 0; 1349 ioq->abytes = 0; 1350 1351 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { 1352 TAILQ_REMOVE(&ioq->msgq, msg, qentry); 1353 --ioq->msgcount; 1354 dmsg_state_cleanuptx(msg); 1355 } 1356 } 1357 1358 /* 1359 * Write a message to an iocom, with additional state processing. 1360 */ 1361 void 1362 dmsg_msg_write(dmsg_msg_t *msg) 1363 { 1364 dmsg_iocom_t *iocom = msg->router->iocom; 1365 dmsg_state_t *state; 1366 char dummy; 1367 1368 /* 1369 * Handle state processing, create state if necessary. 1370 */ 1371 pthread_mutex_lock(&iocom->mtx); 1372 if ((state = msg->state) != NULL) { 1373 /* 1374 * Existing transaction (could be reply). It is also 1375 * possible for this to be the first reply (CREATE is set), 1376 * in which case we populate state->txcmd. 1377 * 1378 * state->txcmd is adjusted to hold the final message cmd, 1379 * and we also be sure to set the CREATE bit here. 
We did 1380 * not set it in dmsg_msg_alloc() because that would have 1381 * not been serialized (state could have gotten ripped out 1382 * from under the message prior to it being transmitted). 1383 */ 1384 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) == 1385 DMSGF_CREATE) { 1386 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE; 1387 } 1388 msg->any.head.msgid = state->msgid; 1389 assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0); 1390 if (msg->any.head.cmd & DMSGF_CREATE) 1391 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE; 1392 } else { 1393 msg->any.head.msgid = 0; 1394 /* XXX set spanid by router */ 1395 } 1396 msg->any.head.source = 0; 1397 msg->any.head.target = msg->router->target; 1398 1399 /* 1400 * Queue it for output, wake up the I/O pthread. Note that the 1401 * I/O thread is responsible for generating the CRCs and encryption. 1402 */ 1403 TAILQ_INSERT_TAIL(&iocom->router->txmsgq, msg, qentry); 1404 dummy = 0; 1405 write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */ 1406 pthread_mutex_unlock(&iocom->mtx); 1407 } 1408 1409 /* 1410 * This is a shortcut to formulate a reply to msg with a simple error code, 1411 * It can reply to and terminate a transaction, or it can reply to a one-way 1412 * messages. A DMSG_LNK_ERROR command code is utilized to encode 1413 * the error code (which can be 0). Not all transactions are terminated 1414 * with DMSG_LNK_ERROR status (the low level only cares about the 1415 * MSGF_DELETE flag), but most are. 1416 * 1417 * Replies to one-way messages are a bit of an oxymoron but the feature 1418 * is used by the debug (DBG) protocol. 1419 * 1420 * The reply contains no extended data. 1421 */ 1422 void 1423 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error) 1424 { 1425 dmsg_iocom_t *iocom = msg->router->iocom; 1426 dmsg_state_t *state = msg->state; 1427 dmsg_msg_t *nmsg; 1428 uint32_t cmd; 1429 1430 1431 /* 1432 * Reply with a simple error code and terminate the transaction. 
1433 */ 1434 cmd = DMSG_LNK_ERROR; 1435 1436 /* 1437 * Check if our direction has even been initiated yet, set CREATE. 1438 * 1439 * Check what direction this is (command or reply direction). Note 1440 * that txcmd might not have been initiated yet. 1441 * 1442 * If our direction has already been closed we just return without 1443 * doing anything. 1444 */ 1445 if (state) { 1446 if (state->txcmd & DMSGF_DELETE) 1447 return; 1448 if (state->txcmd & DMSGF_REPLY) 1449 cmd |= DMSGF_REPLY; 1450 cmd |= DMSGF_DELETE; 1451 } else { 1452 if ((msg->any.head.cmd & DMSGF_REPLY) == 0) 1453 cmd |= DMSGF_REPLY; 1454 } 1455 1456 /* 1457 * Allocate the message and associate it with the existing state. 1458 * We cannot pass MSGF_CREATE to msg_alloc() because that may 1459 * allocate new state. We have our state already. 1460 */ 1461 nmsg = dmsg_msg_alloc(iocom->router, 0, cmd, NULL, NULL); 1462 if (state) { 1463 if ((state->txcmd & DMSGF_CREATE) == 0) 1464 nmsg->any.head.cmd |= DMSGF_CREATE; 1465 } 1466 nmsg->any.head.error = error; 1467 nmsg->state = state; 1468 dmsg_msg_write(nmsg); 1469 } 1470 1471 /* 1472 * Similar to dmsg_msg_reply() but leave the transaction open. That is, 1473 * we are generating a streaming reply or an intermediate acknowledgement 1474 * of some sort as part of the higher level protocol, with more to come 1475 * later. 1476 */ 1477 void 1478 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error) 1479 { 1480 dmsg_iocom_t *iocom = msg->router->iocom; 1481 dmsg_state_t *state = msg->state; 1482 dmsg_msg_t *nmsg; 1483 uint32_t cmd; 1484 1485 1486 /* 1487 * Reply with a simple error code and terminate the transaction. 1488 */ 1489 cmd = DMSG_LNK_ERROR; 1490 1491 /* 1492 * Check if our direction has even been initiated yet, set CREATE. 1493 * 1494 * Check what direction this is (command or reply direction). Note 1495 * that txcmd might not have been initiated yet. 1496 * 1497 * If our direction has already been closed we just return without 1498 * doing anything. 
1499 */ 1500 if (state) { 1501 if (state->txcmd & DMSGF_DELETE) 1502 return; 1503 if (state->txcmd & DMSGF_REPLY) 1504 cmd |= DMSGF_REPLY; 1505 /* continuing transaction, do not set MSGF_DELETE */ 1506 } else { 1507 if ((msg->any.head.cmd & DMSGF_REPLY) == 0) 1508 cmd |= DMSGF_REPLY; 1509 } 1510 1511 nmsg = dmsg_msg_alloc(iocom->router, 0, cmd, NULL, NULL); 1512 if (state) { 1513 if ((state->txcmd & DMSGF_CREATE) == 0) 1514 nmsg->any.head.cmd |= DMSGF_CREATE; 1515 } 1516 nmsg->any.head.error = error; 1517 nmsg->state = state; 1518 dmsg_msg_write(nmsg); 1519 } 1520 1521 /* 1522 * Terminate a transaction given a state structure by issuing a DELETE. 1523 */ 1524 void 1525 dmsg_state_reply(dmsg_state_t *state, uint32_t error) 1526 { 1527 dmsg_msg_t *nmsg; 1528 uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE; 1529 1530 /* 1531 * Nothing to do if we already transmitted a delete 1532 */ 1533 if (state->txcmd & DMSGF_DELETE) 1534 return; 1535 1536 /* 1537 * Set REPLY if the other end initiated the command. Otherwise 1538 * we are the command direction. 1539 */ 1540 if (state->txcmd & DMSGF_REPLY) 1541 cmd |= DMSGF_REPLY; 1542 1543 nmsg = dmsg_msg_alloc(state->iocom->router, 0, cmd, NULL, NULL); 1544 if (state) { 1545 if ((state->txcmd & DMSGF_CREATE) == 0) 1546 nmsg->any.head.cmd |= DMSGF_CREATE; 1547 } 1548 nmsg->any.head.error = error; 1549 nmsg->state = state; 1550 dmsg_msg_write(nmsg); 1551 } 1552 1553 /************************************************************************ 1554 * TRANSACTION STATE HANDLING * 1555 ************************************************************************ 1556 * 1557 */ 1558 1559 /* 1560 * Process state tracking for a message after reception, prior to 1561 * execution. 1562 * 1563 * Called with msglk held and the msg dequeued. 1564 * 1565 * All messages are called with dummy state and return actual state. 1566 * (One-off messages often just return the same dummy state). 
 *
 * May request that caller discard the message by setting *discardp to 1.
 * The returned state is not used in this case and is allowed to be NULL.
 *
 * --
 *
 * These routines handle persistent and command/reply message state via the
 * CREATE and DELETE flags.  The first message in a command or reply sequence
 * sets CREATE, the last message in a command or reply sequence sets DELETE.
 *
 * There can be any number of intermediate messages belonging to the same
 * sequence sent inbetween the CREATE message and the DELETE message,
 * which set neither flag.  This represents a streaming command or reply.
 *
 * Any command message received with CREATE set expects a reply sequence to
 * be returned.  Reply sequences work the same as command sequences except the
 * REPLY bit is also sent.  Both the command side and reply side can
 * degenerate into a single message with both CREATE and DELETE set.  Note
 * that one side can be streaming and the other side not, or neither, or both.
 *
 * The msgid is unique for the initiator.  That is, two sides sending a new
 * message can use the same msgid without colliding.
 *
 * --
 *
 * ABORT sequences work by setting the ABORT flag along with normal message
 * state.  However, ABORTs can also be sent on half-closed messages, that is
 * even if the command or reply side has already sent a DELETE, as long as
 * the message has not been fully closed it can still send an ABORT+DELETE
 * to terminate the half-closed message state.
 *
 * Since ABORT+DELETEs can race we silently discard ABORT's for message
 * state which has already been fully closed.  REPLY+ABORT+DELETEs can
 * also race, and in this situation the other side might have already
 * initiated a new unrelated command with the same message id.  Since
 * the abort has not set the CREATE flag the situation can be detected
 * and the message will also be discarded.
 *
 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
 * The ABORT request is essentially integrated into the command instead
 * of being sent later on.  In this situation the command implementation
 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
 * special-case non-blocking operation for the command.
 *
 * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
 *	  to be mid-stream aborts for command/reply sequences.  ABORTs on
 *	  one-way messages are not supported.
 *
 * NOTE!  If a command sequence does not support aborts the ABORT flag is
 *	  simply ignored.
 *
 * --
 *
 * One-off messages (no reply expected) are sent with neither CREATE or DELETE
 * set.  One-off messages cannot be aborted and typically aren't processed
 * by these routines.  The REPLY bit can be used to distinguish whether a
 * one-off message is a command or reply.  For example, one-off replies
 * will typically just contain status updates.
 */
static int
dmsg_state_msgrx(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->router->iocom;
	dmsg_state_t *state;
	dmsg_state_t dummy;	/* stack key for the RB_FIND lookup */
	int error;

	/*
	 * Lock RB tree and locate existing persistent state, if any.
	 *
	 * If received msg is a command state is on staterd_tree.
	 * If received msg is a reply state is on statewr_tree.
	 */
	dummy.msgid = msg->any.head.msgid;
	pthread_mutex_lock(&iocom->mtx);
	if (msg->any.head.cmd & DMSGF_REPLY) {
		state = RB_FIND(dmsg_state_tree,
				&iocom->router->statewr_tree, &dummy);
	} else {
		state = RB_FIND(dmsg_state_tree,
				&iocom->router->staterd_tree, &dummy);
	}
	msg->state = state;
	pthread_mutex_unlock(&iocom->mtx);

	/*
	 * Short-cut one-off or mid-stream messages (state may be NULL).
	 */
	if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
				  DMSGF_ABORT)) == 0) {
		return(0);
	}

	/*
	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
	 * inside the case statements.
	 */
	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
				    DMSGF_REPLY)) {
	case DMSGF_CREATE:
	case DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * New persistent command received.  Allocate fresh state
		 * and insert it into staterd_tree (commands we receive).
		 * txcmd starts as DMSGF_REPLY since anything we transmit
		 * on this transaction will be a reply.
		 */
		if (state) {
			fprintf(stderr, "duplicate-trans %s\n",
				dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}
		state = malloc(sizeof(*state));
		bzero(state, sizeof(*state));
		state->iocom = iocom;
		state->flags = DMSG_STATE_DYNAMIC;
		state->msg = msg;
		state->txcmd = DMSGF_REPLY;
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		state->flags |= DMSG_STATE_INSERTED;
		state->msgid = msg->any.head.msgid;
		state->router = msg->router;
		msg->state = state;
		pthread_mutex_lock(&iocom->mtx);
		RB_INSERT(dmsg_state_tree,
			  &iocom->router->staterd_tree, state);
		pthread_mutex_unlock(&iocom->mtx);
		error = 0;
		if (DMsgDebugOpt) {
			fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
				state, (uint32_t)state->msgid, iocom);
		}
		break;
	case DMSGF_DELETE:
		/*
		 * Persistent state is expected but might not exist if an
		 * ABORT+DELETE races the close.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "missing-state %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * Handle another ABORT+DELETE case if the msgid has already
		 * been reused.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "reused-state %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	default:
		/*
		 * NOTE: default is deliberately placed mid-switch (legal
		 * C, no fallthrough involved).  It catches messages with
		 * none of CREATE/DELETE/REPLY set; the early shortcut
		 * above already returned for non-ABORT one-offs, so this
		 * is a mid-stream command, possibly carrying ABORT.
		 *
		 * Check for mid-stream ABORT command received, otherwise
		 * allow.
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if (state == NULL ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_CREATE:
	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * When receiving a reply with CREATE set the original
		 * persistent state message should already exist.
		 */
		if (state == NULL) {
			fprintf(stderr, "no-state(r) %s\n",
				dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}
		assert(((state->rxcmd ^ msg->any.head.cmd) &
			DMSGF_REPLY) == 0);
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_DELETE:
		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been fully closed, ignore the message.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "no-state(r,d) %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been reused for an unrelated message,
		 * ignore the message.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "reused-state(r,d) %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	case DMSGF_REPLY:
		/*
		 * Check for mid-stream ABORT reply received to sent command.
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if (state == NULL ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	}
	return (error);
}

/*
 * Post-receive cleanup: dispose of a received message after the
 * protocol layer is done with it.  If the message carries DELETE and
 * completes the second half of a fully closed transaction (both rxcmd
 * and txcmd now have DELETE set) the transaction state is removed
 * from its RB tree and freed as well.
 */
void
dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
{
	dmsg_state_t *state;

	if ((state = msg->state) == NULL) {
		/*
		 * Free a non-transactional message, there is no state
		 * to worry about.
		 */
		dmsg_msg_free(msg);
	} else if (msg->any.head.cmd & DMSGF_DELETE) {
		/*
		 * Message terminating transaction, destroy the related
		 * state, the original message, and this message (if it
		 * isn't the original message due to a CREATE|DELETE).
		 */
		pthread_mutex_lock(&iocom->mtx);
		state->rxcmd |= DMSGF_DELETE;
		if (state->txcmd & DMSGF_DELETE) {
			if (state->msg == msg)
				state->msg = NULL;
			assert(state->flags & DMSG_STATE_INSERTED);
			/*
			 * A received REPLY closes a transaction we
			 * initiated (statewr_tree); otherwise it was
			 * initiated by the remote (staterd_tree).
			 */
			if (state->rxcmd & DMSGF_REPLY) {
				assert(msg->any.head.cmd & DMSGF_REPLY);
				RB_REMOVE(dmsg_state_tree,
					  &iocom->router->statewr_tree, state);
			} else {
				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
				RB_REMOVE(dmsg_state_tree,
					  &iocom->router->staterd_tree, state);
			}
			state->flags &= ~DMSG_STATE_INSERTED;
			dmsg_state_free(state);
		} else {
			/* tx side still open, state stays in its tree */
			;
		}
		pthread_mutex_unlock(&iocom->mtx);
		dmsg_msg_free(msg);
	} else if (state->msg != msg) {
		/*
		 * Message not terminating transaction, leave state intact
		 * and free message if it isn't the CREATE message.
		 */
		dmsg_msg_free(msg);
	}
}

/*
 * Post-transmit cleanup: mirror of dmsg_state_cleanuprx() for messages
 * we have finished sending.  Note the tree selection is inverted
 * relative to the rx side: a transmitted REPLY belongs to a transaction
 * the remote initiated (staterd_tree), a transmitted command to one we
 * initiated (statewr_tree).
 */
static void
dmsg_state_cleanuptx(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->router->iocom;
	dmsg_state_t *state;

	if ((state = msg->state) == NULL) {
		dmsg_msg_free(msg);
	} else if (msg->any.head.cmd & DMSGF_DELETE) {
		pthread_mutex_lock(&iocom->mtx);
		state->txcmd |= DMSGF_DELETE;
		if (state->rxcmd & DMSGF_DELETE) {
			/* both directions closed, tear the state down */
			if (state->msg == msg)
				state->msg = NULL;
			assert(state->flags & DMSG_STATE_INSERTED);
			if (state->txcmd & DMSGF_REPLY) {
				assert(msg->any.head.cmd & DMSGF_REPLY);
				RB_REMOVE(dmsg_state_tree,
					  &iocom->router->staterd_tree, state);
			} else {
				assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
				RB_REMOVE(dmsg_state_tree,
					  &iocom->router->statewr_tree, state);
			}
			state->flags &= ~DMSG_STATE_INSERTED;
			dmsg_state_free(state);
		} else {
			/* rx side still open, state stays in its tree */
			;
		}
		pthread_mutex_unlock(&iocom->mtx);
		dmsg_msg_free(msg);
	} else if (state->msg != msg) {
		dmsg_msg_free(msg);
	}
}

/*
 * Called with iocom locked
 */
void
dmsg_state_free(dmsg_state_t *state)
{
	dmsg_iocom_t *iocom = state->iocom;
	dmsg_msg_t *msg;
	char dummy;

	if (DMsgDebugOpt) {
		fprintf(stderr, "terminate state %p id=%08x\n",
			state, (uint32_t)state->msgid);
	}
	assert(state->any.any == NULL);
	msg = state->msg;
	state->msg = NULL;
	if (msg)
		dmsg_msg_free_locked(msg);
	free(state);

	/*
	 * When an iocom error is present we are trying to close down the
	 * iocom, but we have to wait for all states to terminate before
	 * we can do so.  The iocom rx code will terminate the receive side
	 * for all transactions by simulating incoming DELETE messages,
	 * but the state doesn't go away until both sides are terminated.
	 *
	 * We may have to wake up the rx code.
	 */
	if (iocom->ioq_rx.error &&
	    RB_EMPTY(&iocom->router->staterd_tree) &&
	    RB_EMPTY(&iocom->router->statewr_tree)) {
		dummy = 0;
		write(iocom->wakeupfds[1], &dummy, 1);
	}
}

/************************************************************************
 *				ROUTING					*
 ************************************************************************
 *
 * Incoming messages are routed by their spanid, matched up against
 * outgoing LNK_SPANs managed by h2span_relay structures (see msg_lnk.c).
 * Any replies run through the same router.
 *
 * Originated messages are routed by their spanid, matched up against
 * incoming LNK_SPANs managed by h2span_link structures (see msg_lnk.c).
 * Replies come back through the same route.
 *
 * Keep in mind that ALL MESSAGE TRAFFIC pertaining to a particular
 * transaction runs through the same route.  Commands and replies both.
 *
 * An originated message will use a different routing spanid to
 * reach a target node than a message which originates from that node.
1956 * They might use the same physical pipes (each pipe can have multiple 1957 * SPANs and RELAYs), but the routes are distinct from the perspective 1958 * of the router. 1959 */ 1960 dmsg_router_t * 1961 dmsg_router_alloc(void) 1962 { 1963 dmsg_router_t *router; 1964 1965 router = dmsg_alloc(sizeof(*router)); 1966 TAILQ_INIT(&router->txmsgq); 1967 return (router); 1968 } 1969 1970 void 1971 dmsg_router_connect(dmsg_router_t *router) 1972 { 1973 dmsg_router_t *tmp; 1974 1975 assert(router->link || router->relay); 1976 assert((router->flags & DMSG_ROUTER_CONNECTED) == 0); 1977 1978 pthread_mutex_lock(&router_mtx); 1979 if (router->link) 1980 tmp = RB_INSERT(dmsg_router_tree, &router_ltree, router); 1981 else 1982 tmp = RB_INSERT(dmsg_router_tree, &router_rtree, router); 1983 assert(tmp == NULL); 1984 router->flags |= DMSG_ROUTER_CONNECTED; 1985 pthread_mutex_unlock(&router_mtx); 1986 } 1987 1988 void 1989 dmsg_router_disconnect(dmsg_router_t **routerp) 1990 { 1991 dmsg_router_t *router; 1992 1993 router = *routerp; 1994 assert(router->link || router->relay); 1995 assert(router->flags & DMSG_ROUTER_CONNECTED); 1996 1997 pthread_mutex_lock(&router_mtx); 1998 if (router->link) 1999 RB_REMOVE(dmsg_router_tree, &router_ltree, router); 2000 else 2001 RB_REMOVE(dmsg_router_tree, &router_rtree, router); 2002 router->flags &= ~DMSG_ROUTER_CONNECTED; 2003 *routerp = NULL; 2004 pthread_mutex_unlock(&router_mtx); 2005 } 2006 2007 #if 0 2008 /* 2009 * XXX 2010 */ 2011 dmsg_router_t * 2012 dmsg_route_msg(dmsg_msg_t *msg) 2013 { 2014 } 2015 #endif 2016 2017 /* 2018 * This swaps endian for a hammer2_msg_hdr. Note that the extended 2019 * header is not adjusted, just the core header. 
2020 */ 2021 void 2022 dmsg_bswap_head(dmsg_hdr_t *head) 2023 { 2024 head->magic = bswap16(head->magic); 2025 head->reserved02 = bswap16(head->reserved02); 2026 head->salt = bswap32(head->salt); 2027 2028 head->msgid = bswap64(head->msgid); 2029 head->source = bswap64(head->source); 2030 head->target = bswap64(head->target); 2031 2032 head->cmd = bswap32(head->cmd); 2033 head->aux_crc = bswap32(head->aux_crc); 2034 head->aux_bytes = bswap32(head->aux_bytes); 2035 head->error = bswap32(head->error); 2036 head->aux_descr = bswap64(head->aux_descr); 2037 head->reserved38= bswap32(head->reserved38); 2038 head->hdr_crc = bswap32(head->hdr_crc); 2039 } 2040