xref: /netbsd-src/external/bsd/libevent/dist/bufferevent_filter.c (revision 657871a79c9a2060a6255a242fa1a1ef76b56ec6)
1 /*	$NetBSD: bufferevent_filter.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $	*/
2 /*
3  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
4  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  * 3. The name of the author may not be used to endorse or promote products
16  *    derived from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include "evconfig-private.h"
31 
32 #include <sys/types.h>
33 
34 #include "event2/event-config.h"
35 #include <sys/cdefs.h>
36 __RCSID("$NetBSD: bufferevent_filter.c,v 1.1.1.3 2021/04/07 02:43:13 christos Exp $");
37 
38 #ifdef EVENT__HAVE_SYS_TIME_H
39 #include <sys/time.h>
40 #endif
41 
42 #include <errno.h>
43 #include <stdio.h>
44 #include <stdlib.h>
45 #include <string.h>
46 #ifdef EVENT__HAVE_STDARG_H
47 #include <stdarg.h>
48 #endif
49 
50 #ifdef _WIN32
51 #include <winsock2.h>
52 #endif
53 
54 #include "event2/util.h"
55 #include "event2/bufferevent.h"
56 #include "event2/buffer.h"
57 #include "event2/bufferevent_struct.h"
58 #include "event2/event.h"
59 #include "log-internal.h"
60 #include "mm-internal.h"
61 #include "bufferevent-internal.h"
62 #include "util-internal.h"
63 
/* Forward declarations. */

/* Backend operations referenced from bufferevent_ops_filter. */
static int be_filter_enable(struct bufferevent *bev, short event);
static int be_filter_disable(struct bufferevent *bev, short event);
static void be_filter_unlink(struct bufferevent *bev);
static void be_filter_destruct(struct bufferevent *bev);
static int be_filter_flush(struct bufferevent *bufev,
    short iotype, enum bufferevent_flush_mode mode);
static int be_filter_ctrl(struct bufferevent *bev,
    enum bufferevent_ctrl_op op, union bufferevent_ctrl_data *data);

/* Callbacks that bufferevent_filter_new() installs on the underlying
 * bufferevent. */
static void be_filter_readcb(struct bufferevent *underlying, void *me_);
static void be_filter_writecb(struct bufferevent *underlying, void *me_);
static void be_filter_eventcb(struct bufferevent *underlying, short what,
    void *me_);

/* Callbacks that bufferevent_filter_new() installs on the filter's own
 * input and output evbuffers. */
static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo, void *arg);
static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo, void *arg);
82 
83 struct bufferevent_filtered {
84 	struct bufferevent_private bev;
85 
86 	/** The bufferevent that we read/write filtered data from/to. */
87 	struct bufferevent *underlying;
88 	/** A callback on our inbuf to notice somebory removes data */
89 	struct evbuffer_cb_entry *inbuf_cb;
90 	/** A callback on our outbuf to notice when somebody adds data */
91 	struct evbuffer_cb_entry *outbuf_cb;
92 	/** True iff we have received an EOF callback from the underlying
93 	 * bufferevent. */
94 	unsigned got_eof;
95 
96 	/** Function to free context when we're done. */
97 	void (*free_context)(void *);
98 	/** Input filter */
99 	bufferevent_filter_cb process_in;
100 	/** Output filter */
101 	bufferevent_filter_cb process_out;
102 	/** User-supplied argument to the filters. */
103 	void *context;
104 };
105 
/* Backend operations table for filtering bufferevents.  The initializer is
 * positional: a name string, then the offset of the embedded
 * struct bufferevent inside struct bufferevent_filtered (the same offset
 * upcast() subtracts), then the handlers for this backend.  All handlers are
 * defined in this file except bufferevent_generic_adj_timeouts_. */
const struct bufferevent_ops bufferevent_ops_filter = {
	"filter",
	evutil_offsetof(struct bufferevent_filtered, bev.bev),
	be_filter_enable,
	be_filter_disable,
	be_filter_unlink,
	be_filter_destruct,
	bufferevent_generic_adj_timeouts_,
	be_filter_flush,
	be_filter_ctrl,
};
117 
118 /* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
119  * return that bufferevent_filtered. Returns NULL otherwise.*/
120 static inline struct bufferevent_filtered *
upcast(struct bufferevent * bev)121 upcast(struct bufferevent *bev)
122 {
123 	struct bufferevent_filtered *bev_f;
124 	if (!BEV_IS_FILTER(bev))
125 		return NULL;
126 	bev_f = (void*)( ((char*)bev) -
127 			 evutil_offsetof(struct bufferevent_filtered, bev.bev));
128 	EVUTIL_ASSERT(BEV_IS_FILTER(&bev_f->bev.bev));
129 	return bev_f;
130 }
131 
132 #define downcast(bev_f) (&(bev_f)->bev.bev)
133 
134 /** Return 1 iff bevf's underlying bufferevent's output buffer is at or
135  * over its high watermark such that we should not write to it in a given
136  * flush mode. */
137 static int
be_underlying_writebuf_full(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state)138 be_underlying_writebuf_full(struct bufferevent_filtered *bevf,
139     enum bufferevent_flush_mode state)
140 {
141 	struct bufferevent *u = bevf->underlying;
142 	return state == BEV_NORMAL &&
143 	    u->wm_write.high &&
144 	    evbuffer_get_length(u->output) >= u->wm_write.high;
145 }
146 
147 /** Return 1 if our input buffer is at or over its high watermark such that we
148  * should not write to it in a given flush mode. */
149 static int
be_readbuf_full(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state)150 be_readbuf_full(struct bufferevent_filtered *bevf,
151     enum bufferevent_flush_mode state)
152 {
153 	struct bufferevent *bufev = downcast(bevf);
154 	return state == BEV_NORMAL &&
155 	    bufev->wm_read.high &&
156 	    evbuffer_get_length(bufev->input) >= bufev->wm_read.high;
157 }
158 
159 
160 /* Filter to use when we're created with a NULL filter. */
161 static enum bufferevent_filter_result
be_null_filter(struct evbuffer * src,struct evbuffer * dst,ev_ssize_t lim,enum bufferevent_flush_mode state,void * ctx)162 be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim,
163 	       enum bufferevent_flush_mode state, void *ctx)
164 {
165 	(void)state;
166 	if (evbuffer_remove_buffer(src, dst, lim) >= 0)
167 		return BEV_OK;
168 	else
169 		return BEV_ERROR;
170 }
171 
172 struct bufferevent *
bufferevent_filter_new(struct bufferevent * underlying,bufferevent_filter_cb input_filter,bufferevent_filter_cb output_filter,int options,void (* free_context)(void *),void * ctx)173 bufferevent_filter_new(struct bufferevent *underlying,
174 		       bufferevent_filter_cb input_filter,
175 		       bufferevent_filter_cb output_filter,
176 		       int options,
177 		       void (*free_context)(void *),
178 		       void *ctx)
179 {
180 	struct bufferevent_filtered *bufev_f;
181 	int tmp_options = options & ~BEV_OPT_THREADSAFE;
182 
183 	if (!underlying)
184 		return NULL;
185 
186 	if (!input_filter)
187 		input_filter = be_null_filter;
188 	if (!output_filter)
189 		output_filter = be_null_filter;
190 
191 	bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
192 	if (!bufev_f)
193 		return NULL;
194 
195 	if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base,
196 				    &bufferevent_ops_filter, tmp_options) < 0) {
197 		mm_free(bufev_f);
198 		return NULL;
199 	}
200 	if (options & BEV_OPT_THREADSAFE) {
201 		bufferevent_enable_locking_(downcast(bufev_f), NULL);
202 	}
203 
204 	bufev_f->underlying = underlying;
205 
206 	bufev_f->process_in = input_filter;
207 	bufev_f->process_out = output_filter;
208 	bufev_f->free_context = free_context;
209 	bufev_f->context = ctx;
210 
211 	bufferevent_setcb(bufev_f->underlying,
212 	    be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
213 
214 	bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input,
215 		bufferevent_filtered_inbuf_cb, bufev_f);
216 	evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb,
217 		EVBUFFER_CB_ENABLED);
218 
219 	bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
220 	   bufferevent_filtered_outbuf_cb, bufev_f);
221 
222 	bufferevent_init_generic_timeout_cbs_(downcast(bufev_f));
223 	bufferevent_incref_(underlying);
224 
225 	bufferevent_enable(underlying, EV_READ|EV_WRITE);
226 	bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ);
227 
228 	return downcast(bufev_f);
229 }
230 
231 static void
be_filter_unlink(struct bufferevent * bev)232 be_filter_unlink(struct bufferevent *bev)
233 {
234 	struct bufferevent_filtered *bevf = upcast(bev);
235 	EVUTIL_ASSERT(bevf);
236 
237 	if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
238 		/* Yes, there is also a decref in bufferevent_decref_.
239 		 * That decref corresponds to the incref when we set
240 		 * underlying for the first time.  This decref is an
241 		 * extra one to remove the last reference.
242 		 */
243 		if (BEV_UPCAST(bevf->underlying)->refcnt < 2) {
244 			event_warnx("BEV_OPT_CLOSE_ON_FREE set on an "
245 			    "bufferevent with too few references");
246 		} else {
247 			bufferevent_free(bevf->underlying);
248 		}
249 	} else {
250 		if (bevf->underlying) {
251 			if (bevf->underlying->errorcb == be_filter_eventcb)
252 				bufferevent_setcb(bevf->underlying,
253 				    NULL, NULL, NULL, NULL);
254 			bufferevent_unsuspend_read_(bevf->underlying,
255 			    BEV_SUSPEND_FILT_READ);
256 		}
257 	}
258 }
259 
260 static void
be_filter_destruct(struct bufferevent * bev)261 be_filter_destruct(struct bufferevent *bev)
262 {
263 	struct bufferevent_filtered *bevf = upcast(bev);
264 	EVUTIL_ASSERT(bevf);
265 	if (bevf->free_context)
266 		bevf->free_context(bevf->context);
267 
268 	if (bevf->inbuf_cb)
269 		evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb);
270 
271 	if (bevf->outbuf_cb)
272 		evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb);
273 }
274 
275 static int
be_filter_enable(struct bufferevent * bev,short event)276 be_filter_enable(struct bufferevent *bev, short event)
277 {
278 	struct bufferevent_filtered *bevf = upcast(bev);
279 	if (event & EV_WRITE)
280 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
281 
282 	if (event & EV_READ) {
283 		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
284 		bufferevent_unsuspend_read_(bevf->underlying,
285 		    BEV_SUSPEND_FILT_READ);
286 	}
287 	return 0;
288 }
289 
290 static int
be_filter_disable(struct bufferevent * bev,short event)291 be_filter_disable(struct bufferevent *bev, short event)
292 {
293 	struct bufferevent_filtered *bevf = upcast(bev);
294 	if (event & EV_WRITE)
295 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
296 	if (event & EV_READ) {
297 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
298 		bufferevent_suspend_read_(bevf->underlying,
299 		    BEV_SUSPEND_FILT_READ);
300 	}
301 	return 0;
302 }
303 
304 static enum bufferevent_filter_result
be_filter_process_input(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state,int * processed_out)305 be_filter_process_input(struct bufferevent_filtered *bevf,
306 			enum bufferevent_flush_mode state,
307 			int *processed_out)
308 {
309 	enum bufferevent_filter_result res;
310 	struct bufferevent *bev = downcast(bevf);
311 
312 	if (state == BEV_NORMAL) {
313 		/* If we're in 'normal' mode, don't urge data on the filter
314 		 * unless we're reading data and under our high-water mark.*/
315 		if (!(bev->enabled & EV_READ) ||
316 		    be_readbuf_full(bevf, state))
317 			return BEV_OK;
318 	}
319 
320 	do {
321 		ev_ssize_t limit = -1;
322 		if (state == BEV_NORMAL && bev->wm_read.high)
323 			limit = bev->wm_read.high -
324 			    evbuffer_get_length(bev->input);
325 
326 		res = bevf->process_in(bevf->underlying->input,
327 		    bev->input, limit, state, bevf->context);
328 
329 		if (res == BEV_OK)
330 			*processed_out = 1;
331 	} while (res == BEV_OK &&
332 		 (bev->enabled & EV_READ) &&
333 		 evbuffer_get_length(bevf->underlying->input) &&
334 		 !be_readbuf_full(bevf, state));
335 
336 	if (*processed_out)
337 		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
338 
339 	return res;
340 }
341 
342 
343 static enum bufferevent_filter_result
be_filter_process_output(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state,int * processed_out)344 be_filter_process_output(struct bufferevent_filtered *bevf,
345 			 enum bufferevent_flush_mode state,
346 			 int *processed_out)
347 {
348 	/* Requires references and lock: might call writecb */
349 	enum bufferevent_filter_result res = BEV_OK;
350 	struct bufferevent *bufev = downcast(bevf);
351 	int again = 0;
352 
353 	if (state == BEV_NORMAL) {
354 		/* If we're in 'normal' mode, don't urge data on the
355 		 * filter unless we're writing data, and the underlying
356 		 * bufferevent is accepting data, and we have data to
357 		 * give the filter.  If we're in 'flush' or 'finish',
358 		 * call the filter no matter what. */
359 		if (!(bufev->enabled & EV_WRITE) ||
360 		    be_underlying_writebuf_full(bevf, state) ||
361 		    !evbuffer_get_length(bufev->output))
362 			return BEV_OK;
363 	}
364 
365 	/* disable the callback that calls this function
366 	   when the user adds to the output buffer. */
367 	evbuffer_cb_clear_flags(bufev->output, bevf->outbuf_cb,
368 	    EVBUFFER_CB_ENABLED);
369 
370 	do {
371 		int processed = 0;
372 		again = 0;
373 
374 		do {
375 			ev_ssize_t limit = -1;
376 			if (state == BEV_NORMAL &&
377 			    bevf->underlying->wm_write.high)
378 				limit = bevf->underlying->wm_write.high -
379 				    evbuffer_get_length(bevf->underlying->output);
380 
381 			res = bevf->process_out(downcast(bevf)->output,
382 			    bevf->underlying->output,
383 			    limit,
384 			    state,
385 			    bevf->context);
386 
387 			if (res == BEV_OK)
388 				processed = *processed_out = 1;
389 		} while (/* Stop if the filter wasn't successful...*/
390 			res == BEV_OK &&
391 			/* Or if we aren't writing any more. */
392 			(bufev->enabled & EV_WRITE) &&
393 			/* Of if we have nothing more to write and we are
394 			 * not flushing. */
395 			evbuffer_get_length(bufev->output) &&
396 			/* Or if we have filled the underlying output buffer. */
397 			!be_underlying_writebuf_full(bevf,state));
398 
399 		if (processed) {
400 			/* call the write callback.*/
401 			bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
402 
403 			if (res == BEV_OK &&
404 			    (bufev->enabled & EV_WRITE) &&
405 			    evbuffer_get_length(bufev->output) &&
406 			    !be_underlying_writebuf_full(bevf, state)) {
407 				again = 1;
408 			}
409 		}
410 	} while (again);
411 
412 	/* reenable the outbuf_cb */
413 	evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
414 	    EVBUFFER_CB_ENABLED);
415 
416 	if (*processed_out)
417 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
418 
419 	return res;
420 }
421 
422 /* Called when the size of our outbuf changes. */
423 static void
bufferevent_filtered_outbuf_cb(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)424 bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
425     const struct evbuffer_cb_info *cbinfo, void *arg)
426 {
427 	struct bufferevent_filtered *bevf = arg;
428 	struct bufferevent *bev = downcast(bevf);
429 
430 	if (cbinfo->n_added) {
431 		int processed_any = 0;
432 		/* Somebody added more data to the output buffer. Try to
433 		 * process it, if we should. */
434 		bufferevent_incref_and_lock_(bev);
435 		be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
436 		bufferevent_decref_and_unlock_(bev);
437 	}
438 }
439 
440 static void
be_filter_read_nolock_(struct bufferevent * underlying,void * me_)441 be_filter_read_nolock_(struct bufferevent *underlying, void *me_)
442 {
443 	struct bufferevent_filtered *bevf = me_;
444 	enum bufferevent_filter_result res;
445 	enum bufferevent_flush_mode state;
446 	struct bufferevent *bufev = downcast(bevf);
447 	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
448 	int processed_any = 0;
449 
450 	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
451 	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
452 
453 	// If our refcount is > 0
454 	if (bufev_private->refcnt > 0) {
455 
456 		if (bevf->got_eof)
457 			state = BEV_FINISHED;
458 		else
459 			state = BEV_NORMAL;
460 
461 		/* XXXX use return value */
462 		res = be_filter_process_input(bevf, state, &processed_any);
463 		(void)res;
464 
465 		/* XXX This should be in process_input, not here.  There are
466 		 * other places that can call process-input, and they should
467 		 * force readcb calls as needed. */
468 		if (processed_any) {
469 			bufferevent_trigger_nolock_(bufev, EV_READ, 0);
470 			if (evbuffer_get_length(underlying->input) > 0 &&
471 				be_readbuf_full(bevf, state)) {
472 				/* data left in underlying buffer and filter input buffer
473 				 * hit its read high watermark.
474 				 * Schedule callback to avoid data gets stuck in underlying
475 				 * input buffer.
476 				 */
477 				evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb,
478 					EVBUFFER_CB_ENABLED);
479 			}
480 		}
481 	}
482 }
483 
484 /* Called when the size of our inbuf changes. */
485 static void
bufferevent_filtered_inbuf_cb(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)486 bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
487     const struct evbuffer_cb_info *cbinfo, void *arg)
488 {
489 	struct bufferevent_filtered *bevf = arg;
490 	enum bufferevent_flush_mode state;
491 	struct bufferevent *bev = downcast(bevf);
492 
493 	BEV_LOCK(bev);
494 
495 	if (bevf->got_eof)
496 		state = BEV_FINISHED;
497 	else
498 		state = BEV_NORMAL;
499 
500 
501 	if (!be_readbuf_full(bevf, state)) {
502 		/* opportunity to read data which was left in underlying
503 		 * input buffer because filter input buffer hit read
504 		 * high watermark.
505 		 */
506 		evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb,
507 			EVBUFFER_CB_ENABLED);
508 		if (evbuffer_get_length(bevf->underlying->input) > 0)
509 			be_filter_read_nolock_(bevf->underlying, bevf);
510 	}
511 
512 	BEV_UNLOCK(bev);
513 }
514 
/* Read callback installed on the underlying bufferevent: takes our lock and
 * runs be_filter_read_nolock_().  `me_` is the bufferevent_filtered. */
static void
be_filter_readcb(struct bufferevent *underlying, void *me_)
{
	struct bufferevent_filtered *filt = me_;
	struct bufferevent *self = downcast(filt);

	BEV_LOCK(self);
	be_filter_read_nolock_(underlying, me_);
	BEV_UNLOCK(self);
}
528 
529 /* Called when the underlying socket has drained enough that we can write to
530    it. */
531 static void
be_filter_writecb(struct bufferevent * underlying,void * me_)532 be_filter_writecb(struct bufferevent *underlying, void *me_)
533 {
534 	struct bufferevent_filtered *bevf = me_;
535 	struct bufferevent *bev = downcast(bevf);
536 	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
537 	int processed_any = 0;
538 
539 	BEV_LOCK(bev);
540 
541 	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
542 	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
543 
544 	// If our refcount is > 0
545 	if (bufev_private->refcnt > 0) {
546 		be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
547 	}
548 
549 	BEV_UNLOCK(bev);
550 }
551 
552 /* Called when the underlying socket has given us an error */
553 static void
be_filter_eventcb(struct bufferevent * underlying,short what,void * me_)554 be_filter_eventcb(struct bufferevent *underlying, short what, void *me_)
555 {
556 	struct bufferevent_filtered *bevf = me_;
557 	struct bufferevent *bev = downcast(bevf);
558 	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
559 
560 	BEV_LOCK(bev);
561 
562 	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
563 	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
564 
565 	// If our refcount is > 0
566 	if (bufev_private->refcnt > 0) {
567 
568 		/* All we can really to is tell our own eventcb. */
569 		bufferevent_run_eventcb_(bev, what, 0);
570 	}
571 
572 	BEV_UNLOCK(bev);
573 }
574 
575 static int
be_filter_flush(struct bufferevent * bufev,short iotype,enum bufferevent_flush_mode mode)576 be_filter_flush(struct bufferevent *bufev,
577     short iotype, enum bufferevent_flush_mode mode)
578 {
579 	struct bufferevent_filtered *bevf = upcast(bufev);
580 	int processed_any = 0;
581 	EVUTIL_ASSERT(bevf);
582 
583 	bufferevent_incref_and_lock_(bufev);
584 
585 	if (iotype & EV_READ) {
586 		be_filter_process_input(bevf, mode, &processed_any);
587 	}
588 	if (iotype & EV_WRITE) {
589 		be_filter_process_output(bevf, mode, &processed_any);
590 	}
591 	/* XXX check the return value? */
592 	/* XXX does this want to recursively call lower-level flushes? */
593 	bufferevent_flush(bevf->underlying, iotype, mode);
594 
595 	bufferevent_decref_and_unlock_(bufev);
596 
597 	return processed_any;
598 }
599 
600 static int
be_filter_ctrl(struct bufferevent * bev,enum bufferevent_ctrl_op op,union bufferevent_ctrl_data * data)601 be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
602     union bufferevent_ctrl_data *data)
603 {
604 	struct bufferevent_filtered *bevf;
605 	switch (op) {
606 	case BEV_CTRL_GET_UNDERLYING:
607 		bevf = upcast(bev);
608 		data->ptr = bevf->underlying;
609 		return 0;
610 	case BEV_CTRL_SET_FD:
611 	case BEV_CTRL_GET_FD:
612 		bevf = upcast(bev);
613 
614 		if (bevf->underlying &&
615 			bevf->underlying->be_ops &&
616 			bevf->underlying->be_ops->ctrl) {
617 		    return (bevf->underlying->be_ops->ctrl)(bevf->underlying, op, data);
618 		}
619 		EVUTIL_FALLTHROUGH;
620 
621 	case BEV_CTRL_CANCEL_ALL:
622 		EVUTIL_FALLTHROUGH;
623 	default:
624 		return -1;
625 	}
626 
627 	return -1;
628 }
629