1 /* $NetBSD: bufferevent_pair.c,v 1.1.1.3 2017/01/31 21:14:52 christos Exp $ */ 2 /* 3 * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions 7 * are met: 8 * 1. Redistributions of source code must retain the above copyright 9 * notice, this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright 11 * notice, this list of conditions and the following disclaimer in the 12 * documentation and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 17 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 18 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 19 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 20 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 21 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 22 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 23 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 25 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 #include "event2/event-config.h" 28 #include <sys/cdefs.h> 29 __RCSID("$NetBSD: bufferevent_pair.c,v 1.1.1.3 2017/01/31 21:14:52 christos Exp $"); 30 #include "evconfig-private.h" 31 32 #include <sys/types.h> 33 34 #ifdef _WIN32 35 #include <winsock2.h> 36 #endif 37 38 #include "event2/util.h" 39 #include "event2/buffer.h" 40 #include "event2/bufferevent.h" 41 #include "event2/bufferevent_struct.h" 42 #include "event2/event.h" 43 #include "defer-internal.h" 44 #include "bufferevent-internal.h" 45 #include "mm-internal.h" 46 #include "util-internal.h" 47 48 struct bufferevent_pair { 49 struct bufferevent_private bev; 50 struct bufferevent_pair *partner; 51 /* For ->destruct() lock checking */ 52 struct bufferevent_pair *unlinked_partner; 53 }; 54 55 56 /* Given a bufferevent that's really a bev part of a bufferevent_pair, 57 * return that bufferevent_filtered. Returns NULL otherwise.*/ 58 static inline struct bufferevent_pair * 59 upcast(struct bufferevent *bev) 60 { 61 struct bufferevent_pair *bev_p; 62 if (bev->be_ops != &bufferevent_ops_pair) 63 return NULL; 64 bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev); 65 EVUTIL_ASSERT(bev_p->bev.bev.be_ops == &bufferevent_ops_pair); 66 return bev_p; 67 } 68 69 #define downcast(bev_pair) (&(bev_pair)->bev.bev) 70 71 static inline void 72 incref_and_lock(struct bufferevent *b) 73 { 74 struct bufferevent_pair *bevp; 75 bufferevent_incref_and_lock_(b); 76 bevp = upcast(b); 77 if (bevp->partner) 78 bufferevent_incref_and_lock_(downcast(bevp->partner)); 79 } 80 81 static inline void 82 decref_and_unlock(struct bufferevent *b) 83 { 84 struct bufferevent_pair *bevp = upcast(b); 85 if (bevp->partner) 86 bufferevent_decref_and_unlock_(downcast(bevp->partner)); 87 bufferevent_decref_and_unlock_(b); 88 } 89 90 /* XXX Handle close */ 91 92 static void be_pair_outbuf_cb(struct evbuffer *, 93 const struct evbuffer_cb_info *, void *); 94 95 static struct bufferevent_pair * 96 bufferevent_pair_elt_new(struct event_base *base, 97 int options) 98 { 99 struct bufferevent_pair *bufev; 100 if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair)))) 101 return NULL; 102 if (bufferevent_init_common_(&bufev->bev, base, &bufferevent_ops_pair, 103 options)) { 104 mm_free(bufev); 105 return NULL; 106 } 107 if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) { 108 bufferevent_free(downcast(bufev)); 109 return NULL; 110 } 111 112 bufferevent_init_generic_timeout_cbs_(&bufev->bev.bev); 113 114 return bufev; 115 } 116 117 int 118 bufferevent_pair_new(struct event_base *base, int options, 119 struct bufferevent *pair[2]) 120 { 121 struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL; 122 int tmp_options; 123 124 options |= BEV_OPT_DEFER_CALLBACKS; 125 tmp_options = options & ~BEV_OPT_THREADSAFE; 126 127 bufev1 = bufferevent_pair_elt_new(base, options); 128 if (!bufev1) 129 return -1; 130 bufev2 = bufferevent_pair_elt_new(base, tmp_options); 131 if (!bufev2) { 132 bufferevent_free(downcast(bufev1)); 133 return -1; 134 } 135 136 if (options & BEV_OPT_THREADSAFE) { 137 /*XXXX check return */ 138 bufferevent_enable_locking_(downcast(bufev2), bufev1->bev.lock); 139 } 140 141 bufev1->partner = bufev2; 142 bufev2->partner = bufev1; 143 144 evbuffer_freeze(downcast(bufev1)->input, 0); 145 evbuffer_freeze(downcast(bufev1)->output, 1); 146 evbuffer_freeze(downcast(bufev2)->input, 0); 147 evbuffer_freeze(downcast(bufev2)->output, 1); 148 149 pair[0] = downcast(bufev1); 150 pair[1] = downcast(bufev2); 151 152 return 0; 153 } 154 155 static void 156 be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, 157 int ignore_wm) 158 { 159 size_t dst_size; 160 size_t n; 161 162 evbuffer_unfreeze(src->output, 1); 163 evbuffer_unfreeze(dst->input, 0); 164 165 if (dst->wm_read.high) { 166 dst_size = evbuffer_get_length(dst->input); 167 if (dst_size < dst->wm_read.high) { 168 n = dst->wm_read.high - dst_size; 169 evbuffer_remove_buffer(src->output, dst->input, n); 170 } else { 171 if (!ignore_wm) 172 goto done; 173 n = evbuffer_get_length(src->output); 174 evbuffer_add_buffer(dst->input, src->output); 175 } 176 } else { 177 n = evbuffer_get_length(src->output); 178 evbuffer_add_buffer(dst->input, src->output); 179 } 180 181 if (n) { 182 BEV_RESET_GENERIC_READ_TIMEOUT(dst); 183 184 if (evbuffer_get_length(dst->output)) 185 BEV_RESET_GENERIC_WRITE_TIMEOUT(dst); 186 else 187 BEV_DEL_GENERIC_WRITE_TIMEOUT(dst); 188 } 189 190 bufferevent_trigger_nolock_(dst, EV_READ, 0); 191 bufferevent_trigger_nolock_(src, EV_WRITE, 0); 192 done: 193 evbuffer_freeze(src->output, 1); 194 evbuffer_freeze(dst->input, 0); 195 } 196 197 static inline int 198 be_pair_wants_to_talk(struct bufferevent_pair *src, 199 struct bufferevent_pair *dst) 200 { 201 return (downcast(src)->enabled & EV_WRITE) && 202 (downcast(dst)->enabled & EV_READ) && 203 !dst->bev.read_suspended && 204 evbuffer_get_length(downcast(src)->output); 205 } 206 207 static void 208 be_pair_outbuf_cb(struct evbuffer *outbuf, 209 const struct evbuffer_cb_info *info, void *arg) 210 { 211 struct bufferevent_pair *bev_pair = arg; 212 struct bufferevent_pair *partner = bev_pair->partner; 213 214 incref_and_lock(downcast(bev_pair)); 215 216 if (info->n_added > info->n_deleted && partner) { 217 /* We got more data. If the other side's reading, then 218 hand it over. */ 219 if (be_pair_wants_to_talk(bev_pair, partner)) { 220 be_pair_transfer(downcast(bev_pair), downcast(partner), 0); 221 } 222 } 223 224 decref_and_unlock(downcast(bev_pair)); 225 } 226 227 static int 228 be_pair_enable(struct bufferevent *bufev, short events) 229 { 230 struct bufferevent_pair *bev_p = upcast(bufev); 231 struct bufferevent_pair *partner = bev_p->partner; 232 233 incref_and_lock(bufev); 234 235 if (events & EV_READ) { 236 BEV_RESET_GENERIC_READ_TIMEOUT(bufev); 237 } 238 if ((events & EV_WRITE) && evbuffer_get_length(bufev->output)) 239 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); 240 241 /* We're starting to read! Does the other side have anything to write?*/ 242 if ((events & EV_READ) && partner && 243 be_pair_wants_to_talk(partner, bev_p)) { 244 be_pair_transfer(downcast(partner), bufev, 0); 245 } 246 /* We're starting to write! Does the other side want to read? */ 247 if ((events & EV_WRITE) && partner && 248 be_pair_wants_to_talk(bev_p, partner)) { 249 be_pair_transfer(bufev, downcast(partner), 0); 250 } 251 decref_and_unlock(bufev); 252 return 0; 253 } 254 255 static int 256 be_pair_disable(struct bufferevent *bev, short events) 257 { 258 if (events & EV_READ) { 259 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 260 } 261 if (events & EV_WRITE) { 262 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 263 } 264 return 0; 265 } 266 267 static void 268 be_pair_unlink(struct bufferevent *bev) 269 { 270 struct bufferevent_pair *bev_p = upcast(bev); 271 272 if (bev_p->partner) { 273 bev_p->unlinked_partner = bev_p->partner; 274 bev_p->partner->partner = NULL; 275 bev_p->partner = NULL; 276 } 277 } 278 279 /* Free *shared* lock in the latest be (since we share it between two of them). */ 280 static void 281 be_pair_destruct(struct bufferevent *bev) 282 { 283 struct bufferevent_pair *bev_p = upcast(bev); 284 285 /* Transfer ownership of the lock into partner, otherwise we will use 286 * already free'd lock during freeing second bev, see next example: 287 * 288 * bev1->own_lock = 1 289 * bev2->own_lock = 0 290 * bev2->lock = bev1->lock 291 * 292 * bufferevent_free(bev1) # refcnt == 0 -> unlink 293 * bufferevent_free(bev2) # refcnt == 0 -> unlink 294 * 295 * event_base_free() -> finilizers -> EVTHREAD_FREE_LOCK(bev1->lock) 296 * -> BEV_LOCK(bev2->lock) <-- already freed 297 * 298 * Where bev1 == pair[0], bev2 == pair[1]. 299 */ 300 if (bev_p->unlinked_partner && bev_p->bev.own_lock) { 301 bev_p->unlinked_partner->bev.own_lock = 1; 302 bev_p->bev.own_lock = 0; 303 } 304 bev_p->unlinked_partner = NULL; 305 } 306 307 static int 308 be_pair_flush(struct bufferevent *bev, short iotype, 309 enum bufferevent_flush_mode mode) 310 { 311 struct bufferevent_pair *bev_p = upcast(bev); 312 struct bufferevent *partner; 313 314 if (!bev_p->partner) 315 return -1; 316 317 if (mode == BEV_NORMAL) 318 return 0; 319 320 incref_and_lock(bev); 321 322 partner = downcast(bev_p->partner); 323 324 if ((iotype & EV_READ) != 0) 325 be_pair_transfer(partner, bev, 1); 326 327 if ((iotype & EV_WRITE) != 0) 328 be_pair_transfer(bev, partner, 1); 329 330 if (mode == BEV_FINISHED) { 331 short what = BEV_EVENT_EOF; 332 if (iotype & EV_READ) 333 what |= BEV_EVENT_WRITING; 334 if (iotype & EV_WRITE) 335 what |= BEV_EVENT_READING; 336 bufferevent_run_eventcb_(partner, what, 0); 337 } 338 decref_and_unlock(bev); 339 return 0; 340 } 341 342 struct bufferevent * 343 bufferevent_pair_get_partner(struct bufferevent *bev) 344 { 345 struct bufferevent_pair *bev_p; 346 struct bufferevent *partner = NULL; 347 bev_p = upcast(bev); 348 if (! bev_p) 349 return NULL; 350 351 incref_and_lock(bev); 352 if (bev_p->partner) 353 partner = downcast(bev_p->partner); 354 decref_and_unlock(bev); 355 return partner; 356 } 357 358 const struct bufferevent_ops bufferevent_ops_pair = { 359 "pair_elt", 360 evutil_offsetof(struct bufferevent_pair, bev.bev), 361 be_pair_enable, 362 be_pair_disable, 363 be_pair_unlink, 364 be_pair_destruct, 365 bufferevent_generic_adj_timeouts_, 366 be_pair_flush, 367 NULL, /* ctrl */ 368 }; 369