xref: /freebsd-src/contrib/libevent/bufferevent_filter.c (revision b50261e21f39a6c7249a49e7b60aa878c98512a8)
1c43e99fdSEd Maste /*
2c43e99fdSEd Maste  * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3c43e99fdSEd Maste  * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4c43e99fdSEd Maste  * All rights reserved.
5c43e99fdSEd Maste  *
6c43e99fdSEd Maste  * Redistribution and use in source and binary forms, with or without
7c43e99fdSEd Maste  * modification, are permitted provided that the following conditions
8c43e99fdSEd Maste  * are met:
9c43e99fdSEd Maste  * 1. Redistributions of source code must retain the above copyright
10c43e99fdSEd Maste  *    notice, this list of conditions and the following disclaimer.
11c43e99fdSEd Maste  * 2. Redistributions in binary form must reproduce the above copyright
12c43e99fdSEd Maste  *    notice, this list of conditions and the following disclaimer in the
13c43e99fdSEd Maste  *    documentation and/or other materials provided with the distribution.
14c43e99fdSEd Maste  * 3. The name of the author may not be used to endorse or promote products
15c43e99fdSEd Maste  *    derived from this software without specific prior written permission.
16c43e99fdSEd Maste  *
17c43e99fdSEd Maste  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18c43e99fdSEd Maste  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19c43e99fdSEd Maste  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20c43e99fdSEd Maste  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21c43e99fdSEd Maste  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22c43e99fdSEd Maste  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23c43e99fdSEd Maste  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24c43e99fdSEd Maste  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25c43e99fdSEd Maste  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26c43e99fdSEd Maste  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27c43e99fdSEd Maste  */
28c43e99fdSEd Maste 
29c43e99fdSEd Maste #include "evconfig-private.h"
30c43e99fdSEd Maste 
31c43e99fdSEd Maste #include <sys/types.h>
32c43e99fdSEd Maste 
33c43e99fdSEd Maste #include "event2/event-config.h"
34c43e99fdSEd Maste 
35c43e99fdSEd Maste #ifdef EVENT__HAVE_SYS_TIME_H
36c43e99fdSEd Maste #include <sys/time.h>
37c43e99fdSEd Maste #endif
38c43e99fdSEd Maste 
39c43e99fdSEd Maste #include <errno.h>
40c43e99fdSEd Maste #include <stdio.h>
41c43e99fdSEd Maste #include <stdlib.h>
42c43e99fdSEd Maste #include <string.h>
43c43e99fdSEd Maste #ifdef EVENT__HAVE_STDARG_H
44c43e99fdSEd Maste #include <stdarg.h>
45c43e99fdSEd Maste #endif
46c43e99fdSEd Maste 
47c43e99fdSEd Maste #ifdef _WIN32
48c43e99fdSEd Maste #include <winsock2.h>
49c43e99fdSEd Maste #endif
50c43e99fdSEd Maste 
51c43e99fdSEd Maste #include "event2/util.h"
52c43e99fdSEd Maste #include "event2/bufferevent.h"
53c43e99fdSEd Maste #include "event2/buffer.h"
54c43e99fdSEd Maste #include "event2/bufferevent_struct.h"
55c43e99fdSEd Maste #include "event2/event.h"
56c43e99fdSEd Maste #include "log-internal.h"
57c43e99fdSEd Maste #include "mm-internal.h"
58c43e99fdSEd Maste #include "bufferevent-internal.h"
59c43e99fdSEd Maste #include "util-internal.h"
60c43e99fdSEd Maste 
/* Forward declarations. */

/* Hooks referenced from the bufferevent_ops_filter table. */
static int be_filter_enable(struct bufferevent *bev, short event);
static int be_filter_disable(struct bufferevent *bev, short event);
static void be_filter_unlink(struct bufferevent *bev);
static void be_filter_destruct(struct bufferevent *bev);
static int be_filter_flush(struct bufferevent *bufev,
    short iotype, enum bufferevent_flush_mode mode);
static int be_filter_ctrl(struct bufferevent *bev,
    enum bufferevent_ctrl_op op, union bufferevent_ctrl_data *data);

/* Callbacks installed on the underlying bufferevent. */
static void be_filter_readcb(struct bufferevent *underlying, void *me_);
static void be_filter_writecb(struct bufferevent *underlying, void *me_);
static void be_filter_eventcb(struct bufferevent *underlying, short what,
    void *me_);

/* Callbacks installed on the filter's own input and output evbuffers. */
static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo, void *arg);
static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
    const struct evbuffer_cb_info *cbinfo, void *arg);
79c43e99fdSEd Maste 
80c43e99fdSEd Maste struct bufferevent_filtered {
81c43e99fdSEd Maste 	struct bufferevent_private bev;
82c43e99fdSEd Maste 
83c43e99fdSEd Maste 	/** The bufferevent that we read/write filtered data from/to. */
84c43e99fdSEd Maste 	struct bufferevent *underlying;
85c43e99fdSEd Maste 	/** A callback on our inbuf to notice somebory removes data */
86c43e99fdSEd Maste 	struct evbuffer_cb_entry *inbuf_cb;
87c43e99fdSEd Maste 	/** A callback on our outbuf to notice when somebody adds data */
88c43e99fdSEd Maste 	struct evbuffer_cb_entry *outbuf_cb;
89c43e99fdSEd Maste 	/** True iff we have received an EOF callback from the underlying
90c43e99fdSEd Maste 	 * bufferevent. */
91c43e99fdSEd Maste 	unsigned got_eof;
92c43e99fdSEd Maste 
93c43e99fdSEd Maste 	/** Function to free context when we're done. */
94c43e99fdSEd Maste 	void (*free_context)(void *);
95c43e99fdSEd Maste 	/** Input filter */
96c43e99fdSEd Maste 	bufferevent_filter_cb process_in;
97c43e99fdSEd Maste 	/** Output filter */
98c43e99fdSEd Maste 	bufferevent_filter_cb process_out;
99c43e99fdSEd Maste 	/** User-supplied argument to the filters. */
100c43e99fdSEd Maste 	void *context;
101c43e99fdSEd Maste };
102c43e99fdSEd Maste 
103c43e99fdSEd Maste const struct bufferevent_ops bufferevent_ops_filter = {
104c43e99fdSEd Maste 	"filter",
105c43e99fdSEd Maste 	evutil_offsetof(struct bufferevent_filtered, bev.bev),
106c43e99fdSEd Maste 	be_filter_enable,
107c43e99fdSEd Maste 	be_filter_disable,
108c43e99fdSEd Maste 	be_filter_unlink,
109c43e99fdSEd Maste 	be_filter_destruct,
110c43e99fdSEd Maste 	bufferevent_generic_adj_timeouts_,
111c43e99fdSEd Maste 	be_filter_flush,
112c43e99fdSEd Maste 	be_filter_ctrl,
113c43e99fdSEd Maste };
114c43e99fdSEd Maste 
115c43e99fdSEd Maste /* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
116c43e99fdSEd Maste  * return that bufferevent_filtered. Returns NULL otherwise.*/
117c43e99fdSEd Maste static inline struct bufferevent_filtered *
upcast(struct bufferevent * bev)118c43e99fdSEd Maste upcast(struct bufferevent *bev)
119c43e99fdSEd Maste {
120c43e99fdSEd Maste 	struct bufferevent_filtered *bev_f;
121*b50261e2SCy Schubert 	if (!BEV_IS_FILTER(bev))
122c43e99fdSEd Maste 		return NULL;
123c43e99fdSEd Maste 	bev_f = (void*)( ((char*)bev) -
124c43e99fdSEd Maste 			 evutil_offsetof(struct bufferevent_filtered, bev.bev));
125*b50261e2SCy Schubert 	EVUTIL_ASSERT(BEV_IS_FILTER(&bev_f->bev.bev));
126c43e99fdSEd Maste 	return bev_f;
127c43e99fdSEd Maste }
128c43e99fdSEd Maste 
/* Inverse of upcast(): the struct bufferevent embedded in a
 * bufferevent_filtered. */
#define downcast(bev_f) (&(bev_f)->bev.bev)
130c43e99fdSEd Maste 
131c43e99fdSEd Maste /** Return 1 iff bevf's underlying bufferevent's output buffer is at or
132c43e99fdSEd Maste  * over its high watermark such that we should not write to it in a given
133c43e99fdSEd Maste  * flush mode. */
134c43e99fdSEd Maste static int
be_underlying_writebuf_full(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state)135c43e99fdSEd Maste be_underlying_writebuf_full(struct bufferevent_filtered *bevf,
136c43e99fdSEd Maste     enum bufferevent_flush_mode state)
137c43e99fdSEd Maste {
138c43e99fdSEd Maste 	struct bufferevent *u = bevf->underlying;
139c43e99fdSEd Maste 	return state == BEV_NORMAL &&
140c43e99fdSEd Maste 	    u->wm_write.high &&
141c43e99fdSEd Maste 	    evbuffer_get_length(u->output) >= u->wm_write.high;
142c43e99fdSEd Maste }
143c43e99fdSEd Maste 
144c43e99fdSEd Maste /** Return 1 if our input buffer is at or over its high watermark such that we
145c43e99fdSEd Maste  * should not write to it in a given flush mode. */
146c43e99fdSEd Maste static int
be_readbuf_full(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state)147c43e99fdSEd Maste be_readbuf_full(struct bufferevent_filtered *bevf,
148c43e99fdSEd Maste     enum bufferevent_flush_mode state)
149c43e99fdSEd Maste {
150c43e99fdSEd Maste 	struct bufferevent *bufev = downcast(bevf);
151c43e99fdSEd Maste 	return state == BEV_NORMAL &&
152c43e99fdSEd Maste 	    bufev->wm_read.high &&
153c43e99fdSEd Maste 	    evbuffer_get_length(bufev->input) >= bufev->wm_read.high;
154c43e99fdSEd Maste }
155c43e99fdSEd Maste 
156c43e99fdSEd Maste 
157c43e99fdSEd Maste /* Filter to use when we're created with a NULL filter. */
158c43e99fdSEd Maste static enum bufferevent_filter_result
be_null_filter(struct evbuffer * src,struct evbuffer * dst,ev_ssize_t lim,enum bufferevent_flush_mode state,void * ctx)159c43e99fdSEd Maste be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim,
160c43e99fdSEd Maste 	       enum bufferevent_flush_mode state, void *ctx)
161c43e99fdSEd Maste {
162c43e99fdSEd Maste 	(void)state;
163*b50261e2SCy Schubert 	if (evbuffer_remove_buffer(src, dst, lim) >= 0)
164c43e99fdSEd Maste 		return BEV_OK;
165c43e99fdSEd Maste 	else
166c43e99fdSEd Maste 		return BEV_ERROR;
167c43e99fdSEd Maste }
168c43e99fdSEd Maste 
169c43e99fdSEd Maste struct bufferevent *
bufferevent_filter_new(struct bufferevent * underlying,bufferevent_filter_cb input_filter,bufferevent_filter_cb output_filter,int options,void (* free_context)(void *),void * ctx)170c43e99fdSEd Maste bufferevent_filter_new(struct bufferevent *underlying,
171c43e99fdSEd Maste 		       bufferevent_filter_cb input_filter,
172c43e99fdSEd Maste 		       bufferevent_filter_cb output_filter,
173c43e99fdSEd Maste 		       int options,
174c43e99fdSEd Maste 		       void (*free_context)(void *),
175c43e99fdSEd Maste 		       void *ctx)
176c43e99fdSEd Maste {
177c43e99fdSEd Maste 	struct bufferevent_filtered *bufev_f;
178c43e99fdSEd Maste 	int tmp_options = options & ~BEV_OPT_THREADSAFE;
179c43e99fdSEd Maste 
180c43e99fdSEd Maste 	if (!underlying)
181c43e99fdSEd Maste 		return NULL;
182c43e99fdSEd Maste 
183c43e99fdSEd Maste 	if (!input_filter)
184c43e99fdSEd Maste 		input_filter = be_null_filter;
185c43e99fdSEd Maste 	if (!output_filter)
186c43e99fdSEd Maste 		output_filter = be_null_filter;
187c43e99fdSEd Maste 
188c43e99fdSEd Maste 	bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
189c43e99fdSEd Maste 	if (!bufev_f)
190c43e99fdSEd Maste 		return NULL;
191c43e99fdSEd Maste 
192c43e99fdSEd Maste 	if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base,
193c43e99fdSEd Maste 				    &bufferevent_ops_filter, tmp_options) < 0) {
194c43e99fdSEd Maste 		mm_free(bufev_f);
195c43e99fdSEd Maste 		return NULL;
196c43e99fdSEd Maste 	}
197c43e99fdSEd Maste 	if (options & BEV_OPT_THREADSAFE) {
198c43e99fdSEd Maste 		bufferevent_enable_locking_(downcast(bufev_f), NULL);
199c43e99fdSEd Maste 	}
200c43e99fdSEd Maste 
201c43e99fdSEd Maste 	bufev_f->underlying = underlying;
202c43e99fdSEd Maste 
203c43e99fdSEd Maste 	bufev_f->process_in = input_filter;
204c43e99fdSEd Maste 	bufev_f->process_out = output_filter;
205c43e99fdSEd Maste 	bufev_f->free_context = free_context;
206c43e99fdSEd Maste 	bufev_f->context = ctx;
207c43e99fdSEd Maste 
208c43e99fdSEd Maste 	bufferevent_setcb(bufev_f->underlying,
209c43e99fdSEd Maste 	    be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
210c43e99fdSEd Maste 
211c43e99fdSEd Maste 	bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input,
212c43e99fdSEd Maste 		bufferevent_filtered_inbuf_cb, bufev_f);
213c43e99fdSEd Maste 	evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb,
214c43e99fdSEd Maste 		EVBUFFER_CB_ENABLED);
215c43e99fdSEd Maste 
216c43e99fdSEd Maste 	bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
217c43e99fdSEd Maste 	   bufferevent_filtered_outbuf_cb, bufev_f);
218c43e99fdSEd Maste 
219c43e99fdSEd Maste 	bufferevent_init_generic_timeout_cbs_(downcast(bufev_f));
220c43e99fdSEd Maste 	bufferevent_incref_(underlying);
221c43e99fdSEd Maste 
222c43e99fdSEd Maste 	bufferevent_enable(underlying, EV_READ|EV_WRITE);
223c43e99fdSEd Maste 	bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ);
224c43e99fdSEd Maste 
225c43e99fdSEd Maste 	return downcast(bufev_f);
226c43e99fdSEd Maste }
227c43e99fdSEd Maste 
228c43e99fdSEd Maste static void
be_filter_unlink(struct bufferevent * bev)229c43e99fdSEd Maste be_filter_unlink(struct bufferevent *bev)
230c43e99fdSEd Maste {
231c43e99fdSEd Maste 	struct bufferevent_filtered *bevf = upcast(bev);
232c43e99fdSEd Maste 	EVUTIL_ASSERT(bevf);
233c43e99fdSEd Maste 
234c43e99fdSEd Maste 	if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
235c43e99fdSEd Maste 		/* Yes, there is also a decref in bufferevent_decref_.
236c43e99fdSEd Maste 		 * That decref corresponds to the incref when we set
237c43e99fdSEd Maste 		 * underlying for the first time.  This decref is an
238c43e99fdSEd Maste 		 * extra one to remove the last reference.
239c43e99fdSEd Maste 		 */
240c43e99fdSEd Maste 		if (BEV_UPCAST(bevf->underlying)->refcnt < 2) {
241c43e99fdSEd Maste 			event_warnx("BEV_OPT_CLOSE_ON_FREE set on an "
242c43e99fdSEd Maste 			    "bufferevent with too few references");
243c43e99fdSEd Maste 		} else {
244c43e99fdSEd Maste 			bufferevent_free(bevf->underlying);
245c43e99fdSEd Maste 		}
246c43e99fdSEd Maste 	} else {
247c43e99fdSEd Maste 		if (bevf->underlying) {
248c43e99fdSEd Maste 			if (bevf->underlying->errorcb == be_filter_eventcb)
249c43e99fdSEd Maste 				bufferevent_setcb(bevf->underlying,
250c43e99fdSEd Maste 				    NULL, NULL, NULL, NULL);
251c43e99fdSEd Maste 			bufferevent_unsuspend_read_(bevf->underlying,
252c43e99fdSEd Maste 			    BEV_SUSPEND_FILT_READ);
253c43e99fdSEd Maste 		}
254c43e99fdSEd Maste 	}
255c43e99fdSEd Maste }
256c43e99fdSEd Maste 
257c43e99fdSEd Maste static void
be_filter_destruct(struct bufferevent * bev)258c43e99fdSEd Maste be_filter_destruct(struct bufferevent *bev)
259c43e99fdSEd Maste {
260c43e99fdSEd Maste 	struct bufferevent_filtered *bevf = upcast(bev);
261c43e99fdSEd Maste 	EVUTIL_ASSERT(bevf);
262c43e99fdSEd Maste 	if (bevf->free_context)
263c43e99fdSEd Maste 		bevf->free_context(bevf->context);
264c43e99fdSEd Maste 
265c43e99fdSEd Maste 	if (bevf->inbuf_cb)
266c43e99fdSEd Maste 		evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb);
267c43e99fdSEd Maste 
268c43e99fdSEd Maste 	if (bevf->outbuf_cb)
269c43e99fdSEd Maste 		evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb);
270c43e99fdSEd Maste }
271c43e99fdSEd Maste 
272c43e99fdSEd Maste static int
be_filter_enable(struct bufferevent * bev,short event)273c43e99fdSEd Maste be_filter_enable(struct bufferevent *bev, short event)
274c43e99fdSEd Maste {
275c43e99fdSEd Maste 	struct bufferevent_filtered *bevf = upcast(bev);
276c43e99fdSEd Maste 	if (event & EV_WRITE)
277c43e99fdSEd Maste 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
278c43e99fdSEd Maste 
279c43e99fdSEd Maste 	if (event & EV_READ) {
280c43e99fdSEd Maste 		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
281c43e99fdSEd Maste 		bufferevent_unsuspend_read_(bevf->underlying,
282c43e99fdSEd Maste 		    BEV_SUSPEND_FILT_READ);
283c43e99fdSEd Maste 	}
284c43e99fdSEd Maste 	return 0;
285c43e99fdSEd Maste }
286c43e99fdSEd Maste 
287c43e99fdSEd Maste static int
be_filter_disable(struct bufferevent * bev,short event)288c43e99fdSEd Maste be_filter_disable(struct bufferevent *bev, short event)
289c43e99fdSEd Maste {
290c43e99fdSEd Maste 	struct bufferevent_filtered *bevf = upcast(bev);
291c43e99fdSEd Maste 	if (event & EV_WRITE)
292c43e99fdSEd Maste 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
293c43e99fdSEd Maste 	if (event & EV_READ) {
294c43e99fdSEd Maste 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
295c43e99fdSEd Maste 		bufferevent_suspend_read_(bevf->underlying,
296c43e99fdSEd Maste 		    BEV_SUSPEND_FILT_READ);
297c43e99fdSEd Maste 	}
298c43e99fdSEd Maste 	return 0;
299c43e99fdSEd Maste }
300c43e99fdSEd Maste 
301c43e99fdSEd Maste static enum bufferevent_filter_result
be_filter_process_input(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state,int * processed_out)302c43e99fdSEd Maste be_filter_process_input(struct bufferevent_filtered *bevf,
303c43e99fdSEd Maste 			enum bufferevent_flush_mode state,
304c43e99fdSEd Maste 			int *processed_out)
305c43e99fdSEd Maste {
306c43e99fdSEd Maste 	enum bufferevent_filter_result res;
307c43e99fdSEd Maste 	struct bufferevent *bev = downcast(bevf);
308c43e99fdSEd Maste 
309c43e99fdSEd Maste 	if (state == BEV_NORMAL) {
310c43e99fdSEd Maste 		/* If we're in 'normal' mode, don't urge data on the filter
311c43e99fdSEd Maste 		 * unless we're reading data and under our high-water mark.*/
312c43e99fdSEd Maste 		if (!(bev->enabled & EV_READ) ||
313c43e99fdSEd Maste 		    be_readbuf_full(bevf, state))
314c43e99fdSEd Maste 			return BEV_OK;
315c43e99fdSEd Maste 	}
316c43e99fdSEd Maste 
317c43e99fdSEd Maste 	do {
318c43e99fdSEd Maste 		ev_ssize_t limit = -1;
319c43e99fdSEd Maste 		if (state == BEV_NORMAL && bev->wm_read.high)
320c43e99fdSEd Maste 			limit = bev->wm_read.high -
321c43e99fdSEd Maste 			    evbuffer_get_length(bev->input);
322c43e99fdSEd Maste 
323c43e99fdSEd Maste 		res = bevf->process_in(bevf->underlying->input,
324c43e99fdSEd Maste 		    bev->input, limit, state, bevf->context);
325c43e99fdSEd Maste 
326c43e99fdSEd Maste 		if (res == BEV_OK)
327c43e99fdSEd Maste 			*processed_out = 1;
328c43e99fdSEd Maste 	} while (res == BEV_OK &&
329c43e99fdSEd Maste 		 (bev->enabled & EV_READ) &&
330c43e99fdSEd Maste 		 evbuffer_get_length(bevf->underlying->input) &&
331c43e99fdSEd Maste 		 !be_readbuf_full(bevf, state));
332c43e99fdSEd Maste 
333c43e99fdSEd Maste 	if (*processed_out)
334c43e99fdSEd Maste 		BEV_RESET_GENERIC_READ_TIMEOUT(bev);
335c43e99fdSEd Maste 
336c43e99fdSEd Maste 	return res;
337c43e99fdSEd Maste }
338c43e99fdSEd Maste 
339c43e99fdSEd Maste 
340c43e99fdSEd Maste static enum bufferevent_filter_result
be_filter_process_output(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state,int * processed_out)341c43e99fdSEd Maste be_filter_process_output(struct bufferevent_filtered *bevf,
342c43e99fdSEd Maste 			 enum bufferevent_flush_mode state,
343c43e99fdSEd Maste 			 int *processed_out)
344c43e99fdSEd Maste {
345c43e99fdSEd Maste 	/* Requires references and lock: might call writecb */
346c43e99fdSEd Maste 	enum bufferevent_filter_result res = BEV_OK;
347c43e99fdSEd Maste 	struct bufferevent *bufev = downcast(bevf);
348c43e99fdSEd Maste 	int again = 0;
349c43e99fdSEd Maste 
350c43e99fdSEd Maste 	if (state == BEV_NORMAL) {
351c43e99fdSEd Maste 		/* If we're in 'normal' mode, don't urge data on the
352c43e99fdSEd Maste 		 * filter unless we're writing data, and the underlying
353c43e99fdSEd Maste 		 * bufferevent is accepting data, and we have data to
354c43e99fdSEd Maste 		 * give the filter.  If we're in 'flush' or 'finish',
355c43e99fdSEd Maste 		 * call the filter no matter what. */
356c43e99fdSEd Maste 		if (!(bufev->enabled & EV_WRITE) ||
357c43e99fdSEd Maste 		    be_underlying_writebuf_full(bevf, state) ||
358c43e99fdSEd Maste 		    !evbuffer_get_length(bufev->output))
359c43e99fdSEd Maste 			return BEV_OK;
360c43e99fdSEd Maste 	}
361c43e99fdSEd Maste 
362c43e99fdSEd Maste 	/* disable the callback that calls this function
363c43e99fdSEd Maste 	   when the user adds to the output buffer. */
364c43e99fdSEd Maste 	evbuffer_cb_clear_flags(bufev->output, bevf->outbuf_cb,
365c43e99fdSEd Maste 	    EVBUFFER_CB_ENABLED);
366c43e99fdSEd Maste 
367c43e99fdSEd Maste 	do {
368c43e99fdSEd Maste 		int processed = 0;
369c43e99fdSEd Maste 		again = 0;
370c43e99fdSEd Maste 
371c43e99fdSEd Maste 		do {
372c43e99fdSEd Maste 			ev_ssize_t limit = -1;
373c43e99fdSEd Maste 			if (state == BEV_NORMAL &&
374c43e99fdSEd Maste 			    bevf->underlying->wm_write.high)
375c43e99fdSEd Maste 				limit = bevf->underlying->wm_write.high -
376c43e99fdSEd Maste 				    evbuffer_get_length(bevf->underlying->output);
377c43e99fdSEd Maste 
378c43e99fdSEd Maste 			res = bevf->process_out(downcast(bevf)->output,
379c43e99fdSEd Maste 			    bevf->underlying->output,
380c43e99fdSEd Maste 			    limit,
381c43e99fdSEd Maste 			    state,
382c43e99fdSEd Maste 			    bevf->context);
383c43e99fdSEd Maste 
384c43e99fdSEd Maste 			if (res == BEV_OK)
385c43e99fdSEd Maste 				processed = *processed_out = 1;
386c43e99fdSEd Maste 		} while (/* Stop if the filter wasn't successful...*/
387c43e99fdSEd Maste 			res == BEV_OK &&
388c43e99fdSEd Maste 			/* Or if we aren't writing any more. */
389c43e99fdSEd Maste 			(bufev->enabled & EV_WRITE) &&
390c43e99fdSEd Maste 			/* Of if we have nothing more to write and we are
391c43e99fdSEd Maste 			 * not flushing. */
392c43e99fdSEd Maste 			evbuffer_get_length(bufev->output) &&
393c43e99fdSEd Maste 			/* Or if we have filled the underlying output buffer. */
394c43e99fdSEd Maste 			!be_underlying_writebuf_full(bevf,state));
395c43e99fdSEd Maste 
396c43e99fdSEd Maste 		if (processed) {
397c43e99fdSEd Maste 			/* call the write callback.*/
398c43e99fdSEd Maste 			bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
399c43e99fdSEd Maste 
400c43e99fdSEd Maste 			if (res == BEV_OK &&
401c43e99fdSEd Maste 			    (bufev->enabled & EV_WRITE) &&
402c43e99fdSEd Maste 			    evbuffer_get_length(bufev->output) &&
403c43e99fdSEd Maste 			    !be_underlying_writebuf_full(bevf, state)) {
404c43e99fdSEd Maste 				again = 1;
405c43e99fdSEd Maste 			}
406c43e99fdSEd Maste 		}
407c43e99fdSEd Maste 	} while (again);
408c43e99fdSEd Maste 
409c43e99fdSEd Maste 	/* reenable the outbuf_cb */
410c43e99fdSEd Maste 	evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
411c43e99fdSEd Maste 	    EVBUFFER_CB_ENABLED);
412c43e99fdSEd Maste 
413c43e99fdSEd Maste 	if (*processed_out)
414c43e99fdSEd Maste 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
415c43e99fdSEd Maste 
416c43e99fdSEd Maste 	return res;
417c43e99fdSEd Maste }
418c43e99fdSEd Maste 
419c43e99fdSEd Maste /* Called when the size of our outbuf changes. */
420c43e99fdSEd Maste static void
bufferevent_filtered_outbuf_cb(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)421c43e99fdSEd Maste bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
422c43e99fdSEd Maste     const struct evbuffer_cb_info *cbinfo, void *arg)
423c43e99fdSEd Maste {
424c43e99fdSEd Maste 	struct bufferevent_filtered *bevf = arg;
425c43e99fdSEd Maste 	struct bufferevent *bev = downcast(bevf);
426c43e99fdSEd Maste 
427c43e99fdSEd Maste 	if (cbinfo->n_added) {
428c43e99fdSEd Maste 		int processed_any = 0;
429c43e99fdSEd Maste 		/* Somebody added more data to the output buffer. Try to
430c43e99fdSEd Maste 		 * process it, if we should. */
431c43e99fdSEd Maste 		bufferevent_incref_and_lock_(bev);
432c43e99fdSEd Maste 		be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
433c43e99fdSEd Maste 		bufferevent_decref_and_unlock_(bev);
434c43e99fdSEd Maste 	}
435c43e99fdSEd Maste }
436c43e99fdSEd Maste 
437c43e99fdSEd Maste static void
be_filter_read_nolock_(struct bufferevent * underlying,void * me_)438c43e99fdSEd Maste be_filter_read_nolock_(struct bufferevent *underlying, void *me_)
439c43e99fdSEd Maste {
440c43e99fdSEd Maste 	struct bufferevent_filtered *bevf = me_;
441c43e99fdSEd Maste 	enum bufferevent_filter_result res;
442c43e99fdSEd Maste 	enum bufferevent_flush_mode state;
443c43e99fdSEd Maste 	struct bufferevent *bufev = downcast(bevf);
444c43e99fdSEd Maste 	struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
445c43e99fdSEd Maste 	int processed_any = 0;
446c43e99fdSEd Maste 
447c43e99fdSEd Maste 	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
448c43e99fdSEd Maste 	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
449c43e99fdSEd Maste 
450c43e99fdSEd Maste 	// If our refcount is > 0
451c43e99fdSEd Maste 	if (bufev_private->refcnt > 0) {
452c43e99fdSEd Maste 
453c43e99fdSEd Maste 		if (bevf->got_eof)
454c43e99fdSEd Maste 			state = BEV_FINISHED;
455c43e99fdSEd Maste 		else
456c43e99fdSEd Maste 			state = BEV_NORMAL;
457c43e99fdSEd Maste 
458c43e99fdSEd Maste 		/* XXXX use return value */
459c43e99fdSEd Maste 		res = be_filter_process_input(bevf, state, &processed_any);
460c43e99fdSEd Maste 		(void)res;
461c43e99fdSEd Maste 
462c43e99fdSEd Maste 		/* XXX This should be in process_input, not here.  There are
463c43e99fdSEd Maste 		 * other places that can call process-input, and they should
464c43e99fdSEd Maste 		 * force readcb calls as needed. */
465c43e99fdSEd Maste 		if (processed_any) {
466c43e99fdSEd Maste 			bufferevent_trigger_nolock_(bufev, EV_READ, 0);
467c43e99fdSEd Maste 			if (evbuffer_get_length(underlying->input) > 0 &&
468c43e99fdSEd Maste 				be_readbuf_full(bevf, state)) {
469c43e99fdSEd Maste 				/* data left in underlying buffer and filter input buffer
470c43e99fdSEd Maste 				 * hit its read high watermark.
471c43e99fdSEd Maste 				 * Schedule callback to avoid data gets stuck in underlying
472c43e99fdSEd Maste 				 * input buffer.
473c43e99fdSEd Maste 				 */
474c43e99fdSEd Maste 				evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb,
475c43e99fdSEd Maste 					EVBUFFER_CB_ENABLED);
476c43e99fdSEd Maste 			}
477c43e99fdSEd Maste 		}
478c43e99fdSEd Maste 	}
479c43e99fdSEd Maste }
480c43e99fdSEd Maste 
481c43e99fdSEd Maste /* Called when the size of our inbuf changes. */
482c43e99fdSEd Maste static void
bufferevent_filtered_inbuf_cb(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)483c43e99fdSEd Maste bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
484c43e99fdSEd Maste     const struct evbuffer_cb_info *cbinfo, void *arg)
485c43e99fdSEd Maste {
486c43e99fdSEd Maste 	struct bufferevent_filtered *bevf = arg;
487c43e99fdSEd Maste 	enum bufferevent_flush_mode state;
488c43e99fdSEd Maste 	struct bufferevent *bev = downcast(bevf);
489c43e99fdSEd Maste 
490c43e99fdSEd Maste 	BEV_LOCK(bev);
491c43e99fdSEd Maste 
492c43e99fdSEd Maste 	if (bevf->got_eof)
493c43e99fdSEd Maste 		state = BEV_FINISHED;
494c43e99fdSEd Maste 	else
495c43e99fdSEd Maste 		state = BEV_NORMAL;
496c43e99fdSEd Maste 
497c43e99fdSEd Maste 
498c43e99fdSEd Maste 	if (!be_readbuf_full(bevf, state)) {
499c43e99fdSEd Maste 		/* opportunity to read data which was left in underlying
500c43e99fdSEd Maste 		 * input buffer because filter input buffer hit read
501c43e99fdSEd Maste 		 * high watermark.
502c43e99fdSEd Maste 		 */
503c43e99fdSEd Maste 		evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb,
504c43e99fdSEd Maste 			EVBUFFER_CB_ENABLED);
505c43e99fdSEd Maste 		if (evbuffer_get_length(bevf->underlying->input) > 0)
506c43e99fdSEd Maste 			be_filter_read_nolock_(bevf->underlying, bevf);
507c43e99fdSEd Maste 	}
508c43e99fdSEd Maste 
509c43e99fdSEd Maste 	BEV_UNLOCK(bev);
510c43e99fdSEd Maste }
511c43e99fdSEd Maste 
/* Read callback installed on the underlying bufferevent: takes the filter's
 * lock and runs the shared read path. */
static void
be_filter_readcb(struct bufferevent *underlying, void *me_)
{
	struct bufferevent_filtered *filt = me_;
	struct bufferevent *self = downcast(filt);

	BEV_LOCK(self);
	be_filter_read_nolock_(underlying, me_);
	BEV_UNLOCK(self);
}
525c43e99fdSEd Maste 
526c43e99fdSEd Maste /* Called when the underlying socket has drained enough that we can write to
527c43e99fdSEd Maste    it. */
528c43e99fdSEd Maste static void
be_filter_writecb(struct bufferevent * underlying,void * me_)529c43e99fdSEd Maste be_filter_writecb(struct bufferevent *underlying, void *me_)
530c43e99fdSEd Maste {
531c43e99fdSEd Maste 	struct bufferevent_filtered *bevf = me_;
532c43e99fdSEd Maste 	struct bufferevent *bev = downcast(bevf);
533c43e99fdSEd Maste 	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
534c43e99fdSEd Maste 	int processed_any = 0;
535c43e99fdSEd Maste 
536c43e99fdSEd Maste 	BEV_LOCK(bev);
537c43e99fdSEd Maste 
538c43e99fdSEd Maste 	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
539c43e99fdSEd Maste 	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
540c43e99fdSEd Maste 
541c43e99fdSEd Maste 	// If our refcount is > 0
542c43e99fdSEd Maste 	if (bufev_private->refcnt > 0) {
543c43e99fdSEd Maste 		be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
544c43e99fdSEd Maste 	}
545c43e99fdSEd Maste 
546c43e99fdSEd Maste 	BEV_UNLOCK(bev);
547c43e99fdSEd Maste }
548c43e99fdSEd Maste 
549c43e99fdSEd Maste /* Called when the underlying socket has given us an error */
550c43e99fdSEd Maste static void
be_filter_eventcb(struct bufferevent * underlying,short what,void * me_)551c43e99fdSEd Maste be_filter_eventcb(struct bufferevent *underlying, short what, void *me_)
552c43e99fdSEd Maste {
553c43e99fdSEd Maste 	struct bufferevent_filtered *bevf = me_;
554c43e99fdSEd Maste 	struct bufferevent *bev = downcast(bevf);
555c43e99fdSEd Maste 	struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
556c43e99fdSEd Maste 
557c43e99fdSEd Maste 	BEV_LOCK(bev);
558c43e99fdSEd Maste 
559c43e99fdSEd Maste 	// It's possible our refcount is 0 at this point if another thread free'd our filterevent
560c43e99fdSEd Maste 	EVUTIL_ASSERT(bufev_private->refcnt >= 0);
561c43e99fdSEd Maste 
562c43e99fdSEd Maste 	// If our refcount is > 0
563c43e99fdSEd Maste 	if (bufev_private->refcnt > 0) {
564c43e99fdSEd Maste 
565c43e99fdSEd Maste 		/* All we can really to is tell our own eventcb. */
566c43e99fdSEd Maste 		bufferevent_run_eventcb_(bev, what, 0);
567c43e99fdSEd Maste 	}
568c43e99fdSEd Maste 
569c43e99fdSEd Maste 	BEV_UNLOCK(bev);
570c43e99fdSEd Maste }
571c43e99fdSEd Maste 
572c43e99fdSEd Maste static int
be_filter_flush(struct bufferevent * bufev,short iotype,enum bufferevent_flush_mode mode)573c43e99fdSEd Maste be_filter_flush(struct bufferevent *bufev,
574c43e99fdSEd Maste     short iotype, enum bufferevent_flush_mode mode)
575c43e99fdSEd Maste {
576c43e99fdSEd Maste 	struct bufferevent_filtered *bevf = upcast(bufev);
577c43e99fdSEd Maste 	int processed_any = 0;
578c43e99fdSEd Maste 	EVUTIL_ASSERT(bevf);
579c43e99fdSEd Maste 
580c43e99fdSEd Maste 	bufferevent_incref_and_lock_(bufev);
581c43e99fdSEd Maste 
582c43e99fdSEd Maste 	if (iotype & EV_READ) {
583c43e99fdSEd Maste 		be_filter_process_input(bevf, mode, &processed_any);
584c43e99fdSEd Maste 	}
585c43e99fdSEd Maste 	if (iotype & EV_WRITE) {
586c43e99fdSEd Maste 		be_filter_process_output(bevf, mode, &processed_any);
587c43e99fdSEd Maste 	}
588c43e99fdSEd Maste 	/* XXX check the return value? */
589c43e99fdSEd Maste 	/* XXX does this want to recursively call lower-level flushes? */
590c43e99fdSEd Maste 	bufferevent_flush(bevf->underlying, iotype, mode);
591c43e99fdSEd Maste 
592c43e99fdSEd Maste 	bufferevent_decref_and_unlock_(bufev);
593c43e99fdSEd Maste 
594c43e99fdSEd Maste 	return processed_any;
595c43e99fdSEd Maste }
596c43e99fdSEd Maste 
597c43e99fdSEd Maste static int
be_filter_ctrl(struct bufferevent * bev,enum bufferevent_ctrl_op op,union bufferevent_ctrl_data * data)598c43e99fdSEd Maste be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
599c43e99fdSEd Maste     union bufferevent_ctrl_data *data)
600c43e99fdSEd Maste {
601c43e99fdSEd Maste 	struct bufferevent_filtered *bevf;
602c43e99fdSEd Maste 	switch (op) {
603c43e99fdSEd Maste 	case BEV_CTRL_GET_UNDERLYING:
604c43e99fdSEd Maste 		bevf = upcast(bev);
605c43e99fdSEd Maste 		data->ptr = bevf->underlying;
606c43e99fdSEd Maste 		return 0;
607c43e99fdSEd Maste 	case BEV_CTRL_SET_FD:
608*b50261e2SCy Schubert 	case BEV_CTRL_GET_FD:
609c43e99fdSEd Maste 		bevf = upcast(bev);
610c43e99fdSEd Maste 
611c43e99fdSEd Maste 		if (bevf->underlying &&
612c43e99fdSEd Maste 			bevf->underlying->be_ops &&
613c43e99fdSEd Maste 			bevf->underlying->be_ops->ctrl) {
614c43e99fdSEd Maste 		    return (bevf->underlying->be_ops->ctrl)(bevf->underlying, op, data);
615c43e99fdSEd Maste 		}
616*b50261e2SCy Schubert 		EVUTIL_FALLTHROUGH;
617c43e99fdSEd Maste 
618c43e99fdSEd Maste 	case BEV_CTRL_CANCEL_ALL:
619*b50261e2SCy Schubert 		EVUTIL_FALLTHROUGH;
620c43e99fdSEd Maste 	default:
621c43e99fdSEd Maste 		return -1;
622c43e99fdSEd Maste 	}
623c43e99fdSEd Maste 
624c43e99fdSEd Maste 	return -1;
625c43e99fdSEd Maste }
626