xref: /netbsd-src/external/bsd/ntp/dist/sntp/libevent/bufferevent_pair.c (revision bdc22b2e01993381dcefeff2bc9b56ca75a4235c)
1 /*	$NetBSD: bufferevent_pair.c,v 1.5 2016/01/08 21:35:40 christos Exp $	*/
2 
3 /*
4  * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. The name of the author may not be used to endorse or promote products
15  *    derived from this software without specific prior written permission.
16  *
17  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
18  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
19  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
20  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
21  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
22  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
23  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
24  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
25  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
26  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
27  */
28 #include "event2/event-config.h"
29 #include "evconfig-private.h"
30 
31 #include <sys/types.h>
32 
33 #ifdef _WIN32
34 #include <winsock2.h>
35 #endif
36 
37 #include "event2/util.h"
38 #include "event2/buffer.h"
39 #include "event2/bufferevent.h"
40 #include "event2/bufferevent_struct.h"
41 #include "event2/event.h"
42 #include "defer-internal.h"
43 #include "bufferevent-internal.h"
44 #include "mm-internal.h"
45 #include "util-internal.h"
46 
47 struct bufferevent_pair {
48 	struct bufferevent_private bev;
49 	struct bufferevent_pair *partner;
50 	/* For ->destruct() lock checking */
51 	struct bufferevent_pair *unlinked_partner;
52 };
53 
54 
55 /* Given a bufferevent that's really a bev part of a bufferevent_pair,
56  * return that bufferevent_filtered. Returns NULL otherwise.*/
57 static inline struct bufferevent_pair *
58 upcast(struct bufferevent *bev)
59 {
60 	struct bufferevent_pair *bev_p;
61 	if (bev->be_ops != &bufferevent_ops_pair)
62 		return NULL;
63 	bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev);
64 	EVUTIL_ASSERT(bev_p->bev.bev.be_ops == &bufferevent_ops_pair);
65 	return bev_p;
66 }
67 
68 #define downcast(bev_pair) (&(bev_pair)->bev.bev)
69 
70 static inline void
71 incref_and_lock(struct bufferevent *b)
72 {
73 	struct bufferevent_pair *bevp;
74 	bufferevent_incref_and_lock_(b);
75 	bevp = upcast(b);
76 	if (bevp->partner)
77 		bufferevent_incref_and_lock_(downcast(bevp->partner));
78 }
79 
80 static inline void
81 decref_and_unlock(struct bufferevent *b)
82 {
83 	struct bufferevent_pair *bevp = upcast(b);
84 	if (bevp->partner)
85 		bufferevent_decref_and_unlock_(downcast(bevp->partner));
86 	bufferevent_decref_and_unlock_(b);
87 }
88 
89 /* XXX Handle close */
90 
91 static void be_pair_outbuf_cb(struct evbuffer *,
92     const struct evbuffer_cb_info *, void *);
93 
94 static struct bufferevent_pair *
95 bufferevent_pair_elt_new(struct event_base *base,
96     int options)
97 {
98 	struct bufferevent_pair *bufev;
99 	if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair))))
100 		return NULL;
101 	if (bufferevent_init_common_(&bufev->bev, base, &bufferevent_ops_pair,
102 		options)) {
103 		mm_free(bufev);
104 		return NULL;
105 	}
106 	if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) {
107 		bufferevent_free(downcast(bufev));
108 		return NULL;
109 	}
110 
111 	bufferevent_init_generic_timeout_cbs_(&bufev->bev.bev);
112 
113 	return bufev;
114 }
115 
116 int
117 bufferevent_pair_new(struct event_base *base, int options,
118     struct bufferevent *pair[2])
119 {
120 	struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL;
121 	int tmp_options;
122 
123 	options |= BEV_OPT_DEFER_CALLBACKS;
124 	tmp_options = options & ~BEV_OPT_THREADSAFE;
125 
126 	bufev1 = bufferevent_pair_elt_new(base, options);
127 	if (!bufev1)
128 		return -1;
129 	bufev2 = bufferevent_pair_elt_new(base, tmp_options);
130 	if (!bufev2) {
131 		bufferevent_free(downcast(bufev1));
132 		return -1;
133 	}
134 
135 	if (options & BEV_OPT_THREADSAFE) {
136 		/*XXXX check return */
137 		bufferevent_enable_locking_(downcast(bufev2), bufev1->bev.lock);
138 	}
139 
140 	bufev1->partner = bufev2;
141 	bufev2->partner = bufev1;
142 
143 	evbuffer_freeze(downcast(bufev1)->input, 0);
144 	evbuffer_freeze(downcast(bufev1)->output, 1);
145 	evbuffer_freeze(downcast(bufev2)->input, 0);
146 	evbuffer_freeze(downcast(bufev2)->output, 1);
147 
148 	pair[0] = downcast(bufev1);
149 	pair[1] = downcast(bufev2);
150 
151 	return 0;
152 }
153 
154 static void
155 be_pair_transfer(struct bufferevent *src, struct bufferevent *dst,
156     int ignore_wm)
157 {
158 	size_t dst_size;
159 	size_t n;
160 
161 	evbuffer_unfreeze(src->output, 1);
162 	evbuffer_unfreeze(dst->input, 0);
163 
164 	if (dst->wm_read.high) {
165 		dst_size = evbuffer_get_length(dst->input);
166 		if (dst_size < dst->wm_read.high) {
167 			n = dst->wm_read.high - dst_size;
168 			evbuffer_remove_buffer(src->output, dst->input, n);
169 		} else {
170 			if (!ignore_wm)
171 				goto done;
172 			n = evbuffer_get_length(src->output);
173 			evbuffer_add_buffer(dst->input, src->output);
174 		}
175 	} else {
176 		n = evbuffer_get_length(src->output);
177 		evbuffer_add_buffer(dst->input, src->output);
178 	}
179 
180 	if (n) {
181 		BEV_RESET_GENERIC_READ_TIMEOUT(dst);
182 
183 		if (evbuffer_get_length(dst->output))
184 			BEV_RESET_GENERIC_WRITE_TIMEOUT(dst);
185 		else
186 			BEV_DEL_GENERIC_WRITE_TIMEOUT(dst);
187 	}
188 
189 	bufferevent_trigger_nolock_(dst, EV_READ, 0);
190 	bufferevent_trigger_nolock_(src, EV_WRITE, 0);
191 done:
192 	evbuffer_freeze(src->output, 1);
193 	evbuffer_freeze(dst->input, 0);
194 }
195 
196 static inline int
197 be_pair_wants_to_talk(struct bufferevent_pair *src,
198     struct bufferevent_pair *dst)
199 {
200 	return (downcast(src)->enabled & EV_WRITE) &&
201 	    (downcast(dst)->enabled & EV_READ) &&
202 	    !dst->bev.read_suspended &&
203 	    evbuffer_get_length(downcast(src)->output);
204 }
205 
206 static void
207 be_pair_outbuf_cb(struct evbuffer *outbuf,
208     const struct evbuffer_cb_info *info, void *arg)
209 {
210 	struct bufferevent_pair *bev_pair = arg;
211 	struct bufferevent_pair *partner = bev_pair->partner;
212 
213 	incref_and_lock(downcast(bev_pair));
214 
215 	if (info->n_added > info->n_deleted && partner) {
216 		/* We got more data.  If the other side's reading, then
217 		   hand it over. */
218 		if (be_pair_wants_to_talk(bev_pair, partner)) {
219 			be_pair_transfer(downcast(bev_pair), downcast(partner), 0);
220 		}
221 	}
222 
223 	decref_and_unlock(downcast(bev_pair));
224 }
225 
226 static int
227 be_pair_enable(struct bufferevent *bufev, short events)
228 {
229 	struct bufferevent_pair *bev_p = upcast(bufev);
230 	struct bufferevent_pair *partner = bev_p->partner;
231 
232 	incref_and_lock(bufev);
233 
234 	if (events & EV_READ) {
235 		BEV_RESET_GENERIC_READ_TIMEOUT(bufev);
236 	}
237 	if ((events & EV_WRITE) && evbuffer_get_length(bufev->output))
238 		BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev);
239 
240 	/* We're starting to read! Does the other side have anything to write?*/
241 	if ((events & EV_READ) && partner &&
242 	    be_pair_wants_to_talk(partner, bev_p)) {
243 		be_pair_transfer(downcast(partner), bufev, 0);
244 	}
245 	/* We're starting to write! Does the other side want to read? */
246 	if ((events & EV_WRITE) && partner &&
247 	    be_pair_wants_to_talk(bev_p, partner)) {
248 		be_pair_transfer(bufev, downcast(partner), 0);
249 	}
250 	decref_and_unlock(bufev);
251 	return 0;
252 }
253 
254 static int
255 be_pair_disable(struct bufferevent *bev, short events)
256 {
257 	if (events & EV_READ) {
258 		BEV_DEL_GENERIC_READ_TIMEOUT(bev);
259 	}
260 	if (events & EV_WRITE) {
261 		BEV_DEL_GENERIC_WRITE_TIMEOUT(bev);
262 	}
263 	return 0;
264 }
265 
266 static void
267 be_pair_unlink(struct bufferevent *bev)
268 {
269 	struct bufferevent_pair *bev_p = upcast(bev);
270 
271 	if (bev_p->partner) {
272 		bev_p->unlinked_partner = bev_p->partner;
273 		bev_p->partner->partner = NULL;
274 		bev_p->partner = NULL;
275 	}
276 }
277 
278 /* Free *shared* lock in the latest be (since we share it between two of them). */
279 static void
280 be_pair_destruct(struct bufferevent *bev)
281 {
282 	struct bufferevent_pair *bev_p = upcast(bev);
283 
284 	/* Transfer ownership of the lock into partner, otherwise we will use
285 	 * already free'd lock during freeing second bev, see next example:
286 	 *
287 	 * bev1->own_lock = 1
288 	 * bev2->own_lock = 0
289 	 * bev2->lock = bev1->lock
290 	 *
291 	 * bufferevent_free(bev1) # refcnt == 0 -> unlink
292 	 * bufferevent_free(bev2) # refcnt == 0 -> unlink
293 	 *
294 	 * event_base_free() -> finilizers -> EVTHREAD_FREE_LOCK(bev1->lock)
295 	 *                                 -> BEV_LOCK(bev2->lock) <-- already freed
296 	 *
297 	 * Where bev1 == pair[0], bev2 == pair[1].
298 	 */
299 	if (bev_p->unlinked_partner && bev_p->bev.own_lock) {
300 		bev_p->unlinked_partner->bev.own_lock = 1;
301 		bev_p->bev.own_lock = 0;
302 	}
303 	bev_p->unlinked_partner = NULL;
304 }
305 
306 static int
307 be_pair_flush(struct bufferevent *bev, short iotype,
308     enum bufferevent_flush_mode mode)
309 {
310 	struct bufferevent_pair *bev_p = upcast(bev);
311 	struct bufferevent *partner;
312 	incref_and_lock(bev);
313 	if (!bev_p->partner)
314 		return -1;
315 
316 	partner = downcast(bev_p->partner);
317 
318 	if (mode == BEV_NORMAL)
319 		return 0;
320 
321 	if ((iotype & EV_READ) != 0)
322 		be_pair_transfer(partner, bev, 1);
323 
324 	if ((iotype & EV_WRITE) != 0)
325 		be_pair_transfer(bev, partner, 1);
326 
327 	if (mode == BEV_FINISHED) {
328 		bufferevent_run_eventcb_(partner, iotype|BEV_EVENT_EOF, 0);
329 	}
330 	decref_and_unlock(bev);
331 	return 0;
332 }
333 
334 struct bufferevent *
335 bufferevent_pair_get_partner(struct bufferevent *bev)
336 {
337 	struct bufferevent_pair *bev_p;
338 	struct bufferevent *partner = NULL;
339 	bev_p = upcast(bev);
340 	if (! bev_p)
341 		return NULL;
342 
343 	incref_and_lock(bev);
344 	if (bev_p->partner)
345 		partner = downcast(bev_p->partner);
346 	decref_and_unlock(bev);
347 	return partner;
348 }
349 
350 const struct bufferevent_ops bufferevent_ops_pair = {
351 	"pair_elt",
352 	evutil_offsetof(struct bufferevent_pair, bev.bev),
353 	be_pair_enable,
354 	be_pair_disable,
355 	be_pair_unlink,
356 	be_pair_destruct,
357 	bufferevent_generic_adj_timeouts_,
358 	be_pair_flush,
359 	NULL, /* ctrl */
360 };
361