1c43e99fdSEd Maste /*
2c43e99fdSEd Maste * Copyright (c) 2007-2012 Niels Provos and Nick Mathewson
3c43e99fdSEd Maste * Copyright (c) 2002-2006 Niels Provos <provos@citi.umich.edu>
4c43e99fdSEd Maste * All rights reserved.
5c43e99fdSEd Maste *
6c43e99fdSEd Maste * Redistribution and use in source and binary forms, with or without
7c43e99fdSEd Maste * modification, are permitted provided that the following conditions
8c43e99fdSEd Maste * are met:
9c43e99fdSEd Maste * 1. Redistributions of source code must retain the above copyright
10c43e99fdSEd Maste * notice, this list of conditions and the following disclaimer.
11c43e99fdSEd Maste * 2. Redistributions in binary form must reproduce the above copyright
12c43e99fdSEd Maste * notice, this list of conditions and the following disclaimer in the
13c43e99fdSEd Maste * documentation and/or other materials provided with the distribution.
14c43e99fdSEd Maste * 3. The name of the author may not be used to endorse or promote products
15c43e99fdSEd Maste * derived from this software without specific prior written permission.
16c43e99fdSEd Maste *
17c43e99fdSEd Maste * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18c43e99fdSEd Maste * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19c43e99fdSEd Maste * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20c43e99fdSEd Maste * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21c43e99fdSEd Maste * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22c43e99fdSEd Maste * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23c43e99fdSEd Maste * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24c43e99fdSEd Maste * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25c43e99fdSEd Maste * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26c43e99fdSEd Maste * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27c43e99fdSEd Maste */
28c43e99fdSEd Maste
29c43e99fdSEd Maste #include "evconfig-private.h"
30c43e99fdSEd Maste
31c43e99fdSEd Maste #include <sys/types.h>
32c43e99fdSEd Maste
33c43e99fdSEd Maste #include "event2/event-config.h"
34c43e99fdSEd Maste
35c43e99fdSEd Maste #ifdef EVENT__HAVE_SYS_TIME_H
36c43e99fdSEd Maste #include <sys/time.h>
37c43e99fdSEd Maste #endif
38c43e99fdSEd Maste
39c43e99fdSEd Maste #include <errno.h>
40c43e99fdSEd Maste #include <stdio.h>
41c43e99fdSEd Maste #include <stdlib.h>
42c43e99fdSEd Maste #include <string.h>
43c43e99fdSEd Maste #ifdef EVENT__HAVE_STDARG_H
44c43e99fdSEd Maste #include <stdarg.h>
45c43e99fdSEd Maste #endif
46c43e99fdSEd Maste
47c43e99fdSEd Maste #ifdef _WIN32
48c43e99fdSEd Maste #include <winsock2.h>
49c43e99fdSEd Maste #endif
50c43e99fdSEd Maste
51c43e99fdSEd Maste #include "event2/util.h"
52c43e99fdSEd Maste #include "event2/bufferevent.h"
53c43e99fdSEd Maste #include "event2/buffer.h"
54c43e99fdSEd Maste #include "event2/bufferevent_struct.h"
55c43e99fdSEd Maste #include "event2/event.h"
56c43e99fdSEd Maste #include "log-internal.h"
57c43e99fdSEd Maste #include "mm-internal.h"
58c43e99fdSEd Maste #include "bufferevent-internal.h"
59c43e99fdSEd Maste #include "util-internal.h"
60c43e99fdSEd Maste
61c43e99fdSEd Maste /* prototypes */
62c43e99fdSEd Maste static int be_filter_enable(struct bufferevent *, short);
63c43e99fdSEd Maste static int be_filter_disable(struct bufferevent *, short);
64c43e99fdSEd Maste static void be_filter_unlink(struct bufferevent *);
65c43e99fdSEd Maste static void be_filter_destruct(struct bufferevent *);
66c43e99fdSEd Maste
67c43e99fdSEd Maste static void be_filter_readcb(struct bufferevent *, void *);
68c43e99fdSEd Maste static void be_filter_writecb(struct bufferevent *, void *);
69c43e99fdSEd Maste static void be_filter_eventcb(struct bufferevent *, short, void *);
70c43e99fdSEd Maste static int be_filter_flush(struct bufferevent *bufev,
71c43e99fdSEd Maste short iotype, enum bufferevent_flush_mode mode);
72c43e99fdSEd Maste static int be_filter_ctrl(struct bufferevent *, enum bufferevent_ctrl_op, union bufferevent_ctrl_data *);
73c43e99fdSEd Maste
74c43e99fdSEd Maste static void bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
75c43e99fdSEd Maste const struct evbuffer_cb_info *cbinfo, void *arg);
76c43e99fdSEd Maste
77c43e99fdSEd Maste static void bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
78c43e99fdSEd Maste const struct evbuffer_cb_info *info, void *arg);
79c43e99fdSEd Maste
80c43e99fdSEd Maste struct bufferevent_filtered {
81c43e99fdSEd Maste struct bufferevent_private bev;
82c43e99fdSEd Maste
83c43e99fdSEd Maste /** The bufferevent that we read/write filtered data from/to. */
84c43e99fdSEd Maste struct bufferevent *underlying;
85c43e99fdSEd Maste /** A callback on our inbuf to notice somebory removes data */
86c43e99fdSEd Maste struct evbuffer_cb_entry *inbuf_cb;
87c43e99fdSEd Maste /** A callback on our outbuf to notice when somebody adds data */
88c43e99fdSEd Maste struct evbuffer_cb_entry *outbuf_cb;
89c43e99fdSEd Maste /** True iff we have received an EOF callback from the underlying
90c43e99fdSEd Maste * bufferevent. */
91c43e99fdSEd Maste unsigned got_eof;
92c43e99fdSEd Maste
93c43e99fdSEd Maste /** Function to free context when we're done. */
94c43e99fdSEd Maste void (*free_context)(void *);
95c43e99fdSEd Maste /** Input filter */
96c43e99fdSEd Maste bufferevent_filter_cb process_in;
97c43e99fdSEd Maste /** Output filter */
98c43e99fdSEd Maste bufferevent_filter_cb process_out;
99c43e99fdSEd Maste /** User-supplied argument to the filters. */
100c43e99fdSEd Maste void *context;
101c43e99fdSEd Maste };
102c43e99fdSEd Maste
103c43e99fdSEd Maste const struct bufferevent_ops bufferevent_ops_filter = {
104c43e99fdSEd Maste "filter",
105c43e99fdSEd Maste evutil_offsetof(struct bufferevent_filtered, bev.bev),
106c43e99fdSEd Maste be_filter_enable,
107c43e99fdSEd Maste be_filter_disable,
108c43e99fdSEd Maste be_filter_unlink,
109c43e99fdSEd Maste be_filter_destruct,
110c43e99fdSEd Maste bufferevent_generic_adj_timeouts_,
111c43e99fdSEd Maste be_filter_flush,
112c43e99fdSEd Maste be_filter_ctrl,
113c43e99fdSEd Maste };
114c43e99fdSEd Maste
115c43e99fdSEd Maste /* Given a bufferevent that's really the bev filter of a bufferevent_filtered,
116c43e99fdSEd Maste * return that bufferevent_filtered. Returns NULL otherwise.*/
117c43e99fdSEd Maste static inline struct bufferevent_filtered *
upcast(struct bufferevent * bev)118c43e99fdSEd Maste upcast(struct bufferevent *bev)
119c43e99fdSEd Maste {
120c43e99fdSEd Maste struct bufferevent_filtered *bev_f;
121*b50261e2SCy Schubert if (!BEV_IS_FILTER(bev))
122c43e99fdSEd Maste return NULL;
123c43e99fdSEd Maste bev_f = (void*)( ((char*)bev) -
124c43e99fdSEd Maste evutil_offsetof(struct bufferevent_filtered, bev.bev));
125*b50261e2SCy Schubert EVUTIL_ASSERT(BEV_IS_FILTER(&bev_f->bev.bev));
126c43e99fdSEd Maste return bev_f;
127c43e99fdSEd Maste }
128c43e99fdSEd Maste
129c43e99fdSEd Maste #define downcast(bev_f) (&(bev_f)->bev.bev)
130c43e99fdSEd Maste
131c43e99fdSEd Maste /** Return 1 iff bevf's underlying bufferevent's output buffer is at or
132c43e99fdSEd Maste * over its high watermark such that we should not write to it in a given
133c43e99fdSEd Maste * flush mode. */
134c43e99fdSEd Maste static int
be_underlying_writebuf_full(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state)135c43e99fdSEd Maste be_underlying_writebuf_full(struct bufferevent_filtered *bevf,
136c43e99fdSEd Maste enum bufferevent_flush_mode state)
137c43e99fdSEd Maste {
138c43e99fdSEd Maste struct bufferevent *u = bevf->underlying;
139c43e99fdSEd Maste return state == BEV_NORMAL &&
140c43e99fdSEd Maste u->wm_write.high &&
141c43e99fdSEd Maste evbuffer_get_length(u->output) >= u->wm_write.high;
142c43e99fdSEd Maste }
143c43e99fdSEd Maste
144c43e99fdSEd Maste /** Return 1 if our input buffer is at or over its high watermark such that we
145c43e99fdSEd Maste * should not write to it in a given flush mode. */
146c43e99fdSEd Maste static int
be_readbuf_full(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state)147c43e99fdSEd Maste be_readbuf_full(struct bufferevent_filtered *bevf,
148c43e99fdSEd Maste enum bufferevent_flush_mode state)
149c43e99fdSEd Maste {
150c43e99fdSEd Maste struct bufferevent *bufev = downcast(bevf);
151c43e99fdSEd Maste return state == BEV_NORMAL &&
152c43e99fdSEd Maste bufev->wm_read.high &&
153c43e99fdSEd Maste evbuffer_get_length(bufev->input) >= bufev->wm_read.high;
154c43e99fdSEd Maste }
155c43e99fdSEd Maste
156c43e99fdSEd Maste
157c43e99fdSEd Maste /* Filter to use when we're created with a NULL filter. */
158c43e99fdSEd Maste static enum bufferevent_filter_result
be_null_filter(struct evbuffer * src,struct evbuffer * dst,ev_ssize_t lim,enum bufferevent_flush_mode state,void * ctx)159c43e99fdSEd Maste be_null_filter(struct evbuffer *src, struct evbuffer *dst, ev_ssize_t lim,
160c43e99fdSEd Maste enum bufferevent_flush_mode state, void *ctx)
161c43e99fdSEd Maste {
162c43e99fdSEd Maste (void)state;
163*b50261e2SCy Schubert if (evbuffer_remove_buffer(src, dst, lim) >= 0)
164c43e99fdSEd Maste return BEV_OK;
165c43e99fdSEd Maste else
166c43e99fdSEd Maste return BEV_ERROR;
167c43e99fdSEd Maste }
168c43e99fdSEd Maste
169c43e99fdSEd Maste struct bufferevent *
bufferevent_filter_new(struct bufferevent * underlying,bufferevent_filter_cb input_filter,bufferevent_filter_cb output_filter,int options,void (* free_context)(void *),void * ctx)170c43e99fdSEd Maste bufferevent_filter_new(struct bufferevent *underlying,
171c43e99fdSEd Maste bufferevent_filter_cb input_filter,
172c43e99fdSEd Maste bufferevent_filter_cb output_filter,
173c43e99fdSEd Maste int options,
174c43e99fdSEd Maste void (*free_context)(void *),
175c43e99fdSEd Maste void *ctx)
176c43e99fdSEd Maste {
177c43e99fdSEd Maste struct bufferevent_filtered *bufev_f;
178c43e99fdSEd Maste int tmp_options = options & ~BEV_OPT_THREADSAFE;
179c43e99fdSEd Maste
180c43e99fdSEd Maste if (!underlying)
181c43e99fdSEd Maste return NULL;
182c43e99fdSEd Maste
183c43e99fdSEd Maste if (!input_filter)
184c43e99fdSEd Maste input_filter = be_null_filter;
185c43e99fdSEd Maste if (!output_filter)
186c43e99fdSEd Maste output_filter = be_null_filter;
187c43e99fdSEd Maste
188c43e99fdSEd Maste bufev_f = mm_calloc(1, sizeof(struct bufferevent_filtered));
189c43e99fdSEd Maste if (!bufev_f)
190c43e99fdSEd Maste return NULL;
191c43e99fdSEd Maste
192c43e99fdSEd Maste if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base,
193c43e99fdSEd Maste &bufferevent_ops_filter, tmp_options) < 0) {
194c43e99fdSEd Maste mm_free(bufev_f);
195c43e99fdSEd Maste return NULL;
196c43e99fdSEd Maste }
197c43e99fdSEd Maste if (options & BEV_OPT_THREADSAFE) {
198c43e99fdSEd Maste bufferevent_enable_locking_(downcast(bufev_f), NULL);
199c43e99fdSEd Maste }
200c43e99fdSEd Maste
201c43e99fdSEd Maste bufev_f->underlying = underlying;
202c43e99fdSEd Maste
203c43e99fdSEd Maste bufev_f->process_in = input_filter;
204c43e99fdSEd Maste bufev_f->process_out = output_filter;
205c43e99fdSEd Maste bufev_f->free_context = free_context;
206c43e99fdSEd Maste bufev_f->context = ctx;
207c43e99fdSEd Maste
208c43e99fdSEd Maste bufferevent_setcb(bufev_f->underlying,
209c43e99fdSEd Maste be_filter_readcb, be_filter_writecb, be_filter_eventcb, bufev_f);
210c43e99fdSEd Maste
211c43e99fdSEd Maste bufev_f->inbuf_cb = evbuffer_add_cb(downcast(bufev_f)->input,
212c43e99fdSEd Maste bufferevent_filtered_inbuf_cb, bufev_f);
213c43e99fdSEd Maste evbuffer_cb_clear_flags(downcast(bufev_f)->input, bufev_f->inbuf_cb,
214c43e99fdSEd Maste EVBUFFER_CB_ENABLED);
215c43e99fdSEd Maste
216c43e99fdSEd Maste bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output,
217c43e99fdSEd Maste bufferevent_filtered_outbuf_cb, bufev_f);
218c43e99fdSEd Maste
219c43e99fdSEd Maste bufferevent_init_generic_timeout_cbs_(downcast(bufev_f));
220c43e99fdSEd Maste bufferevent_incref_(underlying);
221c43e99fdSEd Maste
222c43e99fdSEd Maste bufferevent_enable(underlying, EV_READ|EV_WRITE);
223c43e99fdSEd Maste bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ);
224c43e99fdSEd Maste
225c43e99fdSEd Maste return downcast(bufev_f);
226c43e99fdSEd Maste }
227c43e99fdSEd Maste
228c43e99fdSEd Maste static void
be_filter_unlink(struct bufferevent * bev)229c43e99fdSEd Maste be_filter_unlink(struct bufferevent *bev)
230c43e99fdSEd Maste {
231c43e99fdSEd Maste struct bufferevent_filtered *bevf = upcast(bev);
232c43e99fdSEd Maste EVUTIL_ASSERT(bevf);
233c43e99fdSEd Maste
234c43e99fdSEd Maste if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) {
235c43e99fdSEd Maste /* Yes, there is also a decref in bufferevent_decref_.
236c43e99fdSEd Maste * That decref corresponds to the incref when we set
237c43e99fdSEd Maste * underlying for the first time. This decref is an
238c43e99fdSEd Maste * extra one to remove the last reference.
239c43e99fdSEd Maste */
240c43e99fdSEd Maste if (BEV_UPCAST(bevf->underlying)->refcnt < 2) {
241c43e99fdSEd Maste event_warnx("BEV_OPT_CLOSE_ON_FREE set on an "
242c43e99fdSEd Maste "bufferevent with too few references");
243c43e99fdSEd Maste } else {
244c43e99fdSEd Maste bufferevent_free(bevf->underlying);
245c43e99fdSEd Maste }
246c43e99fdSEd Maste } else {
247c43e99fdSEd Maste if (bevf->underlying) {
248c43e99fdSEd Maste if (bevf->underlying->errorcb == be_filter_eventcb)
249c43e99fdSEd Maste bufferevent_setcb(bevf->underlying,
250c43e99fdSEd Maste NULL, NULL, NULL, NULL);
251c43e99fdSEd Maste bufferevent_unsuspend_read_(bevf->underlying,
252c43e99fdSEd Maste BEV_SUSPEND_FILT_READ);
253c43e99fdSEd Maste }
254c43e99fdSEd Maste }
255c43e99fdSEd Maste }
256c43e99fdSEd Maste
257c43e99fdSEd Maste static void
be_filter_destruct(struct bufferevent * bev)258c43e99fdSEd Maste be_filter_destruct(struct bufferevent *bev)
259c43e99fdSEd Maste {
260c43e99fdSEd Maste struct bufferevent_filtered *bevf = upcast(bev);
261c43e99fdSEd Maste EVUTIL_ASSERT(bevf);
262c43e99fdSEd Maste if (bevf->free_context)
263c43e99fdSEd Maste bevf->free_context(bevf->context);
264c43e99fdSEd Maste
265c43e99fdSEd Maste if (bevf->inbuf_cb)
266c43e99fdSEd Maste evbuffer_remove_cb_entry(bev->input, bevf->inbuf_cb);
267c43e99fdSEd Maste
268c43e99fdSEd Maste if (bevf->outbuf_cb)
269c43e99fdSEd Maste evbuffer_remove_cb_entry(bev->output, bevf->outbuf_cb);
270c43e99fdSEd Maste }
271c43e99fdSEd Maste
272c43e99fdSEd Maste static int
be_filter_enable(struct bufferevent * bev,short event)273c43e99fdSEd Maste be_filter_enable(struct bufferevent *bev, short event)
274c43e99fdSEd Maste {
275c43e99fdSEd Maste struct bufferevent_filtered *bevf = upcast(bev);
276c43e99fdSEd Maste if (event & EV_WRITE)
277c43e99fdSEd Maste BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
278c43e99fdSEd Maste
279c43e99fdSEd Maste if (event & EV_READ) {
280c43e99fdSEd Maste BEV_RESET_GENERIC_READ_TIMEOUT(bev);
281c43e99fdSEd Maste bufferevent_unsuspend_read_(bevf->underlying,
282c43e99fdSEd Maste BEV_SUSPEND_FILT_READ);
283c43e99fdSEd Maste }
284c43e99fdSEd Maste return 0;
285c43e99fdSEd Maste }
286c43e99fdSEd Maste
287c43e99fdSEd Maste static int
be_filter_disable(struct bufferevent * bev,short event)288c43e99fdSEd Maste be_filter_disable(struct bufferevent *bev, short event)
289c43e99fdSEd Maste {
290c43e99fdSEd Maste struct bufferevent_filtered *bevf = upcast(bev);
291c43e99fdSEd Maste if (event & EV_WRITE)
292c43e99fdSEd Maste BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
293c43e99fdSEd Maste if (event & EV_READ) {
294c43e99fdSEd Maste BEV_DEL_GENERIC_READ_TIMEOUT(bev);
295c43e99fdSEd Maste bufferevent_suspend_read_(bevf->underlying,
296c43e99fdSEd Maste BEV_SUSPEND_FILT_READ);
297c43e99fdSEd Maste }
298c43e99fdSEd Maste return 0;
299c43e99fdSEd Maste }
300c43e99fdSEd Maste
301c43e99fdSEd Maste static enum bufferevent_filter_result
be_filter_process_input(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state,int * processed_out)302c43e99fdSEd Maste be_filter_process_input(struct bufferevent_filtered *bevf,
303c43e99fdSEd Maste enum bufferevent_flush_mode state,
304c43e99fdSEd Maste int *processed_out)
305c43e99fdSEd Maste {
306c43e99fdSEd Maste enum bufferevent_filter_result res;
307c43e99fdSEd Maste struct bufferevent *bev = downcast(bevf);
308c43e99fdSEd Maste
309c43e99fdSEd Maste if (state == BEV_NORMAL) {
310c43e99fdSEd Maste /* If we're in 'normal' mode, don't urge data on the filter
311c43e99fdSEd Maste * unless we're reading data and under our high-water mark.*/
312c43e99fdSEd Maste if (!(bev->enabled & EV_READ) ||
313c43e99fdSEd Maste be_readbuf_full(bevf, state))
314c43e99fdSEd Maste return BEV_OK;
315c43e99fdSEd Maste }
316c43e99fdSEd Maste
317c43e99fdSEd Maste do {
318c43e99fdSEd Maste ev_ssize_t limit = -1;
319c43e99fdSEd Maste if (state == BEV_NORMAL && bev->wm_read.high)
320c43e99fdSEd Maste limit = bev->wm_read.high -
321c43e99fdSEd Maste evbuffer_get_length(bev->input);
322c43e99fdSEd Maste
323c43e99fdSEd Maste res = bevf->process_in(bevf->underlying->input,
324c43e99fdSEd Maste bev->input, limit, state, bevf->context);
325c43e99fdSEd Maste
326c43e99fdSEd Maste if (res == BEV_OK)
327c43e99fdSEd Maste *processed_out = 1;
328c43e99fdSEd Maste } while (res == BEV_OK &&
329c43e99fdSEd Maste (bev->enabled & EV_READ) &&
330c43e99fdSEd Maste evbuffer_get_length(bevf->underlying->input) &&
331c43e99fdSEd Maste !be_readbuf_full(bevf, state));
332c43e99fdSEd Maste
333c43e99fdSEd Maste if (*processed_out)
334c43e99fdSEd Maste BEV_RESET_GENERIC_READ_TIMEOUT(bev);
335c43e99fdSEd Maste
336c43e99fdSEd Maste return res;
337c43e99fdSEd Maste }
338c43e99fdSEd Maste
339c43e99fdSEd Maste
340c43e99fdSEd Maste static enum bufferevent_filter_result
be_filter_process_output(struct bufferevent_filtered * bevf,enum bufferevent_flush_mode state,int * processed_out)341c43e99fdSEd Maste be_filter_process_output(struct bufferevent_filtered *bevf,
342c43e99fdSEd Maste enum bufferevent_flush_mode state,
343c43e99fdSEd Maste int *processed_out)
344c43e99fdSEd Maste {
345c43e99fdSEd Maste /* Requires references and lock: might call writecb */
346c43e99fdSEd Maste enum bufferevent_filter_result res = BEV_OK;
347c43e99fdSEd Maste struct bufferevent *bufev = downcast(bevf);
348c43e99fdSEd Maste int again = 0;
349c43e99fdSEd Maste
350c43e99fdSEd Maste if (state == BEV_NORMAL) {
351c43e99fdSEd Maste /* If we're in 'normal' mode, don't urge data on the
352c43e99fdSEd Maste * filter unless we're writing data, and the underlying
353c43e99fdSEd Maste * bufferevent is accepting data, and we have data to
354c43e99fdSEd Maste * give the filter. If we're in 'flush' or 'finish',
355c43e99fdSEd Maste * call the filter no matter what. */
356c43e99fdSEd Maste if (!(bufev->enabled & EV_WRITE) ||
357c43e99fdSEd Maste be_underlying_writebuf_full(bevf, state) ||
358c43e99fdSEd Maste !evbuffer_get_length(bufev->output))
359c43e99fdSEd Maste return BEV_OK;
360c43e99fdSEd Maste }
361c43e99fdSEd Maste
362c43e99fdSEd Maste /* disable the callback that calls this function
363c43e99fdSEd Maste when the user adds to the output buffer. */
364c43e99fdSEd Maste evbuffer_cb_clear_flags(bufev->output, bevf->outbuf_cb,
365c43e99fdSEd Maste EVBUFFER_CB_ENABLED);
366c43e99fdSEd Maste
367c43e99fdSEd Maste do {
368c43e99fdSEd Maste int processed = 0;
369c43e99fdSEd Maste again = 0;
370c43e99fdSEd Maste
371c43e99fdSEd Maste do {
372c43e99fdSEd Maste ev_ssize_t limit = -1;
373c43e99fdSEd Maste if (state == BEV_NORMAL &&
374c43e99fdSEd Maste bevf->underlying->wm_write.high)
375c43e99fdSEd Maste limit = bevf->underlying->wm_write.high -
376c43e99fdSEd Maste evbuffer_get_length(bevf->underlying->output);
377c43e99fdSEd Maste
378c43e99fdSEd Maste res = bevf->process_out(downcast(bevf)->output,
379c43e99fdSEd Maste bevf->underlying->output,
380c43e99fdSEd Maste limit,
381c43e99fdSEd Maste state,
382c43e99fdSEd Maste bevf->context);
383c43e99fdSEd Maste
384c43e99fdSEd Maste if (res == BEV_OK)
385c43e99fdSEd Maste processed = *processed_out = 1;
386c43e99fdSEd Maste } while (/* Stop if the filter wasn't successful...*/
387c43e99fdSEd Maste res == BEV_OK &&
388c43e99fdSEd Maste /* Or if we aren't writing any more. */
389c43e99fdSEd Maste (bufev->enabled & EV_WRITE) &&
390c43e99fdSEd Maste /* Of if we have nothing more to write and we are
391c43e99fdSEd Maste * not flushing. */
392c43e99fdSEd Maste evbuffer_get_length(bufev->output) &&
393c43e99fdSEd Maste /* Or if we have filled the underlying output buffer. */
394c43e99fdSEd Maste !be_underlying_writebuf_full(bevf,state));
395c43e99fdSEd Maste
396c43e99fdSEd Maste if (processed) {
397c43e99fdSEd Maste /* call the write callback.*/
398c43e99fdSEd Maste bufferevent_trigger_nolock_(bufev, EV_WRITE, 0);
399c43e99fdSEd Maste
400c43e99fdSEd Maste if (res == BEV_OK &&
401c43e99fdSEd Maste (bufev->enabled & EV_WRITE) &&
402c43e99fdSEd Maste evbuffer_get_length(bufev->output) &&
403c43e99fdSEd Maste !be_underlying_writebuf_full(bevf, state)) {
404c43e99fdSEd Maste again = 1;
405c43e99fdSEd Maste }
406c43e99fdSEd Maste }
407c43e99fdSEd Maste } while (again);
408c43e99fdSEd Maste
409c43e99fdSEd Maste /* reenable the outbuf_cb */
410c43e99fdSEd Maste evbuffer_cb_set_flags(bufev->output,bevf->outbuf_cb,
411c43e99fdSEd Maste EVBUFFER_CB_ENABLED);
412c43e99fdSEd Maste
413c43e99fdSEd Maste if (*processed_out)
414c43e99fdSEd Maste BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
415c43e99fdSEd Maste
416c43e99fdSEd Maste return res;
417c43e99fdSEd Maste }
418c43e99fdSEd Maste
419c43e99fdSEd Maste /* Called when the size of our outbuf changes. */
420c43e99fdSEd Maste static void
bufferevent_filtered_outbuf_cb(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)421c43e99fdSEd Maste bufferevent_filtered_outbuf_cb(struct evbuffer *buf,
422c43e99fdSEd Maste const struct evbuffer_cb_info *cbinfo, void *arg)
423c43e99fdSEd Maste {
424c43e99fdSEd Maste struct bufferevent_filtered *bevf = arg;
425c43e99fdSEd Maste struct bufferevent *bev = downcast(bevf);
426c43e99fdSEd Maste
427c43e99fdSEd Maste if (cbinfo->n_added) {
428c43e99fdSEd Maste int processed_any = 0;
429c43e99fdSEd Maste /* Somebody added more data to the output buffer. Try to
430c43e99fdSEd Maste * process it, if we should. */
431c43e99fdSEd Maste bufferevent_incref_and_lock_(bev);
432c43e99fdSEd Maste be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
433c43e99fdSEd Maste bufferevent_decref_and_unlock_(bev);
434c43e99fdSEd Maste }
435c43e99fdSEd Maste }
436c43e99fdSEd Maste
437c43e99fdSEd Maste static void
be_filter_read_nolock_(struct bufferevent * underlying,void * me_)438c43e99fdSEd Maste be_filter_read_nolock_(struct bufferevent *underlying, void *me_)
439c43e99fdSEd Maste {
440c43e99fdSEd Maste struct bufferevent_filtered *bevf = me_;
441c43e99fdSEd Maste enum bufferevent_filter_result res;
442c43e99fdSEd Maste enum bufferevent_flush_mode state;
443c43e99fdSEd Maste struct bufferevent *bufev = downcast(bevf);
444c43e99fdSEd Maste struct bufferevent_private *bufev_private = BEV_UPCAST(bufev);
445c43e99fdSEd Maste int processed_any = 0;
446c43e99fdSEd Maste
447c43e99fdSEd Maste // It's possible our refcount is 0 at this point if another thread free'd our filterevent
448c43e99fdSEd Maste EVUTIL_ASSERT(bufev_private->refcnt >= 0);
449c43e99fdSEd Maste
450c43e99fdSEd Maste // If our refcount is > 0
451c43e99fdSEd Maste if (bufev_private->refcnt > 0) {
452c43e99fdSEd Maste
453c43e99fdSEd Maste if (bevf->got_eof)
454c43e99fdSEd Maste state = BEV_FINISHED;
455c43e99fdSEd Maste else
456c43e99fdSEd Maste state = BEV_NORMAL;
457c43e99fdSEd Maste
458c43e99fdSEd Maste /* XXXX use return value */
459c43e99fdSEd Maste res = be_filter_process_input(bevf, state, &processed_any);
460c43e99fdSEd Maste (void)res;
461c43e99fdSEd Maste
462c43e99fdSEd Maste /* XXX This should be in process_input, not here. There are
463c43e99fdSEd Maste * other places that can call process-input, and they should
464c43e99fdSEd Maste * force readcb calls as needed. */
465c43e99fdSEd Maste if (processed_any) {
466c43e99fdSEd Maste bufferevent_trigger_nolock_(bufev, EV_READ, 0);
467c43e99fdSEd Maste if (evbuffer_get_length(underlying->input) > 0 &&
468c43e99fdSEd Maste be_readbuf_full(bevf, state)) {
469c43e99fdSEd Maste /* data left in underlying buffer and filter input buffer
470c43e99fdSEd Maste * hit its read high watermark.
471c43e99fdSEd Maste * Schedule callback to avoid data gets stuck in underlying
472c43e99fdSEd Maste * input buffer.
473c43e99fdSEd Maste */
474c43e99fdSEd Maste evbuffer_cb_set_flags(bufev->input, bevf->inbuf_cb,
475c43e99fdSEd Maste EVBUFFER_CB_ENABLED);
476c43e99fdSEd Maste }
477c43e99fdSEd Maste }
478c43e99fdSEd Maste }
479c43e99fdSEd Maste }
480c43e99fdSEd Maste
481c43e99fdSEd Maste /* Called when the size of our inbuf changes. */
482c43e99fdSEd Maste static void
bufferevent_filtered_inbuf_cb(struct evbuffer * buf,const struct evbuffer_cb_info * cbinfo,void * arg)483c43e99fdSEd Maste bufferevent_filtered_inbuf_cb(struct evbuffer *buf,
484c43e99fdSEd Maste const struct evbuffer_cb_info *cbinfo, void *arg)
485c43e99fdSEd Maste {
486c43e99fdSEd Maste struct bufferevent_filtered *bevf = arg;
487c43e99fdSEd Maste enum bufferevent_flush_mode state;
488c43e99fdSEd Maste struct bufferevent *bev = downcast(bevf);
489c43e99fdSEd Maste
490c43e99fdSEd Maste BEV_LOCK(bev);
491c43e99fdSEd Maste
492c43e99fdSEd Maste if (bevf->got_eof)
493c43e99fdSEd Maste state = BEV_FINISHED;
494c43e99fdSEd Maste else
495c43e99fdSEd Maste state = BEV_NORMAL;
496c43e99fdSEd Maste
497c43e99fdSEd Maste
498c43e99fdSEd Maste if (!be_readbuf_full(bevf, state)) {
499c43e99fdSEd Maste /* opportunity to read data which was left in underlying
500c43e99fdSEd Maste * input buffer because filter input buffer hit read
501c43e99fdSEd Maste * high watermark.
502c43e99fdSEd Maste */
503c43e99fdSEd Maste evbuffer_cb_clear_flags(bev->input, bevf->inbuf_cb,
504c43e99fdSEd Maste EVBUFFER_CB_ENABLED);
505c43e99fdSEd Maste if (evbuffer_get_length(bevf->underlying->input) > 0)
506c43e99fdSEd Maste be_filter_read_nolock_(bevf->underlying, bevf);
507c43e99fdSEd Maste }
508c43e99fdSEd Maste
509c43e99fdSEd Maste BEV_UNLOCK(bev);
510c43e99fdSEd Maste }
511c43e99fdSEd Maste
512c43e99fdSEd Maste /* Called when the underlying socket has read. */
513c43e99fdSEd Maste static void
be_filter_readcb(struct bufferevent * underlying,void * me_)514c43e99fdSEd Maste be_filter_readcb(struct bufferevent *underlying, void *me_)
515c43e99fdSEd Maste {
516c43e99fdSEd Maste struct bufferevent_filtered *bevf = me_;
517c43e99fdSEd Maste struct bufferevent *bev = downcast(bevf);
518c43e99fdSEd Maste
519c43e99fdSEd Maste BEV_LOCK(bev);
520c43e99fdSEd Maste
521c43e99fdSEd Maste be_filter_read_nolock_(underlying, me_);
522c43e99fdSEd Maste
523c43e99fdSEd Maste BEV_UNLOCK(bev);
524c43e99fdSEd Maste }
525c43e99fdSEd Maste
526c43e99fdSEd Maste /* Called when the underlying socket has drained enough that we can write to
527c43e99fdSEd Maste it. */
528c43e99fdSEd Maste static void
be_filter_writecb(struct bufferevent * underlying,void * me_)529c43e99fdSEd Maste be_filter_writecb(struct bufferevent *underlying, void *me_)
530c43e99fdSEd Maste {
531c43e99fdSEd Maste struct bufferevent_filtered *bevf = me_;
532c43e99fdSEd Maste struct bufferevent *bev = downcast(bevf);
533c43e99fdSEd Maste struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
534c43e99fdSEd Maste int processed_any = 0;
535c43e99fdSEd Maste
536c43e99fdSEd Maste BEV_LOCK(bev);
537c43e99fdSEd Maste
538c43e99fdSEd Maste // It's possible our refcount is 0 at this point if another thread free'd our filterevent
539c43e99fdSEd Maste EVUTIL_ASSERT(bufev_private->refcnt >= 0);
540c43e99fdSEd Maste
541c43e99fdSEd Maste // If our refcount is > 0
542c43e99fdSEd Maste if (bufev_private->refcnt > 0) {
543c43e99fdSEd Maste be_filter_process_output(bevf, BEV_NORMAL, &processed_any);
544c43e99fdSEd Maste }
545c43e99fdSEd Maste
546c43e99fdSEd Maste BEV_UNLOCK(bev);
547c43e99fdSEd Maste }
548c43e99fdSEd Maste
549c43e99fdSEd Maste /* Called when the underlying socket has given us an error */
550c43e99fdSEd Maste static void
be_filter_eventcb(struct bufferevent * underlying,short what,void * me_)551c43e99fdSEd Maste be_filter_eventcb(struct bufferevent *underlying, short what, void *me_)
552c43e99fdSEd Maste {
553c43e99fdSEd Maste struct bufferevent_filtered *bevf = me_;
554c43e99fdSEd Maste struct bufferevent *bev = downcast(bevf);
555c43e99fdSEd Maste struct bufferevent_private *bufev_private = BEV_UPCAST(bev);
556c43e99fdSEd Maste
557c43e99fdSEd Maste BEV_LOCK(bev);
558c43e99fdSEd Maste
559c43e99fdSEd Maste // It's possible our refcount is 0 at this point if another thread free'd our filterevent
560c43e99fdSEd Maste EVUTIL_ASSERT(bufev_private->refcnt >= 0);
561c43e99fdSEd Maste
562c43e99fdSEd Maste // If our refcount is > 0
563c43e99fdSEd Maste if (bufev_private->refcnt > 0) {
564c43e99fdSEd Maste
565c43e99fdSEd Maste /* All we can really to is tell our own eventcb. */
566c43e99fdSEd Maste bufferevent_run_eventcb_(bev, what, 0);
567c43e99fdSEd Maste }
568c43e99fdSEd Maste
569c43e99fdSEd Maste BEV_UNLOCK(bev);
570c43e99fdSEd Maste }
571c43e99fdSEd Maste
572c43e99fdSEd Maste static int
be_filter_flush(struct bufferevent * bufev,short iotype,enum bufferevent_flush_mode mode)573c43e99fdSEd Maste be_filter_flush(struct bufferevent *bufev,
574c43e99fdSEd Maste short iotype, enum bufferevent_flush_mode mode)
575c43e99fdSEd Maste {
576c43e99fdSEd Maste struct bufferevent_filtered *bevf = upcast(bufev);
577c43e99fdSEd Maste int processed_any = 0;
578c43e99fdSEd Maste EVUTIL_ASSERT(bevf);
579c43e99fdSEd Maste
580c43e99fdSEd Maste bufferevent_incref_and_lock_(bufev);
581c43e99fdSEd Maste
582c43e99fdSEd Maste if (iotype & EV_READ) {
583c43e99fdSEd Maste be_filter_process_input(bevf, mode, &processed_any);
584c43e99fdSEd Maste }
585c43e99fdSEd Maste if (iotype & EV_WRITE) {
586c43e99fdSEd Maste be_filter_process_output(bevf, mode, &processed_any);
587c43e99fdSEd Maste }
588c43e99fdSEd Maste /* XXX check the return value? */
589c43e99fdSEd Maste /* XXX does this want to recursively call lower-level flushes? */
590c43e99fdSEd Maste bufferevent_flush(bevf->underlying, iotype, mode);
591c43e99fdSEd Maste
592c43e99fdSEd Maste bufferevent_decref_and_unlock_(bufev);
593c43e99fdSEd Maste
594c43e99fdSEd Maste return processed_any;
595c43e99fdSEd Maste }
596c43e99fdSEd Maste
597c43e99fdSEd Maste static int
be_filter_ctrl(struct bufferevent * bev,enum bufferevent_ctrl_op op,union bufferevent_ctrl_data * data)598c43e99fdSEd Maste be_filter_ctrl(struct bufferevent *bev, enum bufferevent_ctrl_op op,
599c43e99fdSEd Maste union bufferevent_ctrl_data *data)
600c43e99fdSEd Maste {
601c43e99fdSEd Maste struct bufferevent_filtered *bevf;
602c43e99fdSEd Maste switch (op) {
603c43e99fdSEd Maste case BEV_CTRL_GET_UNDERLYING:
604c43e99fdSEd Maste bevf = upcast(bev);
605c43e99fdSEd Maste data->ptr = bevf->underlying;
606c43e99fdSEd Maste return 0;
607c43e99fdSEd Maste case BEV_CTRL_SET_FD:
608*b50261e2SCy Schubert case BEV_CTRL_GET_FD:
609c43e99fdSEd Maste bevf = upcast(bev);
610c43e99fdSEd Maste
611c43e99fdSEd Maste if (bevf->underlying &&
612c43e99fdSEd Maste bevf->underlying->be_ops &&
613c43e99fdSEd Maste bevf->underlying->be_ops->ctrl) {
614c43e99fdSEd Maste return (bevf->underlying->be_ops->ctrl)(bevf->underlying, op, data);
615c43e99fdSEd Maste }
616*b50261e2SCy Schubert EVUTIL_FALLTHROUGH;
617c43e99fdSEd Maste
618c43e99fdSEd Maste case BEV_CTRL_CANCEL_ALL:
619*b50261e2SCy Schubert EVUTIL_FALLTHROUGH;
620c43e99fdSEd Maste default:
621c43e99fdSEd Maste return -1;
622c43e99fdSEd Maste }
623c43e99fdSEd Maste
624c43e99fdSEd Maste return -1;
625c43e99fdSEd Maste }
626