xref: /netbsd-src/external/bsd/ntp/dist/sntp/libevent/bufferevent.c (revision bdc22b2e01993381dcefeff2bc9b56ca75a4235c)
1 /*	$NetBSD: bufferevent.c,v 1.5 2016/01/08 21:35:40 christos Exp $	*/
2 
3 /*
4  * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu>
5  * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  * 3. The name of the author may not be used to endorse or promote products
16  *    derived from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "event2/event-config.h"
31 #include "evconfig-private.h"
32 
33 #include <sys/types.h>
34 
35 #ifdef EVENT__HAVE_SYS_TIME_H
36 #include <sys/time.h>
37 #endif
38 
39 #include <errno.h>
40 #include <stdio.h>
41 #include <stdlib.h>
42 #include <string.h>
43 #ifdef EVENT__HAVE_STDARG_H
44 #include <stdarg.h>
45 #endif
46 
47 #ifdef _WIN32
48 #include <winsock2.h>
49 #endif
50 #include <errno.h>
51 
52 #include "event2/util.h"
53 #include "event2/buffer.h"
54 #include "event2/buffer_compat.h"
55 #include "event2/bufferevent.h"
56 #include "event2/bufferevent_struct.h"
57 #include "event2/bufferevent_compat.h"
58 #include "event2/event.h"
59 #include "event-internal.h"
60 #include "log-internal.h"
61 #include "mm-internal.h"
62 #include "bufferevent-internal.h"
63 #include "evbuffer-internal.h"
64 #include "util-internal.h"
65 
66 static void bufferevent_cancel_all_(struct bufferevent *bev);
67 static void bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_);
68 
69 void
70 bufferevent_suspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
71 {
72 	struct bufferevent_private *bufev_private =
73 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
74 	BEV_LOCK(bufev);
75 	if (!bufev_private->read_suspended)
76 		bufev->be_ops->disable(bufev, EV_READ);
77 	bufev_private->read_suspended |= what;
78 	BEV_UNLOCK(bufev);
79 }
80 
81 void
82 bufferevent_unsuspend_read_(struct bufferevent *bufev, bufferevent_suspend_flags what)
83 {
84 	struct bufferevent_private *bufev_private =
85 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
86 	BEV_LOCK(bufev);
87 	bufev_private->read_suspended &= ~what;
88 	if (!bufev_private->read_suspended && (bufev->enabled & EV_READ))
89 		bufev->be_ops->enable(bufev, EV_READ);
90 	BEV_UNLOCK(bufev);
91 }
92 
93 void
94 bufferevent_suspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
95 {
96 	struct bufferevent_private *bufev_private =
97 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
98 	BEV_LOCK(bufev);
99 	if (!bufev_private->write_suspended)
100 		bufev->be_ops->disable(bufev, EV_WRITE);
101 	bufev_private->write_suspended |= what;
102 	BEV_UNLOCK(bufev);
103 }
104 
105 void
106 bufferevent_unsuspend_write_(struct bufferevent *bufev, bufferevent_suspend_flags what)
107 {
108 	struct bufferevent_private *bufev_private =
109 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
110 	BEV_LOCK(bufev);
111 	bufev_private->write_suspended &= ~what;
112 	if (!bufev_private->write_suspended && (bufev->enabled & EV_WRITE))
113 		bufev->be_ops->enable(bufev, EV_WRITE);
114 	BEV_UNLOCK(bufev);
115 }
116 
117 
118 /* Callback to implement watermarks on the input buffer.  Only enabled
119  * if the watermark is set. */
120 static void
121 bufferevent_inbuf_wm_cb(struct evbuffer *buf,
122     const struct evbuffer_cb_info *cbinfo,
123     void *arg)
124 {
125 	struct bufferevent *bufev = arg;
126 	size_t size;
127 
128 	size = evbuffer_get_length(buf);
129 
130 	if (size >= bufev->wm_read.high)
131 		bufferevent_wm_suspend_read(bufev);
132 	else
133 		bufferevent_wm_unsuspend_read(bufev);
134 }
135 
/* Deferred-callback dispatcher used when BEV_OPT_UNLOCK_CALLBACKS is not
 * set: the user's callbacks run with the bufferevent lock held for the
 * whole dispatch. */
static void
bufferevent_run_deferred_callbacks_locked(struct event_callback *cb, void *arg)
{
	struct bufferevent_private *bufev_private = arg;
	struct bufferevent *bufev = &bufev_private->bev;

	BEV_LOCK(bufev);
	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
	    bufev->errorcb) {
		/* The "connected" happened before any reads or writes, so
		   send it first. */
		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
		bufev->errorcb(bufev, BEV_EVENT_CONNECTED, bufev->cbarg);
	}
	if (bufev_private->readcb_pending && bufev->readcb) {
		/* Clear the pending flag before invoking, so a callback that
		 * re-schedules itself is not lost. */
		bufev_private->readcb_pending = 0;
		bufev->readcb(bufev, bufev->cbarg);
	}
	if (bufev_private->writecb_pending && bufev->writecb) {
		bufev_private->writecb_pending = 0;
		bufev->writecb(bufev, bufev->cbarg);
	}
	if (bufev_private->eventcb_pending && bufev->errorcb) {
		short what = bufev_private->eventcb_pending;
		int err = bufev_private->errno_pending;
		bufev_private->eventcb_pending = 0;
		bufev_private->errno_pending = 0;
		/* Restore the socket error that was current when the event
		 * was recorded, so the callback can inspect it. */
		EVUTIL_SET_SOCKET_ERROR(err);
		bufev->errorcb(bufev, what, bufev->cbarg);
	}
	/* Drops the reference taken by SCHEDULE_DEFERRED and unlocks. */
	bufferevent_decref_and_unlock_(bufev);
}
168 
/* Deferred-callback dispatcher used with BEV_OPT_UNLOCK_CALLBACKS: each
 * user callback is invoked with the bufferevent lock released.  Callback
 * pointers and cbarg are copied into locals before the lock is dropped,
 * because the user may change them (via bufferevent_setcb) while we are
 * unlocked. */
static void
bufferevent_run_deferred_callbacks_unlocked(struct event_callback *cb, void *arg)
{
	struct bufferevent_private *bufev_private = arg;
	struct bufferevent *bufev = &bufev_private->bev;

	BEV_LOCK(bufev);
/* Run `stmt` with the lock released, then re-take the lock. */
#define UNLOCKED(stmt) \
	do { BEV_UNLOCK(bufev); stmt; BEV_LOCK(bufev); } while(0)

	if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
	    bufev->errorcb) {
		/* The "connected" happened before any reads or writes, so
		   send it first. */
		bufferevent_event_cb errorcb = bufev->errorcb;
		void *cbarg = bufev->cbarg;
		bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
		UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
	}
	if (bufev_private->readcb_pending && bufev->readcb) {
		bufferevent_data_cb readcb = bufev->readcb;
		void *cbarg = bufev->cbarg;
		/* Clear the pending flag before unlocking, so a callback
		 * that re-schedules itself is not lost. */
		bufev_private->readcb_pending = 0;
		UNLOCKED(readcb(bufev, cbarg));
	}
	if (bufev_private->writecb_pending && bufev->writecb) {
		bufferevent_data_cb writecb = bufev->writecb;
		void *cbarg = bufev->cbarg;
		bufev_private->writecb_pending = 0;
		UNLOCKED(writecb(bufev, cbarg));
	}
	if (bufev_private->eventcb_pending && bufev->errorcb) {
		bufferevent_event_cb errorcb = bufev->errorcb;
		void *cbarg = bufev->cbarg;
		short what = bufev_private->eventcb_pending;
		int err = bufev_private->errno_pending;
		bufev_private->eventcb_pending = 0;
		bufev_private->errno_pending = 0;
		/* Restore the socket error recorded when the event fired. */
		EVUTIL_SET_SOCKET_ERROR(err);
		UNLOCKED(errorcb(bufev,what,cbarg));
	}
	/* Drops the reference taken by SCHEDULE_DEFERRED and unlocks. */
	bufferevent_decref_and_unlock_(bufev);
#undef UNLOCKED
}
213 
/* Queue the bufferevent's deferred-callback handler on its event base.
 * If event_deferred_cb_schedule_() reports that it actually queued the
 * callback (nonzero return), take a reference that the handler later
 * drops via bufferevent_decref_and_unlock_(). */
#define SCHEDULE_DEFERRED(bevp)						\
	do {								\
		if (event_deferred_cb_schedule_(			\
			    (bevp)->bev.ev_base,			\
			&(bevp)->deferred))				\
			bufferevent_incref_(&(bevp)->bev);		\
	} while (0)
221 
222 
223 void
224 bufferevent_run_readcb_(struct bufferevent *bufev, int options)
225 {
226 	/* Requires that we hold the lock and a reference */
227 	struct bufferevent_private *p =
228 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
229 	if (bufev->readcb == NULL)
230 		return;
231 	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
232 		p->readcb_pending = 1;
233 		SCHEDULE_DEFERRED(p);
234 	} else {
235 		bufev->readcb(bufev, bufev->cbarg);
236 	}
237 }
238 
239 void
240 bufferevent_run_writecb_(struct bufferevent *bufev, int options)
241 {
242 	/* Requires that we hold the lock and a reference */
243 	struct bufferevent_private *p =
244 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
245 	if (bufev->writecb == NULL)
246 		return;
247 	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
248 		p->writecb_pending = 1;
249 		SCHEDULE_DEFERRED(p);
250 	} else {
251 		bufev->writecb(bufev, bufev->cbarg);
252 	}
253 }
254 
/* Mask of option bits that the public bufferevent_trigger*() entry points
 * accept from callers; anything else is silently dropped. */
#define BEV_TRIG_ALL_OPTS (			\
		BEV_TRIG_IGNORE_WATERMARKS|	\
		BEV_TRIG_DEFER_CALLBACKS	\
	)
259 
/* Public entry point: manually invoke the read/write callbacks for
 * `iotype` as if I/O had occurred.  Unsupported option bits are masked
 * off before dispatching to the internal helper. */
void
bufferevent_trigger(struct bufferevent *bufev, short iotype, int options)
{
	bufferevent_incref_and_lock_(bufev);
	bufferevent_trigger_nolock_(bufev, iotype, options&BEV_TRIG_ALL_OPTS);
	bufferevent_decref_and_unlock_(bufev);
}
267 
268 void
269 bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options)
270 {
271 	/* Requires that we hold the lock and a reference */
272 	struct bufferevent_private *p =
273 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
274 	if (bufev->errorcb == NULL)
275 		return;
276 	if ((p->options|options) & BEV_OPT_DEFER_CALLBACKS) {
277 		p->eventcb_pending |= what;
278 		p->errno_pending = EVUTIL_SOCKET_ERROR();
279 		SCHEDULE_DEFERRED(p);
280 	} else {
281 		bufev->errorcb(bufev, what, bufev->cbarg);
282 	}
283 }
284 
/* Public entry point: manually fire the event callback with `what`.
 * Unsupported option bits are masked off before dispatch. */
void
bufferevent_trigger_event(struct bufferevent *bufev, short what, int options)
{
	bufferevent_incref_and_lock_(bufev);
	bufferevent_run_eventcb_(bufev, what, options&BEV_TRIG_ALL_OPTS);
	bufferevent_decref_and_unlock_(bufev);
}
292 
293 int
294 bufferevent_init_common_(struct bufferevent_private *bufev_private,
295     struct event_base *base,
296     const struct bufferevent_ops *ops,
297     enum bufferevent_options options)
298 {
299 	struct bufferevent *bufev = &bufev_private->bev;
300 
301 	if (!bufev->input) {
302 		if ((bufev->input = evbuffer_new()) == NULL)
303 			return -1;
304 	}
305 
306 	if (!bufev->output) {
307 		if ((bufev->output = evbuffer_new()) == NULL) {
308 			evbuffer_free(bufev->input);
309 			return -1;
310 		}
311 	}
312 
313 	bufev_private->refcnt = 1;
314 	bufev->ev_base = base;
315 
316 	/* Disable timeouts. */
317 	evutil_timerclear(&bufev->timeout_read);
318 	evutil_timerclear(&bufev->timeout_write);
319 
320 	bufev->be_ops = ops;
321 
322 	bufferevent_ratelim_init_(bufev_private);
323 
324 	/*
325 	 * Set to EV_WRITE so that using bufferevent_write is going to
326 	 * trigger a callback.  Reading needs to be explicitly enabled
327 	 * because otherwise no data will be available.
328 	 */
329 	bufev->enabled = EV_WRITE;
330 
331 #ifndef EVENT__DISABLE_THREAD_SUPPORT
332 	if (options & BEV_OPT_THREADSAFE) {
333 		if (bufferevent_enable_locking_(bufev, NULL) < 0) {
334 			/* cleanup */
335 			evbuffer_free(bufev->input);
336 			evbuffer_free(bufev->output);
337 			bufev->input = NULL;
338 			bufev->output = NULL;
339 			return -1;
340 		}
341 	}
342 #endif
343 	if ((options & (BEV_OPT_DEFER_CALLBACKS|BEV_OPT_UNLOCK_CALLBACKS))
344 	    == BEV_OPT_UNLOCK_CALLBACKS) {
345 		event_warnx("UNLOCK_CALLBACKS requires DEFER_CALLBACKS");
346 		return -1;
347 	}
348 	if (options & BEV_OPT_UNLOCK_CALLBACKS)
349 		event_deferred_cb_init_(
350 		    &bufev_private->deferred,
351 		    event_base_get_npriorities(base) / 2,
352 		    bufferevent_run_deferred_callbacks_unlocked,
353 		    bufev_private);
354 	else
355 		event_deferred_cb_init_(
356 		    &bufev_private->deferred,
357 		    event_base_get_npriorities(base) / 2,
358 		    bufferevent_run_deferred_callbacks_locked,
359 		    bufev_private);
360 
361 	bufev_private->options = options;
362 
363 	evbuffer_set_parent_(bufev->input, bufev);
364 	evbuffer_set_parent_(bufev->output, bufev);
365 
366 	return 0;
367 }
368 
/* Install the user's read/write/event callbacks; all three share the
 * single cbarg.  A NULL callback disables the corresponding
 * notification (the dispatchers check for NULL before invoking). */
void
bufferevent_setcb(struct bufferevent *bufev,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg)
{
	BEV_LOCK(bufev);

	bufev->readcb = readcb;
	bufev->writecb = writecb;
	bufev->errorcb = eventcb;

	bufev->cbarg = cbarg;
	BEV_UNLOCK(bufev);
}
383 
/* Report the currently installed callbacks and cbarg.  Any output
 * pointer may be NULL to skip that field. */
void
bufferevent_getcb(struct bufferevent *bufev,
    bufferevent_data_cb *readcb_ptr,
    bufferevent_data_cb *writecb_ptr,
    bufferevent_event_cb *eventcb_ptr,
    void **cbarg_ptr)
{
	BEV_LOCK(bufev);
	if (readcb_ptr)
		*readcb_ptr = bufev->readcb;
	if (writecb_ptr)
		*writecb_ptr = bufev->writecb;
	if (eventcb_ptr)
		*eventcb_ptr = bufev->errorcb;
	if (cbarg_ptr)
		*cbarg_ptr = bufev->cbarg;

	BEV_UNLOCK(bufev);
}
403 
/* Accessor: the input (read-side) evbuffer.  No locking; the pointer
 * itself never changes after initialization. */
struct evbuffer *
bufferevent_get_input(struct bufferevent *bufev)
{
	return bufev->input;
}
409 
/* Accessor: the output (write-side) evbuffer. */
struct evbuffer *
bufferevent_get_output(struct bufferevent *bufev)
{
	return bufev->output;
}
415 
/* Accessor: the event_base this bufferevent runs on. */
struct event_base *
bufferevent_get_base(struct bufferevent *bufev)
{
	return bufev->ev_base;
}
421 
422 int
423 bufferevent_get_priority(const struct bufferevent *bufev)
424 {
425 	if (event_initialized(&bufev->ev_read)) {
426 		return event_get_priority(&bufev->ev_read);
427 	} else {
428 		return event_base_get_npriorities(bufev->ev_base) / 2;
429 	}
430 }
431 
432 int
433 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
434 {
435 	if (evbuffer_add(bufev->output, data, size) == -1)
436 		return (-1);
437 
438 	return 0;
439 }
440 
441 int
442 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
443 {
444 	if (evbuffer_add_buffer(bufev->output, buf) == -1)
445 		return (-1);
446 
447 	return 0;
448 }
449 
450 size_t
451 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
452 {
453 	return (evbuffer_remove(bufev->input, data, size));
454 }
455 
/* Move all pending input data into buf; returns the result of
 * evbuffer_add_buffer() (0 on success, -1 on failure). */
int
bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf)
{
	return (evbuffer_add_buffer(buf, bufev->input));
}
461 
462 int
463 bufferevent_enable(struct bufferevent *bufev, short event)
464 {
465 	struct bufferevent_private *bufev_private =
466 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
467 	short impl_events = event;
468 	int r = 0;
469 
470 	bufferevent_incref_and_lock_(bufev);
471 	if (bufev_private->read_suspended)
472 		impl_events &= ~EV_READ;
473 	if (bufev_private->write_suspended)
474 		impl_events &= ~EV_WRITE;
475 
476 	bufev->enabled |= event;
477 
478 	if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
479 		r = -1;
480 
481 	bufferevent_decref_and_unlock_(bufev);
482 	return r;
483 }
484 
485 int
486 bufferevent_set_timeouts(struct bufferevent *bufev,
487 			 const struct timeval *tv_read,
488 			 const struct timeval *tv_write)
489 {
490 	int r = 0;
491 	BEV_LOCK(bufev);
492 	if (tv_read) {
493 		bufev->timeout_read = *tv_read;
494 	} else {
495 		evutil_timerclear(&bufev->timeout_read);
496 	}
497 	if (tv_write) {
498 		bufev->timeout_write = *tv_write;
499 	} else {
500 		evutil_timerclear(&bufev->timeout_write);
501 	}
502 
503 	if (bufev->be_ops->adj_timeouts)
504 		r = bufev->be_ops->adj_timeouts(bufev);
505 	BEV_UNLOCK(bufev);
506 
507 	return r;
508 }
509 
510 
511 /* Obsolete; use bufferevent_set_timeouts */
512 void
513 bufferevent_settimeout(struct bufferevent *bufev,
514 		       int timeout_read, int timeout_write)
515 {
516 	struct timeval tv_read, tv_write;
517 	struct timeval *ptv_read = NULL, *ptv_write = NULL;
518 
519 	memset(&tv_read, 0, sizeof(tv_read));
520 	memset(&tv_write, 0, sizeof(tv_write));
521 
522 	if (timeout_read) {
523 		tv_read.tv_sec = timeout_read;
524 		ptv_read = &tv_read;
525 	}
526 	if (timeout_write) {
527 		tv_write.tv_sec = timeout_write;
528 		ptv_write = &tv_write;
529 	}
530 
531 	bufferevent_set_timeouts(bufev, ptv_read, ptv_write);
532 }
533 
534 
/* Internal variant of bufferevent_disable() that also clears the
 * "connecting" flag.  NOTE(review): presumably used so that an
 * in-progress connect stops being tracked when the caller forcibly
 * disables the bufferevent — confirm against the socket-bufferevent
 * callers. */
int
bufferevent_disable_hard_(struct bufferevent *bufev, short event)
{
	int r = 0;
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	bufev->enabled &= ~event;

	bufev_private->connecting = 0;
	if (bufev->be_ops->disable(bufev, event) < 0)
		r = -1;

	BEV_UNLOCK(bufev);
	return r;
}
552 
553 int
554 bufferevent_disable(struct bufferevent *bufev, short event)
555 {
556 	int r = 0;
557 
558 	BEV_LOCK(bufev);
559 	bufev->enabled &= ~event;
560 
561 	if (bufev->be_ops->disable(bufev, event) < 0)
562 		r = -1;
563 
564 	BEV_UNLOCK(bufev);
565 	return r;
566 }
567 
568 /*
569  * Sets the water marks
570  */
571 
572 void
573 bufferevent_setwatermark(struct bufferevent *bufev, short events,
574     size_t lowmark, size_t highmark)
575 {
576 	struct bufferevent_private *bufev_private =
577 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
578 
579 	BEV_LOCK(bufev);
580 	if (events & EV_WRITE) {
581 		bufev->wm_write.low = lowmark;
582 		bufev->wm_write.high = highmark;
583 	}
584 
585 	if (events & EV_READ) {
586 		bufev->wm_read.low = lowmark;
587 		bufev->wm_read.high = highmark;
588 
589 		if (highmark) {
590 			/* There is now a new high-water mark for read.
591 			   enable the callback if needed, and see if we should
592 			   suspend/bufferevent_wm_unsuspend. */
593 
594 			if (bufev_private->read_watermarks_cb == NULL) {
595 				bufev_private->read_watermarks_cb =
596 				    evbuffer_add_cb(bufev->input,
597 						    bufferevent_inbuf_wm_cb,
598 						    bufev);
599 			}
600 			evbuffer_cb_set_flags(bufev->input,
601 				      bufev_private->read_watermarks_cb,
602 				      EVBUFFER_CB_ENABLED|EVBUFFER_CB_NODEFER);
603 
604 			if (evbuffer_get_length(bufev->input) >= highmark)
605 				bufferevent_wm_suspend_read(bufev);
606 			else if (evbuffer_get_length(bufev->input) < highmark)
607 				bufferevent_wm_unsuspend_read(bufev);
608 		} else {
609 			/* There is now no high-water mark for read. */
610 			if (bufev_private->read_watermarks_cb)
611 				evbuffer_cb_clear_flags(bufev->input,
612 				    bufev_private->read_watermarks_cb,
613 				    EVBUFFER_CB_ENABLED);
614 			bufferevent_wm_unsuspend_read(bufev);
615 		}
616 	}
617 	BEV_UNLOCK(bufev);
618 }
619 
620 int
621 bufferevent_getwatermark(struct bufferevent *bufev, short events,
622     size_t *lowmark, size_t *highmark)
623 {
624 	if (events == EV_WRITE) {
625 		BEV_LOCK(bufev);
626 		if (lowmark)
627 			*lowmark = bufev->wm_write.low;
628 		if (highmark)
629 			*highmark = bufev->wm_write.high;
630 		BEV_UNLOCK(bufev);
631 		return 0;
632 	}
633 
634 	if (events == EV_READ) {
635 		BEV_LOCK(bufev);
636 		if (lowmark)
637 			*lowmark = bufev->wm_read.low;
638 		if (highmark)
639 			*highmark = bufev->wm_read.high;
640 		BEV_UNLOCK(bufev);
641 		return 0;
642 	}
643 	return -1;
644 }
645 
646 int
647 bufferevent_flush(struct bufferevent *bufev,
648     short iotype,
649     enum bufferevent_flush_mode mode)
650 {
651 	int r = -1;
652 	BEV_LOCK(bufev);
653 	if (bufev->be_ops->flush)
654 		r = bufev->be_ops->flush(bufev, iotype, mode);
655 	BEV_UNLOCK(bufev);
656 	return r;
657 }
658 
/* Take a reference and return with the lock HELD; the caller must
 * eventually call bufferevent_decref_and_unlock_(). */
void
bufferevent_incref_and_lock_(struct bufferevent *bufev)
{
	struct bufferevent_private *bufev_private =
	    BEV_UPCAST(bufev);
	BEV_LOCK(bufev);
	++bufev_private->refcnt;
}
667 
#if 0
/* Currently unused: would move ownership of a shared lock from one
 * bufferevent to another, provided they already share the same lock and
 * the recipient does not already own one. */
static void
bufferevent_transfer_lock_ownership_(struct bufferevent *donor,
    struct bufferevent *recipient)
{
	struct bufferevent_private *d = BEV_UPCAST(donor);
	struct bufferevent_private *r = BEV_UPCAST(recipient);
	if (d->lock != r->lock)
		return;
	if (r->own_lock)
		return;
	if (d->own_lock) {
		d->own_lock = 0;
		r->own_lock = 1;
	}
}
#endif
685 
/* Drop one reference and release the lock.  If this was the last
 * reference, unlink the bufferevent from its backend and finalize all of
 * its event callbacks, so that the real teardown
 * (bufferevent_finalize_cb_) runs only once no callback is still
 * executing.  Returns 1 if teardown was initiated, else 0. */
int
bufferevent_decref_and_unlock_(struct bufferevent *bufev)
{
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
	int n_cbs = 0;
#define MAX_CBS 16
	struct event_callback *cbs[MAX_CBS];

	EVUTIL_ASSERT(bufev_private->refcnt > 0);

	if (--bufev_private->refcnt) {
		BEV_UNLOCK(bufev);
		return 0;
	}

	if (bufev->be_ops->unlink)
		bufev->be_ops->unlink(bufev);

	/* Okay, we're out of references. Let's finalize this once all the
	 * callbacks are done running. */
	cbs[0] = &bufev->ev_read.ev_evcallback;
	cbs[1] = &bufev->ev_write.ev_evcallback;
	cbs[2] = &bufev_private->deferred;
	n_cbs = 3;
	if (bufev_private->rate_limiting) {
		struct event *e = &bufev_private->rate_limiting->refill_bucket_event;
		if (event_initialized(e))
			cbs[n_cbs++] = &e->ev_evcallback;
	}
	/* Also collect any callbacks registered on the two evbuffers. */
	n_cbs += evbuffer_get_callbacks_(bufev->input, cbs+n_cbs, MAX_CBS-n_cbs);
	n_cbs += evbuffer_get_callbacks_(bufev->output, cbs+n_cbs, MAX_CBS-n_cbs);

	/* Once every callback above has finished, run
	 * bufferevent_finalize_cb_ to actually free everything. */
	event_callback_finalize_many_(bufev->ev_base, n_cbs, cbs,
	    bufferevent_finalize_cb_);

#undef MAX_CBS
	BEV_UNLOCK(bufev);

	return 1;
}
727 
/* Final teardown, run by the finalize machinery once all of the
 * bufferevent's callbacks have finished (scheduled from
 * bufferevent_decref_and_unlock_).  Destroys the backend state, the
 * evbuffers, rate-limiting state, the lock (if owned), and the
 * allocation itself. */
static void
bufferevent_finalize_cb_(struct event_callback *evcb, void *arg_)
{
	struct bufferevent *bufev = arg_;
	struct bufferevent *underlying;
	struct bufferevent_private *bufev_private =
	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);

	BEV_LOCK(bufev);
	underlying = bufferevent_get_underlying(bufev);

	/* Clean up the shared info */
	if (bufev->be_ops->destruct)
		bufev->be_ops->destruct(bufev);

	/* XXX what happens if refcnt for these buffers is > 1?
	 * The buffers can share a lock with this bufferevent object,
	 * but the lock might be destroyed below. */
	/* evbuffer will free the callbacks */
	evbuffer_free(bufev->input);
	evbuffer_free(bufev->output);

	if (bufev_private->rate_limiting) {
		if (bufev_private->rate_limiting->group)
			bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
		mm_free(bufev_private->rate_limiting);
		bufev_private->rate_limiting = NULL;
	}


	BEV_UNLOCK(bufev);

	/* NOTE(review): own_lock/lock are read after BEV_UNLOCK — the lock
	 * must be released before it can be freed. */
	if (bufev_private->own_lock)
		EVTHREAD_FREE_LOCK(bufev_private->lock,
		    EVTHREAD_LOCKTYPE_RECURSIVE);

	/* Free the actual allocated memory. */
	mm_free(((char*)bufev) - bufev->be_ops->mem_offset);

	/* Release the reference to underlying now that we no longer need the
	 * reference to it.  We wait this long mainly in case our lock is
	 * shared with underlying.
	 *
	 * The 'destruct' function will also drop a reference to underlying
	 * if BEV_OPT_CLOSE_ON_FREE is set.
	 *
	 * XXX Should we/can we just refcount evbuffer/bufferevent locks?
	 * It would probably save us some headaches.
	 */
	if (underlying)
		bufferevent_decref_(underlying);
}
780 
/* Drop one reference (taking and releasing the lock).  Returns 1 if this
 * released the last reference, else 0. */
int
bufferevent_decref_(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);
	return bufferevent_decref_and_unlock_(bufev);
}
787 
/* Public destructor: clear the user callbacks so none fire during
 * teardown, cancel any in-flight backend operations, then drop the
 * caller's reference. */
void
bufferevent_free(struct bufferevent *bufev)
{
	BEV_LOCK(bufev);
	bufferevent_setcb(bufev, NULL, NULL, NULL, NULL);
	bufferevent_cancel_all_(bufev);
	bufferevent_decref_and_unlock_(bufev);
}
796 
797 void
798 bufferevent_incref_(struct bufferevent *bufev)
799 {
800 	struct bufferevent_private *bufev_private =
801 	    EVUTIL_UPCAST(bufev, struct bufferevent_private, bev);
802 
803 	BEV_LOCK(bufev);
804 	++bufev_private->refcnt;
805 	BEV_UNLOCK(bufev);
806 }
807 
/* Enable locking on this bufferevent (and its evbuffers).  If `lock` is
 * NULL, either share the underlying bufferevent's lock or allocate a
 * fresh recursive lock; otherwise adopt the caller's lock without taking
 * ownership.  Returns -1 if a lock is already set, allocation fails, or
 * thread support is compiled out. */
int
bufferevent_enable_locking_(struct bufferevent *bufev, void *lock)
{
#ifdef EVENT__DISABLE_THREAD_SUPPORT
	return -1;
#else
	struct bufferevent *underlying;

	/* Already has a lock: refuse to replace it. */
	if (BEV_UPCAST(bufev)->lock)
		return -1;
	underlying = bufferevent_get_underlying(bufev);

	if (!lock && underlying && BEV_UPCAST(underlying)->lock) {
		/* Share the underlying bufferevent's lock (not owned). */
		lock = BEV_UPCAST(underlying)->lock;
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 0;
	} else if (!lock) {
		/* Allocate our own recursive lock (owned; freed in
		 * bufferevent_finalize_cb_). */
		EVTHREAD_ALLOC_LOCK(lock, EVTHREAD_LOCKTYPE_RECURSIVE);
		if (!lock)
			return -1;
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 1;
	} else {
		/* Adopt the caller-supplied lock (not owned). */
		BEV_UPCAST(bufev)->lock = lock;
		BEV_UPCAST(bufev)->own_lock = 0;
	}
	/* The evbuffers share the same lock as the bufferevent. */
	evbuffer_enable_locking(bufev->input, lock);
	evbuffer_enable_locking(bufev->output, lock);

	/* Propagate the lock down to an unlocked underlying bufferevent. */
	if (underlying && !BEV_UPCAST(underlying)->lock)
		bufferevent_enable_locking_(underlying, lock);

	return 0;
#endif
}
843 
844 int
845 bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd)
846 {
847 	union bufferevent_ctrl_data d;
848 	int res = -1;
849 	d.fd = fd;
850 	BEV_LOCK(bev);
851 	if (bev->be_ops->ctrl)
852 		res = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &d);
853 	BEV_UNLOCK(bev);
854 	return res;
855 }
856 
857 evutil_socket_t
858 bufferevent_getfd(struct bufferevent *bev)
859 {
860 	union bufferevent_ctrl_data d;
861 	int res = -1;
862 	d.fd = -1;
863 	BEV_LOCK(bev);
864 	if (bev->be_ops->ctrl)
865 		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d);
866 	BEV_UNLOCK(bev);
867 	return (res<0) ? -1 : d.fd;
868 }
869 
870 enum bufferevent_options
871 bufferevent_get_options_(struct bufferevent *bev)
872 {
873 	struct bufferevent_private *bev_p =
874 	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
875 	enum bufferevent_options options;
876 
877 	BEV_LOCK(bev);
878 	options = bev_p->options;
879 	BEV_UNLOCK(bev);
880 	return options;
881 }
882 
883 
884 static void
885 bufferevent_cancel_all_(struct bufferevent *bev)
886 {
887 	union bufferevent_ctrl_data d;
888 	memset(&d, 0, sizeof(d));
889 	BEV_LOCK(bev);
890 	if (bev->be_ops->ctrl)
891 		bev->be_ops->ctrl(bev, BEV_CTRL_CANCEL_ALL, &d);
892 	BEV_UNLOCK(bev);
893 }
894 
895 short
896 bufferevent_get_enabled(struct bufferevent *bufev)
897 {
898 	short r;
899 	BEV_LOCK(bufev);
900 	r = bufev->enabled;
901 	BEV_UNLOCK(bufev);
902 	return r;
903 }
904 
905 struct bufferevent *
906 bufferevent_get_underlying(struct bufferevent *bev)
907 {
908 	union bufferevent_ctrl_data d;
909 	int res = -1;
910 	d.ptr = NULL;
911 	BEV_LOCK(bev);
912 	if (bev->be_ops->ctrl)
913 		res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_UNDERLYING, &d);
914 	BEV_UNLOCK(bev);
915 	return (res<0) ? NULL : d.ptr;
916 }
917 
/* Timer callback for the generic read timeout: stop reading and report
 * BEV_EVENT_TIMEOUT|BEV_EVENT_READING to the user. */
static void
bufferevent_generic_read_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
	struct bufferevent *bev = ctx;
	bufferevent_incref_and_lock_(bev);
	bufferevent_disable(bev, EV_READ);
	bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_READING, 0);
	bufferevent_decref_and_unlock_(bev);
}
/* Timer callback for the generic write timeout: stop writing and report
 * BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING to the user. */
static void
bufferevent_generic_write_timeout_cb(evutil_socket_t fd, short event, void *ctx)
{
	struct bufferevent *bev = ctx;
	bufferevent_incref_and_lock_(bev);
	bufferevent_disable(bev, EV_WRITE);
	bufferevent_run_eventcb_(bev, BEV_EVENT_TIMEOUT|BEV_EVENT_WRITING, 0);
	bufferevent_decref_and_unlock_(bev);
}
936 
/* Set up ev_read/ev_write as pure timers (fd == -1) that fire the
 * generic timeout callbacks.  EV_FINALIZE so they can take part in the
 * finalize-based teardown in bufferevent_decref_and_unlock_(). */
void
bufferevent_init_generic_timeout_cbs_(struct bufferevent *bev)
{
	event_assign(&bev->ev_read, bev->ev_base, -1, EV_FINALIZE,
	    bufferevent_generic_read_timeout_cb, bev);
	event_assign(&bev->ev_write, bev->ev_base, -1, EV_FINALIZE,
	    bufferevent_generic_write_timeout_cb, bev);
}
945 
946 int
947 bufferevent_generic_adj_timeouts_(struct bufferevent *bev)
948 {
949 	const short enabled = bev->enabled;
950 	struct bufferevent_private *bev_p =
951 	    EVUTIL_UPCAST(bev, struct bufferevent_private, bev);
952 	int r1=0, r2=0;
953 	if ((enabled & EV_READ) && !bev_p->read_suspended &&
954 	    evutil_timerisset(&bev->timeout_read))
955 		r1 = event_add(&bev->ev_read, &bev->timeout_read);
956 	else
957 		r1 = event_del(&bev->ev_read);
958 
959 	if ((enabled & EV_WRITE) && !bev_p->write_suspended &&
960 	    evutil_timerisset(&bev->timeout_write) &&
961 	    evbuffer_get_length(bev->output))
962 		r2 = event_add(&bev->ev_write, &bev->timeout_write);
963 	else
964 		r2 = event_del(&bev->ev_write);
965 	if (r1 < 0 || r2 < 0)
966 		return -1;
967 	return 0;
968 }
969 
970 int
971 bufferevent_add_event_(struct event *ev, const struct timeval *tv)
972 {
973 	if (tv->tv_sec == 0 && tv->tv_usec == 0)
974 		return event_add(ev, NULL);
975 	else
976 		return event_add(ev, tv);
977 }
978 
/* For use by user programs only; internally, we should be calling
   either bufferevent_incref_and_lock_(), or BEV_LOCK.
   Takes a reference as well as the lock, so the object cannot be freed
   out from under the caller before bufferevent_unlock(). */
void
bufferevent_lock(struct bufferevent *bev)
{
	bufferevent_incref_and_lock_(bev);
}
986 
/* Counterpart of bufferevent_lock(): drops the reference it took and
 * releases the lock. */
void
bufferevent_unlock(struct bufferevent *bev)
{
	bufferevent_decref_and_unlock_(bev);
}
992