xref: /netbsd-src/sys/arch/xen/xenbus/xenbus_xs.c (revision bdc22b2e01993381dcefeff2bc9b56ca75a4235c)
1 /* $NetBSD: xenbus_xs.c,v 1.23 2012/11/28 16:26:59 royger Exp $ */
2 /******************************************************************************
3  * xenbus_xs.c
4  *
5  * This is the kernel equivalent of the "xs" library.  We don't need everything
6  * and we use xenbus_comms for communication.
7  *
8  * Copyright (C) 2005 Rusty Russell, IBM Corporation
9  *
10  * This file may be distributed separately from the Linux kernel, or
11  * incorporated into other software packages, subject to the following license:
12  *
13  * Permission is hereby granted, free of charge, to any person obtaining a copy
14  * of this source file (the "Software"), to deal in the Software without
15  * restriction, including without limitation the rights to use, copy, modify,
16  * merge, publish, distribute, sublicense, and/or sell copies of the Software,
17  * and to permit persons to whom the Software is furnished to do so, subject to
18  * the following conditions:
19  *
20  * The above copyright notice and this permission notice shall be included in
21  * all copies or substantial portions of the Software.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
24  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
25  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
26  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
27  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
28  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
29  * IN THE SOFTWARE.
30  */
31 
32 #include <sys/cdefs.h>
33 __KERNEL_RCSID(0, "$NetBSD: xenbus_xs.c,v 1.23 2012/11/28 16:26:59 royger Exp $");
34 
35 #if 0
36 #define DPRINTK(fmt, args...) \
37     printf("xenbus_xs (%s:%d) " fmt ".\n", __func__, __LINE__, ##args)
38 #else
39 #define DPRINTK(fmt, args...) ((void)0)
40 #endif
41 
42 #include <sys/types.h>
43 #include <sys/null.h>
44 #include <sys/errno.h>
45 #include <sys/malloc.h>
46 #include <sys/systm.h>
47 #include <sys/param.h>
48 #include <sys/proc.h>
49 #include <sys/mutex.h>
50 #include <sys/kthread.h>
51 
52 #include <xen/xen.h>	/* for xendomain_is_dom0() */
53 #include <xen/xenbus.h>
54 #include "xenbus_comms.h"
55 
56 #define streq(a, b) (strcmp((a), (b)) == 0)
57 
58 struct xs_stored_msg {
59 	SIMPLEQ_ENTRY(xs_stored_msg) msg_next;
60 
61 	struct xsd_sockmsg hdr;
62 
63 	union {
64 		/* Queued replies. */
65 		struct {
66 			char *body;
67 		} reply;
68 
69 		/* Queued watch events. */
70 		struct {
71 			struct xenbus_watch *handle;
72 			char **vec;
73 			unsigned int vec_size;
74 		} watch;
75 	} u;
76 };
77 
78 struct xs_handle {
79 	/* A list of replies. Currently only one will ever be outstanding. */
80 	SIMPLEQ_HEAD(, xs_stored_msg) reply_list;
81 	kmutex_t reply_lock;
82 	kcondvar_t reply_cv;
83 	kmutex_t xs_lock; /* serialize access to xenstore */
84 	int suspend_spl;
85 
86 };
87 
88 static struct xs_handle xs_state;
89 
90 /* List of registered watches, and a lock to protect it. */
91 static SLIST_HEAD(, xenbus_watch) watches;
92 static kmutex_t watches_lock;
93 
94 /* List of pending watch callback events, and a lock to protect it. */
95 static SIMPLEQ_HEAD(, xs_stored_msg) watch_events;
96 static kmutex_t watch_events_lock;
97 static kcondvar_t watch_cv;
98 
99 static int
100 get_error(const char *errorstring)
101 {
102 	unsigned int i;
103 
104 	for (i = 0; !streq(errorstring, xsd_errors[i].errstring); i++) {
105 		if (i == (sizeof(xsd_errors) / sizeof(xsd_errors[0]) - 1)) {
106 			printf(
107 			       "XENBUS xen store gave: unknown error %s",
108 			       errorstring);
109 			return EINVAL;
110 		}
111 	}
112 	return xsd_errors[i].errnum;
113 }
114 
115 static void *
116 read_reply(enum xsd_sockmsg_type *type, unsigned int *len)
117 {
118 	struct xs_stored_msg *msg;
119 	char *body;
120 
121 	mutex_enter(&xs_state.reply_lock);
122 	while (SIMPLEQ_EMPTY(&xs_state.reply_list)) {
123 		cv_wait(&xs_state.reply_cv, &xs_state.reply_lock);
124 	}
125 	msg = SIMPLEQ_FIRST(&xs_state.reply_list);
126 	SIMPLEQ_REMOVE_HEAD(&xs_state.reply_list, msg_next);
127 	mutex_exit(&xs_state.reply_lock);
128 
129 	*type = msg->hdr.type;
130 	if (len)
131 		*len = msg->hdr.len;
132 	body = msg->u.reply.body;
133 	DPRINTK("read_reply: type %d body %s",
134 	    msg->hdr.type, body);
135 
136 	free(msg, M_DEVBUF);
137 
138 	return body;
139 }
140 
141 #if 0
142 /* Emergency write. */
143 void
144 xenbus_debug_write(const char *str, unsigned int count)
145 {
146 	struct xsd_sockmsg msg = { 0 };
147 
148 	msg.type = XS_DEBUG;
149 	msg.len = sizeof("print") + count + 1;
150 
151 	xb_write(&msg, sizeof(msg));
152 	xb_write("print", sizeof("print"));
153 	xb_write(str, count);
154 	xb_write("", 1);
155 }
156 #endif
157 
158 int
159 xenbus_dev_request_and_reply(struct xsd_sockmsg *msg, void**reply)
160 {
161 	int err = 0, s;
162 
163 	s = spltty();
164 	mutex_enter(&xs_state.xs_lock);
165 	err = xb_write(msg, sizeof(*msg) + msg->len);
166 	if (err) {
167 		msg->type = XS_ERROR;
168 		*reply = NULL;
169 	} else {
170 		*reply = read_reply(&msg->type, &msg->len);
171 	}
172 	mutex_exit(&xs_state.xs_lock);
173 	splx(s);
174 
175 	return err;
176 }
177 
178 /* Send message to xs, get kmalloc'ed reply.  ERR_PTR() on error. */
179 static int
180 xs_talkv(struct xenbus_transaction *t,
181 		      enum xsd_sockmsg_type type,
182 		      const struct iovec *iovec,
183 		      unsigned int num_vecs,
184 		      unsigned int *len,
185 		      char **retbuf)
186 {
187 	struct xsd_sockmsg msg;
188 	unsigned int i;
189 	int err, s;
190 	void *ret;
191 
192 	msg.tx_id = (uint32_t)(unsigned long)t;
193 	msg.req_id = 0;
194 	msg.type = type;
195 	msg.len = 0;
196 	for (i = 0; i < num_vecs; i++)
197 		msg.len += iovec[i].iov_len;
198 
199 	s = spltty();
200 	mutex_enter(&xs_state.xs_lock);
201 
202 	DPRINTK("write msg");
203 	err = xb_write(&msg, sizeof(msg));
204 	DPRINTK("write msg err %d", err);
205 	if (err) {
206 		mutex_exit(&xs_state.xs_lock);
207 		splx(s);
208 		return (err);
209 	}
210 
211 	for (i = 0; i < num_vecs; i++) {
212 		DPRINTK("write iovect");
213 		err = xb_write(iovec[i].iov_base, iovec[i].iov_len);
214 		DPRINTK("write iovect err %d", err);
215 		if (err) {
216 			mutex_exit(&xs_state.xs_lock);
217 			splx(s);
218 			return (err);
219 		}
220 	}
221 
222 	DPRINTK("read");
223 	ret = read_reply(&msg.type, len);
224 	DPRINTK("read done");
225 
226 	mutex_exit(&xs_state.xs_lock);
227 	splx(s);
228 
229 	if (msg.type == XS_ERROR) {
230 		err = get_error(ret);
231 		free(ret, M_DEVBUF);
232 		return (err);
233 	}
234 
235 	KASSERT(msg.type == type);
236 	if (retbuf != NULL)
237 		*retbuf = ret;
238 	else
239 		free(ret, M_DEVBUF);
240 	return 0;
241 }
242 
243 /* Simplified version of xs_talkv: single message. */
244 static int
245 xs_single(struct xenbus_transaction *t,
246 		       enum xsd_sockmsg_type type,
247 		       const char *string,
248 		       unsigned int *len,
249 		       char **ret)
250 {
251 	struct iovec iovec;
252 
253 	/* xs_talkv only reads iovec */
254 	iovec.iov_base = __UNCONST(string);
255 	iovec.iov_len = strlen(string) + 1;
256 	return xs_talkv(t, type, &iovec, 1, len, ret);
257 }
258 
259 static unsigned int
260 count_strings(const char *strings, unsigned int len)
261 {
262 	unsigned int num;
263 	const char *p;
264 
265 	for (p = strings, num = 0; p < strings + len; p += strlen(p) + 1)
266 		num++;
267 
268 	return num;
269 }
270 
271 /* Return the path to dir with /name appended. Buffer must be kfree()'ed. */
272 static char *
273 join(const char *dir, const char *name)
274 {
275 	char *buffer;
276 
277 	buffer = malloc(strlen(dir) + strlen("/") + strlen(name) + 1,
278 			 M_DEVBUF, M_NOWAIT);
279 	if (buffer == NULL)
280 		return NULL;
281 
282 	strcpy(buffer, dir);
283 	if (!streq(name, "")) {
284 		strcat(buffer, "/");
285 		strcat(buffer, name);
286 	}
287 
288 	return buffer;
289 }
290 
291 static char **
292 split(char *strings, unsigned int len, unsigned int *num)
293 {
294 	char *p, **ret;
295 
296 	/* Count the strings. */
297 	*num = count_strings(strings, len);
298 
299 	/* Transfer to one big alloc for easy freeing. */
300 	ret = malloc(*num * sizeof(char *) + len, M_DEVBUF, M_NOWAIT);
301 	if (!ret) {
302 		free(strings, M_DEVBUF);
303 		return NULL;
304 	}
305 	memcpy(&ret[*num], strings, len);
306 	free(strings, M_DEVBUF);
307 
308 	strings = (char *)&ret[*num];
309 	for (p = strings, *num = 0; p < strings + len; p += strlen(p) + 1)
310 		ret[(*num)++] = p;
311 
312 	return ret;
313 }
314 
315 int
316 xenbus_directory(struct xenbus_transaction *t,
317 			const char *dir, const char *node, unsigned int *num,
318 			char ***retbuf)
319 {
320 	char *strings, *path;
321 	unsigned int len;
322 	int err;
323 
324 	path = join(dir, node);
325 	if (path == NULL)
326 		return ENOMEM;
327 
328 	err = xs_single(t, XS_DIRECTORY, path, &len, &strings);
329 	DPRINTK("xs_single %d %d", err, len);
330 	free(path, M_DEVBUF);
331 	if (err)
332 		return err;
333 
334 	DPRINTK("xs_single strings %s", strings);
335 	*retbuf = split(strings, len, num);
336 	if (*retbuf == NULL)
337 		return ENOMEM;
338 	return 0;
339 }
340 
341 /* Check if a path exists. Return 1 if it does. */
342 int
343 xenbus_exists(struct xenbus_transaction *t,
344 		  const char *dir, const char *node)
345 {
346 	char **d;
347 	int dir_n, err;
348 
349 	err = xenbus_directory(t, dir, node, &dir_n, &d);
350 	if (err)
351 		return 0;
352 	free(d, M_DEVBUF);
353 	return 1;
354 }
355 
356 /* Get the value of a single file.
357  * Returns a kmalloced value: call free() on it after use.
358  * len indicates length in bytes.
359  */
360 int
361 xenbus_read(struct xenbus_transaction *t,
362 		  const char *dir, const char *node, unsigned int *len,
363 		  char **ret)
364 {
365 	char *path;
366 	int err;
367 
368 	path = join(dir, node);
369 	if (path == NULL)
370 		return ENOMEM;
371 
372 	err = xs_single(t, XS_READ, path, len, ret);
373 	free(path, M_DEVBUF);
374 	return err;
375 }
376 
377 /* Read a node and convert it to unsigned long. */
378 int
379 xenbus_read_ul(struct xenbus_transaction *t,
380 		  const char *dir, const char *node, unsigned long *val,
381 		  int base)
382 {
383 	char *string, *ep;
384 	int err;
385 
386 	err = xenbus_read(t, dir, node, NULL, &string);
387 	if (err)
388 		return err;
389 	*val = strtoul(string, &ep, base);
390 	if (*ep != '\0') {
391 		free(string, M_DEVBUF);
392 		return EFTYPE;
393 	}
394 	free(string, M_DEVBUF);
395 	return 0;
396 }
397 
398 /* Read a node and convert it to unsigned long long. */
399 int
400 xenbus_read_ull(struct xenbus_transaction *t,
401 		  const char *dir, const char *node, unsigned long long *val,
402 		  int base)
403 {
404 	char *string, *ep;
405 	int err;
406 
407 	err = xenbus_read(t, dir, node, NULL, &string);
408 	if (err)
409 		return err;
410 	*val = strtoull(string, &ep, base);
411 	if (*ep != '\0') {
412 		free(string, M_DEVBUF);
413 		return EFTYPE;
414 	}
415 	free(string, M_DEVBUF);
416 	return 0;
417 }
418 
419 /* Write the value of a single file.
420  * Returns -err on failure.
421  */
422 int
423 xenbus_write(struct xenbus_transaction *t,
424 		 const char *dir, const char *node, const char *string)
425 {
426 	const char *path;
427 	struct iovec iovec[2];
428 	int ret;
429 
430 	path = join(dir, node);
431 	if (path == NULL)
432 		return ENOMEM;
433 
434 	/* xs_talkv only reads iovec */
435 	iovec[0].iov_base = __UNCONST(path);
436 	iovec[0].iov_len = strlen(path) + 1;
437 	iovec[1].iov_base = __UNCONST(string);
438 	iovec[1].iov_len = strlen(string);
439 
440 	ret = xs_talkv(t, XS_WRITE, iovec, 2, NULL, NULL);
441 	return ret;
442 }
443 
444 /* Create a new directory. */
445 int
446 xenbus_mkdir(struct xenbus_transaction *t,
447 		 const char *dir, const char *node)
448 {
449 	char *path;
450 	int ret;
451 
452 	path = join(dir, node);
453 	if (path == NULL)
454 		return ENOMEM;
455 
456 	ret = xs_single(t, XS_MKDIR, path, NULL, NULL);
457 	free(path, M_DEVBUF);
458 	return ret;
459 }
460 
461 /* Destroy a file or directory (directories must be empty). */
462 int xenbus_rm(struct xenbus_transaction *t, const char *dir, const char *node)
463 {
464 	char *path;
465 	int ret;
466 
467 	path = join(dir, node);
468 	if (path == NULL)
469 		return ENOMEM;
470 
471 	ret = xs_single(t, XS_RM, path, NULL, NULL);
472 	free(path, M_DEVBUF);
473 	return ret;
474 }
475 
476 /* Start a transaction: changes by others will not be seen during this
477  * transaction, and changes will not be visible to others until end.
478  * MUST BE CALLED AT IPL_TTY !
479  */
480 struct xenbus_transaction *
481 xenbus_transaction_start(void)
482 {
483 	char *id_str;
484 	unsigned long id, err;
485 
486 	err = xs_single(NULL, XS_TRANSACTION_START, "", NULL, &id_str);
487 	if (err) {
488 		return NULL;
489 	}
490 
491 	id = strtoul(id_str, NULL, 0);
492 	free(id_str, M_DEVBUF);
493 
494 	return (struct xenbus_transaction *)id;
495 }
496 
497 /* End a transaction.
498  * If abandon is true, transaction is discarded instead of committed.
499  * MUST BE CALLED AT IPL_TTY !
500  */
501 int xenbus_transaction_end(struct xenbus_transaction *t, int abort)
502 {
503 	char abortstr[2];
504 	int err;
505 
506 	if (abort)
507 		strcpy(abortstr, "F");
508 	else
509 		strcpy(abortstr, "T");
510 
511 	err = xs_single(t, XS_TRANSACTION_END, abortstr, NULL, NULL);
512 
513 	return err;
514 }
515 
516 /* Single read and scanf: returns -errno or num scanned. */
517 int
518 xenbus_scanf(struct xenbus_transaction *t,
519 		 const char *dir, const char *node, const char *fmt, ...)
520 {
521 	va_list ap;
522 	int ret;
523 	char *val;
524 
525 	ret = xenbus_read(t, dir, node, NULL, &val);
526 	if (ret)
527 		return ret;
528 
529 	va_start(ap, fmt);
530 	//ret = vsscanf(val, fmt, ap);
531 	ret = ENXIO;
532 	printf("xb_scanf format %s in %s\n", fmt, val);
533 	va_end(ap);
534 	free(val, M_DEVBUF);
535 	return ret;
536 }
537 
538 /* Single printf and write: returns -errno or 0. */
539 int
540 xenbus_printf(struct xenbus_transaction *t,
541 		  const char *dir, const char *node, const char *fmt, ...)
542 {
543 	va_list ap;
544 	int ret;
545 #define PRINTF_BUFFER_SIZE 4096
546 	char *printf_buffer;
547 
548 	printf_buffer = malloc(PRINTF_BUFFER_SIZE, M_DEVBUF, M_NOWAIT);
549 	if (printf_buffer == NULL)
550 		return ENOMEM;
551 
552 	va_start(ap, fmt);
553 	ret = vsnprintf(printf_buffer, PRINTF_BUFFER_SIZE, fmt, ap);
554 	va_end(ap);
555 
556 	KASSERT(ret < PRINTF_BUFFER_SIZE);
557 	ret = xenbus_write(t, dir, node, printf_buffer);
558 
559 	free(printf_buffer, M_DEVBUF);
560 
561 	return ret;
562 }
563 
564 /* Takes tuples of names, scanf-style args, and void **, NULL terminated. */
565 int
566 xenbus_gather(struct xenbus_transaction *t, const char *dir, ...)
567 {
568 	va_list ap;
569 	const char *name;
570 	int ret = 0;
571 
572 	va_start(ap, dir);
573 	while (ret == 0 && (name = va_arg(ap, char *)) != NULL) {
574 		const char *fmt = va_arg(ap, char *);
575 		void *result = va_arg(ap, void *);
576 		char *p;
577 
578 		ret = xenbus_read(t, dir, name, NULL, &p);
579 		if (ret)
580 			break;
581 		if (fmt) {
582 			// XXX if (sscanf(p, fmt, result) == 0)
583 				ret = -EINVAL;
584 			free(p, M_DEVBUF);
585 		} else
586 			*(char **)result = p;
587 	}
588 	va_end(ap);
589 	return ret;
590 }
591 
592 static int
593 xs_watch(const char *path, const char *token)
594 {
595 	struct iovec iov[2];
596 
597 	/* xs_talkv only reads iovec */
598 	iov[0].iov_base = __UNCONST(path);
599 	iov[0].iov_len = strlen(path) + 1;
600 	iov[1].iov_base = __UNCONST(token);
601 	iov[1].iov_len = strlen(token) + 1;
602 
603 	return xs_talkv(NULL, XS_WATCH, iov, 2, NULL, NULL);
604 }
605 
606 static int
607 xs_unwatch(const char *path, const char *token)
608 {
609 	struct iovec iov[2];
610 
611 	/* xs_talkv only reads iovec */
612 	iov[0].iov_base = __UNCONST(path);
613 	iov[0].iov_len = strlen(path) + 1;
614 	iov[1].iov_base = __UNCONST(token);
615 	iov[1].iov_len = strlen(token) + 1;
616 
617 	return xs_talkv(NULL, XS_UNWATCH, iov, 2, NULL, NULL);
618 }
619 
620 static struct xenbus_watch *
621 find_watch(const char *token)
622 {
623 	struct xenbus_watch *i, *cmp;
624 
625 	cmp = (void *)strtoul(token, NULL, 16);
626 
627 	SLIST_FOREACH(i, &watches, watch_next) {
628 		if (i == cmp)
629 			return i;
630 	}
631 
632 	return NULL;
633 }
634 
635 /* Register callback to watch this node. */
636 int
637 register_xenbus_watch(struct xenbus_watch *watch)
638 {
639 	/* Pointer in ascii is the token. */
640 	char token[sizeof(watch) * 2 + 1];
641 	int err;
642 
643 	snprintf(token, sizeof(token), "%lX", (long)watch);
644 
645 	mutex_enter(&watches_lock);
646 	KASSERT(find_watch(token) == 0);
647 	SLIST_INSERT_HEAD(&watches, watch, watch_next);
648 	mutex_exit(&watches_lock);
649 
650 	err = xs_watch(watch->node, token);
651 
652 	/* Ignore errors due to multiple registration. */
653 	if ((err != 0) && (err != EEXIST)) {
654 		mutex_enter(&watches_lock);
655 		SLIST_REMOVE(&watches, watch, xenbus_watch, watch_next);
656 		mutex_exit(&watches_lock);
657 	}
658 	return err;
659 }
660 
661 void
662 unregister_xenbus_watch(struct xenbus_watch *watch)
663 {
664 	SIMPLEQ_HEAD(, xs_stored_msg) gclist;
665 	struct xs_stored_msg *msg, *next_msg;
666 	char token[sizeof(watch) * 2 + 1];
667 	int err;
668 
669 	snprintf(token, sizeof(token), "%lX", (long)watch);
670 
671 	mutex_enter(&watches_lock);
672 	KASSERT(find_watch(token));
673 	SLIST_REMOVE(&watches, watch, xenbus_watch, watch_next);
674 	mutex_exit(&watches_lock);
675 
676 	err = xs_unwatch(watch->node, token);
677 	if (err) {
678 		printf(
679 		       "XENBUS Failed to release watch %s: %i\n",
680 		       watch->node, err);
681 	}
682 
683 	/* Cancel pending watch events. */
684 	SIMPLEQ_INIT(&gclist);
685 	mutex_enter(&watch_events_lock);
686 	for (msg = SIMPLEQ_FIRST(&watch_events); msg != NULL; msg = next_msg) {
687 		next_msg = SIMPLEQ_NEXT(msg, msg_next);
688 		if (msg->u.watch.handle != watch)
689 			continue;
690 		SIMPLEQ_REMOVE(&watch_events, msg, xs_stored_msg, msg_next);
691 		SIMPLEQ_INSERT_TAIL(&gclist, msg, msg_next);
692 	}
693 	mutex_exit(&watch_events_lock);
694 
695 	while ((msg = SIMPLEQ_FIRST(&gclist)) != NULL) {
696 		SIMPLEQ_REMOVE(&gclist, msg, xs_stored_msg, msg_next);
697 		free(msg->u.watch.vec, M_DEVBUF);
698 		free(msg, M_DEVBUF);
699 	}
700 }
701 
702 void
703 xs_suspend(void)
704 {
705 	xs_state.suspend_spl = spltty();
706 }
707 
708 void
709 xs_resume(void)
710 {
711 	struct xenbus_watch *watch;
712 	char token[sizeof(watch) * 2 + 1];
713 	/* No need for watches_lock: the suspend_mutex is sufficient. */
714 	SLIST_FOREACH(watch, &watches, watch_next) {
715 		snprintf(token, sizeof(token), "%lX", (long)watch);
716 		xs_watch(watch->node, token);
717 	}
718 
719 	splx(xs_state.suspend_spl);
720 }
721 
722 static void
723 xenwatch_thread(void *unused)
724 {
725 	SIMPLEQ_HEAD(, xs_stored_msg) events_to_proces;
726 	struct xs_stored_msg *msg;
727 
728 	SIMPLEQ_INIT(&events_to_proces);
729 	for (;;) {
730 		mutex_enter(&watch_events_lock);
731 		while (SIMPLEQ_EMPTY(&watch_events))
732 			cv_wait(&watch_cv, &watch_events_lock);
733 		SIMPLEQ_CONCAT(&events_to_proces, &watch_events);
734 		mutex_exit(&watch_events_lock);
735 
736 		DPRINTK("xenwatch_thread: processing events");
737 
738 		while ((msg = SIMPLEQ_FIRST(&events_to_proces)) != NULL) {
739 			DPRINTK("xenwatch_thread: got event");
740 			SIMPLEQ_REMOVE_HEAD(&events_to_proces, msg_next);
741 			msg->u.watch.handle->xbw_callback(
742 				msg->u.watch.handle,
743 				(void *)msg->u.watch.vec,
744 				msg->u.watch.vec_size);
745 			free(msg->u.watch.vec, M_DEVBUF);
746 			free(msg, M_DEVBUF);
747 		}
748 	}
749 }
750 
751 static int
752 process_msg(void)
753 {
754 	struct xs_stored_msg *msg, *s_msg;
755 	char *body;
756 	int err;
757 
758 	msg = malloc(sizeof(*msg), M_DEVBUF, M_NOWAIT);
759 	if (msg == NULL)
760 		return ENOMEM;
761 
762 	err = xb_read(&msg->hdr, sizeof(msg->hdr));
763 	DPRINTK("xb_read hdr %d", err);
764 	if (err) {
765 		free(msg, M_DEVBUF);
766 		return err;
767 	}
768 
769 	body = malloc(msg->hdr.len + 1, M_DEVBUF, M_NOWAIT);
770 	if (body == NULL) {
771 		free(msg, M_DEVBUF);
772 		return ENOMEM;
773 	}
774 
775 	err = xb_read(body, msg->hdr.len);
776 	DPRINTK("xb_read body %d", err);
777 	if (err) {
778 		free(body, M_DEVBUF);
779 		free(msg, M_DEVBUF);
780 		return err;
781 	}
782 	body[msg->hdr.len] = '\0';
783 
784 	if (msg->hdr.type == XS_WATCH_EVENT) {
785 		bool found, repeated;
786 
787 		DPRINTK("process_msg: XS_WATCH_EVENT");
788 		msg->u.watch.vec = split(body, msg->hdr.len,
789 					 &msg->u.watch.vec_size);
790 		if (msg->u.watch.vec == NULL) {
791 			free(msg, M_DEVBUF);
792 			return ENOMEM;
793 		}
794 
795 		mutex_enter(&watches_lock);
796 		msg->u.watch.handle = find_watch(
797 		    msg->u.watch.vec[XS_WATCH_TOKEN]);
798 		found = (msg->u.watch.handle != NULL);
799 		repeated = false;
800 		if (found) {
801 			mutex_enter(&watch_events_lock);
802 			/* Don't add duplicate events to the queue of pending watches */
803 			SIMPLEQ_FOREACH(s_msg, &watch_events, msg_next) {
804 				if (s_msg->u.watch.handle == msg->u.watch.handle) {
805 					repeated = true;
806 					break;
807 				}
808 			}
809 			if (!repeated) {
810 				SIMPLEQ_INSERT_TAIL(&watch_events, msg, msg_next);
811 				cv_broadcast(&watch_cv);
812 			}
813 			mutex_exit(&watch_events_lock);
814 		}
815 		mutex_exit(&watches_lock);
816 		if (!found || repeated) {
817 			free(msg->u.watch.vec, M_DEVBUF);
818 			free(msg, M_DEVBUF);
819 		}
820 	} else {
821 		DPRINTK("process_msg: type %d body %s", msg->hdr.type, body);
822 
823 		msg->u.reply.body = body;
824 		mutex_enter(&xs_state.reply_lock);
825 		SIMPLEQ_INSERT_TAIL(&xs_state.reply_list, msg, msg_next);
826 		cv_broadcast(&xs_state.reply_cv);
827 		mutex_exit(&xs_state.reply_lock);
828 	}
829 
830 	return 0;
831 }
832 
833 static void
834 xenbus_thread(void *unused)
835 {
836 	int err;
837 
838 	for (;;) {
839 		err = process_msg();
840 		if (err)
841 			printk("XENBUS error %d while reading message\n", err);
842 	}
843 }
844 
845 int
846 xs_init(device_t dev)
847 {
848 	int err;
849 
850 	SLIST_INIT(&watches);
851 	mutex_init(&watches_lock, MUTEX_DEFAULT, IPL_TTY);
852 
853 	SIMPLEQ_INIT(&watch_events);
854 	mutex_init(&watch_events_lock, MUTEX_DEFAULT, IPL_TTY);
855 	cv_init(&watch_cv, "evtsq");
856 
857 	SIMPLEQ_INIT(&xs_state.reply_list);
858 	mutex_init(&xs_state.xs_lock, MUTEX_DEFAULT, IPL_NONE);
859 	mutex_init(&xs_state.reply_lock, MUTEX_DEFAULT, IPL_TTY);
860 	cv_init(&xs_state.reply_cv, "rplq");
861 
862 	err = kthread_create(PRI_NONE, 0, NULL, xenwatch_thread,
863 	    NULL, NULL, "xenwatch");
864 	if (err) {
865 		aprint_error_dev(dev, "kthread_create(xenwatch): %d\n", err);
866 		return err;
867 	}
868 
869 	err = kthread_create(PRI_NONE, 0, NULL, xenbus_thread,
870 	    NULL, NULL, "xenbus");
871 	if (err) {
872 		aprint_error_dev(dev, "kthread_create(xenbus): %d\n", err);
873 		return err;
874 	}
875 
876 	return 0;
877 }
878 
879 /*
880  * Local variables:
881  *  c-file-style: "linux"
882  *  indent-tabs-mode: t
883  *  c-indent-level: 8
884  *  c-basic-offset: 8
885  *  tab-width: 8
886  * End:
887  */
888