1 /* $NetBSD: xenbus_xs.c,v 1.23 2012/11/28 16:26:59 royger Exp $ */ 2 /****************************************************************************** 3 * xenbus_xs.c 4 * 5 * This is the kernel equivalent of the "xs" library. We don't need everything 6 * and we use xenbus_comms for communication. 7 * 8 * Copyright (C) 2005 Rusty Russell, IBM Corporation 9 * 10 * This file may be distributed separately from the Linux kernel, or 11 * incorporated into other software packages, subject to the following license: 12 * 13 * Permission is hereby granted, free of charge, to any person obtaining a copy 14 * of this source file (the "Software"), to deal in the Software without 15 * restriction, including without limitation the rights to use, copy, modify, 16 * merge, publish, distribute, sublicense, and/or sell copies of the Software, 17 * and to permit persons to whom the Software is furnished to do so, subject to 18 * the following conditions: 19 * 20 * The above copyright notice and this permission notice shall be included in 21 * all copies or substantial portions of the Software. 22 * 23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 24 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 25 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 26 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 27 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 28 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 29 * IN THE SOFTWARE. 30 */ 31 32 #include <sys/cdefs.h> 33 __KERNEL_RCSID(0, "$NetBSD: xenbus_xs.c,v 1.23 2012/11/28 16:26:59 royger Exp $"); 34 35 #if 0 36 #define DPRINTK(fmt, args...) \ 37 printf("xenbus_xs (%s:%d) " fmt ".\n", __func__, __LINE__, ##args) 38 #else 39 #define DPRINTK(fmt, args...) ((void)0) 40 #endif 41 42 #include <sys/types.h> 43 #include <sys/null.h> 44 #include <sys/errno.h> 45 #include <sys/malloc.h> 46 #include <sys/systm.h> 47 #include <sys/param.h> 48 #include <sys/proc.h> 49 #include <sys/mutex.h> 50 #include <sys/kthread.h> 51 52 #include <xen/xen.h> /* for xendomain_is_dom0() */ 53 #include <xen/xenbus.h> 54 #include "xenbus_comms.h" 55 56 #define streq(a, b) (strcmp((a), (b)) == 0) 57 58 struct xs_stored_msg { 59 SIMPLEQ_ENTRY(xs_stored_msg) msg_next; 60 61 struct xsd_sockmsg hdr; 62 63 union { 64 /* Queued replies. */ 65 struct { 66 char *body; 67 } reply; 68 69 /* Queued watch events. */ 70 struct { 71 struct xenbus_watch *handle; 72 char **vec; 73 unsigned int vec_size; 74 } watch; 75 } u; 76 }; 77 78 struct xs_handle { 79 /* A list of replies. Currently only one will ever be outstanding. */ 80 SIMPLEQ_HEAD(, xs_stored_msg) reply_list; 81 kmutex_t reply_lock; 82 kcondvar_t reply_cv; 83 kmutex_t xs_lock; /* serialize access to xenstore */ 84 int suspend_spl; 85 86 }; 87 88 static struct xs_handle xs_state; 89 90 /* List of registered watches, and a lock to protect it. */ 91 static SLIST_HEAD(, xenbus_watch) watches; 92 static kmutex_t watches_lock; 93 94 /* List of pending watch callback events, and a lock to protect it. */ 95 static SIMPLEQ_HEAD(, xs_stored_msg) watch_events; 96 static kmutex_t watch_events_lock; 97 static kcondvar_t watch_cv; 98 99 static int 100 get_error(const char *errorstring) 101 { 102 unsigned int i; 103 104 for (i = 0; !streq(errorstring, xsd_errors[i].errstring); i++) { 105 if (i == (sizeof(xsd_errors) / sizeof(xsd_errors[0]) - 1)) { 106 printf( 107 "XENBUS xen store gave: unknown error %s", 108 errorstring); 109 return EINVAL; 110 } 111 } 112 return xsd_errors[i].errnum; 113 } 114 115 static void * 116 read_reply(enum xsd_sockmsg_type *type, unsigned int *len) 117 { 118 struct xs_stored_msg *msg; 119 char *body; 120 121 mutex_enter(&xs_state.reply_lock); 122 while (SIMPLEQ_EMPTY(&xs_state.reply_list)) { 123 cv_wait(&xs_state.reply_cv, &xs_state.reply_lock); 124 } 125 msg = SIMPLEQ_FIRST(&xs_state.reply_list); 126 SIMPLEQ_REMOVE_HEAD(&xs_state.reply_list, msg_next); 127 mutex_exit(&xs_state.reply_lock); 128 129 *type = msg->hdr.type; 130 if (len) 131 *len = msg->hdr.len; 132 body = msg->u.reply.body; 133 DPRINTK("read_reply: type %d body %s", 134 msg->hdr.type, body); 135 136 free(msg, M_DEVBUF); 137 138 return body; 139 } 140 141 #if 0 142 /* Emergency write. */ 143 void 144 xenbus_debug_write(const char *str, unsigned int count) 145 { 146 struct xsd_sockmsg msg = { 0 }; 147 148 msg.type = XS_DEBUG; 149 msg.len = sizeof("print") + count + 1; 150 151 xb_write(&msg, sizeof(msg)); 152 xb_write("print", sizeof("print")); 153 xb_write(str, count); 154 xb_write("", 1); 155 } 156 #endif 157 158 int 159 xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void**reply) 160 { 161 int err = 0, s; 162 163 s = spltty(); 164 mutex_enter(&xs_state.xs_lock); 165 err = xb_write(msg, sizeof(*msg) + msg->len); 166 if (err) { 167 msg->type = XS_ERROR; 168 *reply = NULL; 169 } else { 170 *reply = read_reply(&msg->type, &msg->len); 171 } 172 mutex_exit(&xs_state.xs_lock); 173 splx(s); 174 175 return err; 176 } 177 178 /* Send message to xs, get kmalloc'ed reply. ERR_PTR() on error. */ 179 static int 180 xs_talkv(struct xenbus_transaction *t, 181 enum xsd_sockmsg_type type, 182 const struct iovec *iovec, 183 unsigned int num_vecs, 184 unsigned int *len, 185 char **retbuf) 186 { 187 struct xsd_sockmsg msg; 188 unsigned int i; 189 int err, s; 190 void *ret; 191 192 msg.tx_id = (uint32_t)(unsigned long)t; 193 msg.req_id = 0; 194 msg.type = type; 195 msg.len = 0; 196 for (i = 0; i < num_vecs; i++) 197 msg.len += iovec[i].iov_len; 198 199 s = spltty(); 200 mutex_enter(&xs_state.xs_lock); 201 202 DPRINTK("write msg"); 203 err = xb_write(&msg, sizeof(msg)); 204 DPRINTK("write msg err %d", err); 205 if (err) { 206 mutex_exit(&xs_state.xs_lock); 207 splx(s); 208 return (err); 209 } 210 211 for (i = 0; i < num_vecs; i++) { 212 DPRINTK("write iovect"); 213 err = xb_write(iovec[i].iov_base, iovec[i].iov_len); 214 DPRINTK("write iovect err %d", err); 215 if (err) { 216 mutex_exit(&xs_state.xs_lock); 217 splx(s); 218 return (err); 219 } 220 } 221 222 DPRINTK("read"); 223 ret = read_reply(&msg.type, len); 224 DPRINTK("read done"); 225 226 mutex_exit(&xs_state.xs_lock); 227 splx(s); 228 229 if (msg.type == XS_ERROR) { 230 err = get_error(ret); 231 free(ret, M_DEVBUF); 232 return (err); 233 } 234 235 KASSERT(msg.type == type); 236 if (retbuf != NULL) 237 *retbuf = ret; 238 else 239 free(ret, M_DEVBUF); 240 return 0; 241 } 242 243 /* Simplified version of xs_talkv: single message. */ 244 static int 245 xs_single(struct xenbus_transaction *t, 246 enum xsd_sockmsg_type type, 247 const char *string, 248 unsigned int *len, 249 char **ret) 250 { 251 struct iovec iovec; 252 253 /* xs_talkv only reads iovec */ 254 iovec.iov_base = __UNCONST(string); 255 iovec.iov_len = strlen(string) + 1; 256 return xs_talkv(t, type, &iovec, 1, len, ret); 257 } 258 259 static unsigned int 260 count_strings(const char *strings, unsigned int len) 261 { 262 unsigned int num; 263 const char *p; 264 265 for (p = strings, num = 0; p < strings + len; p += strlen(p) + 1) 266 num++; 267 268 return num; 269 } 270 271 /* Return the path to dir with /name appended. Buffer must be kfree()'ed. */ 272 static char * 273 join(const char *dir, const char *name) 274 { 275 char *buffer; 276 277 buffer = malloc(strlen(dir) + strlen("/") + strlen(name) + 1, 278 M_DEVBUF, M_NOWAIT); 279 if (buffer == NULL) 280 return NULL; 281 282 strcpy(buffer, dir); 283 if (!streq(name, "")) { 284 strcat(buffer, "/"); 285 strcat(buffer, name); 286 } 287 288 return buffer; 289 } 290 291 static char ** 292 split(char *strings, unsigned int len, unsigned int *num) 293 { 294 char *p, **ret; 295 296 /* Count the strings. */ 297 *num = count_strings(strings, len); 298 299 /* Transfer to one big alloc for easy freeing. */ 300 ret = malloc(*num * sizeof(char *) + len, M_DEVBUF, M_NOWAIT); 301 if (!ret) { 302 free(strings, M_DEVBUF); 303 return NULL; 304 } 305 memcpy(&ret[*num], strings, len); 306 free(strings, M_DEVBUF); 307 308 strings = (char *)&ret[*num]; 309 for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1) 310 ret[(*num)++] = p; 311 312 return ret; 313 } 314 315 int 316 xenbus_directory(struct xenbus_transaction *t, 317 const char *dir, const char *node, unsigned int *num, 318 char ***retbuf) 319 { 320 char *strings, *path; 321 unsigned int len; 322 int err; 323 324 path = join(dir, node); 325 if (path == NULL) 326 return ENOMEM; 327 328 err = xs_single(t, XS_DIRECTORY, path, &len, &strings); 329 DPRINTK("xs_single %d %d", err, len); 330 free(path, M_DEVBUF); 331 if (err) 332 return err; 333 334 DPRINTK("xs_single strings %s", strings); 335 *retbuf = split(strings, len, num); 336 if (*retbuf == NULL) 337 return ENOMEM; 338 return 0; 339 } 340 341 /* Check if a path exists. Return 1 if it does. */ 342 int 343 xenbus_exists(struct xenbus_transaction *t, 344 const char *dir, const char *node) 345 { 346 char **d; 347 int dir_n, err; 348 349 err = xenbus_directory(t, dir, node, &dir_n, &d); 350 if (err) 351 return 0; 352 free(d, M_DEVBUF); 353 return 1; 354 } 355 356 /* Get the value of a single file. 357 * Returns a kmalloced value: call free() on it after use. 358 * len indicates length in bytes. 359 */ 360 int 361 xenbus_read(struct xenbus_transaction *t, 362 const char *dir, const char *node, unsigned int *len, 363 char **ret) 364 { 365 char *path; 366 int err; 367 368 path = join(dir, node); 369 if (path == NULL) 370 return ENOMEM; 371 372 err = xs_single(t, XS_READ, path, len, ret); 373 free(path, M_DEVBUF); 374 return err; 375 } 376 377 /* Read a node and convert it to unsigned long. */ 378 int 379 xenbus_read_ul(struct xenbus_transaction *t, 380 const char *dir, const char *node, unsigned long *val, 381 int base) 382 { 383 char *string, *ep; 384 int err; 385 386 err = xenbus_read(t, dir, node, NULL, &string); 387 if (err) 388 return err; 389 *val = strtoul(string, &ep, base); 390 if (*ep != '\0') { 391 free(string, M_DEVBUF); 392 return EFTYPE; 393 } 394 free(string, M_DEVBUF); 395 return 0; 396 } 397 398 /* Read a node and convert it to unsigned long long. */ 399 int 400 xenbus_read_ull(struct xenbus_transaction *t, 401 const char *dir, const char *node, unsigned long long *val, 402 int base) 403 { 404 char *string, *ep; 405 int err; 406 407 err = xenbus_read(t, dir, node, NULL, &string); 408 if (err) 409 return err; 410 *val = strtoull(string, &ep, base); 411 if (*ep != '\0') { 412 free(string, M_DEVBUF); 413 return EFTYPE; 414 } 415 free(string, M_DEVBUF); 416 return 0; 417 } 418 419 /* Write the value of a single file. 420 * Returns -err on failure. 421 */ 422 int 423 xenbus_write(struct xenbus_transaction *t, 424 const char *dir, const char *node, const char *string) 425 { 426 const char *path; 427 struct iovec iovec[2]; 428 int ret; 429 430 path = join(dir, node); 431 if (path == NULL) 432 return ENOMEM; 433 434 /* xs_talkv only reads iovec */ 435 iovec[0].iov_base = __UNCONST(path); 436 iovec[0].iov_len = strlen(path) + 1; 437 iovec[1].iov_base = __UNCONST(string); 438 iovec[1].iov_len = strlen(string); 439 440 ret = xs_talkv(t, XS_WRITE, iovec, 2, NULL, NULL); 441 return ret; 442 } 443 444 /* Create a new directory. */ 445 int 446 xenbus_mkdir(struct xenbus_transaction *t, 447 const char *dir, const char *node) 448 { 449 char *path; 450 int ret; 451 452 path = join(dir, node); 453 if (path == NULL) 454 return ENOMEM; 455 456 ret = xs_single(t, XS_MKDIR, path, NULL, NULL); 457 free(path, M_DEVBUF); 458 return ret; 459 } 460 461 /* Destroy a file or directory (directories must be empty). */ 462 int xenbus_rm(struct xenbus_transaction *t, const char *dir, const char *node) 463 { 464 char *path; 465 int ret; 466 467 path = join(dir, node); 468 if (path == NULL) 469 return ENOMEM; 470 471 ret = xs_single(t, XS_RM, path, NULL, NULL); 472 free(path, M_DEVBUF); 473 return ret; 474 } 475 476 /* Start a transaction: changes by others will not be seen during this 477 * transaction, and changes will not be visible to others until end. 478 * MUST BE CALLED AT IPL_TTY ! 479 */ 480 struct xenbus_transaction * 481 xenbus_transaction_start(void) 482 { 483 char *id_str; 484 unsigned long id, err; 485 486 err = xs_single(NULL, XS_TRANSACTION_START, "", NULL, &id_str); 487 if (err) { 488 return NULL; 489 } 490 491 id = strtoul(id_str, NULL, 0); 492 free(id_str, M_DEVBUF); 493 494 return (struct xenbus_transaction *)id; 495 } 496 497 /* End a transaction. 498 * If abandon is true, transaction is discarded instead of committed. 499 * MUST BE CALLED AT IPL_TTY ! 500 */ 501 int xenbus_transaction_end(struct xenbus_transaction *t, int abort) 502 { 503 char abortstr[2]; 504 int err; 505 506 if (abort) 507 strcpy(abortstr, "F"); 508 else 509 strcpy(abortstr, "T"); 510 511 err = xs_single(t, XS_TRANSACTION_END, abortstr, NULL, NULL); 512 513 return err; 514 } 515 516 /* Single read and scanf: returns -errno or num scanned. */ 517 int 518 xenbus_scanf(struct xenbus_transaction *t, 519 const char *dir, const char *node, const char *fmt, ...) 520 { 521 va_list ap; 522 int ret; 523 char *val; 524 525 ret = xenbus_read(t, dir, node, NULL, &val); 526 if (ret) 527 return ret; 528 529 va_start(ap, fmt); 530 //ret = vsscanf(val, fmt, ap); 531 ret = ENXIO; 532 printf("xb_scanf format %s in %s\n", fmt, val); 533 va_end(ap); 534 free(val, M_DEVBUF); 535 return ret; 536 } 537 538 /* Single printf and write: returns -errno or 0. */ 539 int 540 xenbus_printf(struct xenbus_transaction *t, 541 const char *dir, const char *node, const char *fmt, ...) 542 { 543 va_list ap; 544 int ret; 545 #define PRINTF_BUFFER_SIZE 4096 546 char *printf_buffer; 547 548 printf_buffer = malloc(PRINTF_BUFFER_SIZE, M_DEVBUF, M_NOWAIT); 549 if (printf_buffer == NULL) 550 return ENOMEM; 551 552 va_start(ap, fmt); 553 ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap); 554 va_end(ap); 555 556 KASSERT(ret < PRINTF_BUFFER_SIZE); 557 ret = xenbus_write(t, dir, node, printf_buffer); 558 559 free(printf_buffer, M_DEVBUF); 560 561 return ret; 562 } 563 564 /* Takes tuples of names, scanf-style args, and void **, NULL terminated. */ 565 int 566 xenbus_gather(struct xenbus_transaction *t, const char *dir, ...) 567 { 568 va_list ap; 569 const char *name; 570 int ret = 0; 571 572 va_start(ap, dir); 573 while (ret == 0 && (name = va_arg(ap, char *)) != NULL) { 574 const char *fmt = va_arg(ap, char *); 575 void *result = va_arg(ap, void *); 576 char *p; 577 578 ret = xenbus_read(t, dir, name, NULL, &p); 579 if (ret) 580 break; 581 if (fmt) { 582 // XXX if (sscanf(p, fmt, result) == 0) 583 ret = -EINVAL; 584 free(p, M_DEVBUF); 585 } else 586 *(char **)result = p; 587 } 588 va_end(ap); 589 return ret; 590 } 591 592 static int 593 xs_watch(const char *path, const char *token) 594 { 595 struct iovec iov[2]; 596 597 /* xs_talkv only reads iovec */ 598 iov[0].iov_base = __UNCONST(path); 599 iov[0].iov_len = strlen(path) + 1; 600 iov[1].iov_base = __UNCONST(token); 601 iov[1].iov_len = strlen(token) + 1; 602 603 return xs_talkv(NULL, XS_WATCH, iov, 2, NULL, NULL); 604 } 605 606 static int 607 xs_unwatch(const char *path, const char *token) 608 { 609 struct iovec iov[2]; 610 611 /* xs_talkv only reads iovec */ 612 iov[0].iov_base = __UNCONST(path); 613 iov[0].iov_len = strlen(path) + 1; 614 iov[1].iov_base = __UNCONST(token); 615 iov[1].iov_len = strlen(token) + 1; 616 617 return xs_talkv(NULL, XS_UNWATCH, iov, 2, NULL, NULL); 618 } 619 620 static struct xenbus_watch * 621 find_watch(const char *token) 622 { 623 struct xenbus_watch *i, *cmp; 624 625 cmp = (void *)strtoul(token, NULL, 16); 626 627 SLIST_FOREACH(i, &watches, watch_next) { 628 if (i == cmp) 629 return i; 630 } 631 632 return NULL; 633 } 634 635 /* Register callback to watch this node. */ 636 int 637 register_xenbus_watch(struct xenbus_watch *watch) 638 { 639 /* Pointer in ascii is the token. */ 640 char token[sizeof(watch) * 2 + 1]; 641 int err; 642 643 snprintf(token, sizeof(token), "%lX", (long)watch); 644 645 mutex_enter(&watches_lock); 646 KASSERT(find_watch(token) == 0); 647 SLIST_INSERT_HEAD(&watches, watch, watch_next); 648 mutex_exit(&watches_lock); 649 650 err = xs_watch(watch->node, token); 651 652 /* Ignore errors due to multiple registration. */ 653 if ((err != 0) && (err != EEXIST)) { 654 mutex_enter(&watches_lock); 655 SLIST_REMOVE(&watches, watch, xenbus_watch, watch_next); 656 mutex_exit(&watches_lock); 657 } 658 return err; 659 } 660 661 void 662 unregister_xenbus_watch(struct xenbus_watch *watch) 663 { 664 SIMPLEQ_HEAD(, xs_stored_msg) gclist; 665 struct xs_stored_msg *msg, *next_msg; 666 char token[sizeof(watch) * 2 + 1]; 667 int err; 668 669 snprintf(token, sizeof(token), "%lX", (long)watch); 670 671 mutex_enter(&watches_lock); 672 KASSERT(find_watch(token)); 673 SLIST_REMOVE(&watches, watch, xenbus_watch, watch_next); 674 mutex_exit(&watches_lock); 675 676 err = xs_unwatch(watch->node, token); 677 if (err) { 678 printf( 679 "XENBUS Failed to release watch %s: %i\n", 680 watch->node, err); 681 } 682 683 /* Cancel pending watch events. */ 684 SIMPLEQ_INIT(&gclist); 685 mutex_enter(&watch_events_lock); 686 for (msg = SIMPLEQ_FIRST(&watch_events); msg != NULL; msg = next_msg) { 687 next_msg = SIMPLEQ_NEXT(msg, msg_next); 688 if (msg->u.watch.handle != watch) 689 continue; 690 SIMPLEQ_REMOVE(&watch_events, msg, xs_stored_msg, msg_next); 691 SIMPLEQ_INSERT_TAIL(&gclist, msg, msg_next); 692 } 693 mutex_exit(&watch_events_lock); 694 695 while ((msg = SIMPLEQ_FIRST(&gclist)) != NULL) { 696 SIMPLEQ_REMOVE(&gclist, msg, xs_stored_msg, msg_next); 697 free(msg->u.watch.vec, M_DEVBUF); 698 free(msg, M_DEVBUF); 699 } 700 } 701 702 void 703 xs_suspend(void) 704 { 705 xs_state.suspend_spl = spltty(); 706 } 707 708 void 709 xs_resume(void) 710 { 711 struct xenbus_watch *watch; 712 char token[sizeof(watch) * 2 + 1]; 713 /* No need for watches_lock: the suspend_mutex is sufficient. */ 714 SLIST_FOREACH(watch, &watches, watch_next) { 715 snprintf(token, sizeof(token), "%lX", (long)watch); 716 xs_watch(watch->node, token); 717 } 718 719 splx(xs_state.suspend_spl); 720 } 721 722 static void 723 xenwatch_thread(void *unused) 724 { 725 SIMPLEQ_HEAD(, xs_stored_msg) events_to_proces; 726 struct xs_stored_msg *msg; 727 728 SIMPLEQ_INIT(&events_to_proces); 729 for (;;) { 730 mutex_enter(&watch_events_lock); 731 while (SIMPLEQ_EMPTY(&watch_events)) 732 cv_wait(&watch_cv, &watch_events_lock); 733 SIMPLEQ_CONCAT(&events_to_proces, &watch_events); 734 mutex_exit(&watch_events_lock); 735 736 DPRINTK("xenwatch_thread: processing events"); 737 738 while ((msg = SIMPLEQ_FIRST(&events_to_proces)) != NULL) { 739 DPRINTK("xenwatch_thread: got event"); 740 SIMPLEQ_REMOVE_HEAD(&events_to_proces, msg_next); 741 msg->u.watch.handle->xbw_callback( 742 msg->u.watch.handle, 743 (void *)msg->u.watch.vec, 744 msg->u.watch.vec_size); 745 free(msg->u.watch.vec, M_DEVBUF); 746 free(msg, M_DEVBUF); 747 } 748 } 749 } 750 751 static int 752 process_msg(void) 753 { 754 struct xs_stored_msg *msg, *s_msg; 755 char *body; 756 int err; 757 758 msg = malloc(sizeof(*msg), M_DEVBUF, M_NOWAIT); 759 if (msg == NULL) 760 return ENOMEM; 761 762 err = xb_read(&msg->hdr, sizeof(msg->hdr)); 763 DPRINTK("xb_read hdr %d", err); 764 if (err) { 765 free(msg, M_DEVBUF); 766 return err; 767 } 768 769 body = malloc(msg->hdr.len + 1, M_DEVBUF, M_NOWAIT); 770 if (body == NULL) { 771 free(msg, M_DEVBUF); 772 return ENOMEM; 773 } 774 775 err = xb_read(body, msg->hdr.len); 776 DPRINTK("xb_read body %d", err); 777 if (err) { 778 free(body, M_DEVBUF); 779 free(msg, M_DEVBUF); 780 return err; 781 } 782 body[msg->hdr.len] = '\0'; 783 784 if (msg->hdr.type == XS_WATCH_EVENT) { 785 bool found, repeated; 786 787 DPRINTK("process_msg: XS_WATCH_EVENT"); 788 msg->u.watch.vec = split(body, msg->hdr.len, 789 &msg->u.watch.vec_size); 790 if (msg->u.watch.vec == NULL) { 791 free(msg, M_DEVBUF); 792 return ENOMEM; 793 } 794 795 mutex_enter(&watches_lock); 796 msg->u.watch.handle = find_watch( 797 msg->u.watch.vec[XS_WATCH_TOKEN]); 798 found = (msg->u.watch.handle != NULL); 799 repeated = false; 800 if (found) { 801 mutex_enter(&watch_events_lock); 802 /* Don't add duplicate events to the queue of pending watches */ 803 SIMPLEQ_FOREACH(s_msg, &watch_events, msg_next) { 804 if (s_msg->u.watch.handle == msg->u.watch.handle) { 805 repeated = true; 806 break; 807 } 808 } 809 if (!repeated) { 810 SIMPLEQ_INSERT_TAIL(&watch_events, msg, msg_next); 811 cv_broadcast(&watch_cv); 812 } 813 mutex_exit(&watch_events_lock); 814 } 815 mutex_exit(&watches_lock); 816 if (!found || repeated) { 817 free(msg->u.watch.vec, M_DEVBUF); 818 free(msg, M_DEVBUF); 819 } 820 } else { 821 DPRINTK("process_msg: type %d body %s", msg->hdr.type, body); 822 823 msg->u.reply.body = body; 824 mutex_enter(&xs_state.reply_lock); 825 SIMPLEQ_INSERT_TAIL(&xs_state.reply_list, msg, msg_next); 826 cv_broadcast(&xs_state.reply_cv); 827 mutex_exit(&xs_state.reply_lock); 828 } 829 830 return 0; 831 } 832 833 static void 834 xenbus_thread(void *unused) 835 { 836 int err; 837 838 for (;;) { 839 err = process_msg(); 840 if (err) 841 printk("XENBUS error %d while reading message\n", err); 842 } 843 } 844 845 int 846 xs_init(device_t dev) 847 { 848 int err; 849 850 SLIST_INIT(&watches); 851 mutex_init(&watches_lock, MUTEX_DEFAULT, IPL_TTY); 852 853 SIMPLEQ_INIT(&watch_events); 854 mutex_init(&watch_events_lock, MUTEX_DEFAULT, IPL_TTY); 855 cv_init(&watch_cv, "evtsq"); 856 857 SIMPLEQ_INIT(&xs_state.reply_list); 858 mutex_init(&xs_state.xs_lock, MUTEX_DEFAULT, IPL_NONE); 859 mutex_init(&xs_state.reply_lock, MUTEX_DEFAULT, IPL_TTY); 860 cv_init(&xs_state.reply_cv, "rplq"); 861 862 err = kthread_create(PRI_NONE, 0, NULL, xenwatch_thread, 863 NULL, NULL, "xenwatch"); 864 if (err) { 865 aprint_error_dev(dev, "kthread_create(xenwatch): %d\n", err); 866 return err; 867 } 868 869 err = kthread_create(PRI_NONE, 0, NULL, xenbus_thread, 870 NULL, NULL, "xenbus"); 871 if (err) { 872 aprint_error_dev(dev, "kthread_create(xenbus): %d\n", err); 873 return err; 874 } 875 876 return 0; 877 } 878 879 /* 880 * Local variables: 881 * c-file-style: "linux" 882 * indent-tabs-mode: t 883 * c-indent-level: 8 884 * c-basic-offset: 8 885 * tab-width: 8 886 * End: 887 */ 888