1 /* $NetBSD: bufferevent_pair.c,v 1.5 2016/01/08 21:35:40 christos Exp $ */ 2 3 /* 4 * Copyright (c) 2009-2012 Niels Provos, Nick Mathewson 5 * 6 * Redistribution and use in source and binary forms, with or without 7 * modification, are permitted provided that the following conditions 8 * are met: 9 * 1. Redistributions of source code must retain the above copyright 10 * notice, this list of conditions and the following disclaimer. 11 * 2. Redistributions in binary form must reproduce the above copyright 12 * notice, this list of conditions and the following disclaimer in the 13 * documentation and/or other materials provided with the distribution. 14 * 3. The name of the author may not be used to endorse or promote products 15 * derived from this software without specific prior written permission. 16 * 17 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 18 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 19 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 20 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 21 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 22 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 23 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 24 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 25 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 26 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 27 */ 28 #include "event2/event-config.h" 29 #include "evconfig-private.h" 30 31 #include <sys/types.h> 32 33 #ifdef _WIN32 34 #include <winsock2.h> 35 #endif 36 37 #include "event2/util.h" 38 #include "event2/buffer.h" 39 #include "event2/bufferevent.h" 40 #include "event2/bufferevent_struct.h" 41 #include "event2/event.h" 42 #include "defer-internal.h" 43 #include "bufferevent-internal.h" 44 #include "mm-internal.h" 45 #include "util-internal.h" 46 47 struct bufferevent_pair { 48 struct bufferevent_private bev; 49 struct bufferevent_pair *partner; 50 /* For ->destruct() lock checking */ 51 struct bufferevent_pair *unlinked_partner; 52 }; 53 54 55 /* Given a bufferevent that's really a bev part of a bufferevent_pair, 56 * return that bufferevent_filtered. Returns NULL otherwise.*/ 57 static inline struct bufferevent_pair * 58 upcast(struct bufferevent *bev) 59 { 60 struct bufferevent_pair *bev_p; 61 if (bev->be_ops != &bufferevent_ops_pair) 62 return NULL; 63 bev_p = EVUTIL_UPCAST(bev, struct bufferevent_pair, bev.bev); 64 EVUTIL_ASSERT(bev_p->bev.bev.be_ops == &bufferevent_ops_pair); 65 return bev_p; 66 } 67 68 #define downcast(bev_pair) (&(bev_pair)->bev.bev) 69 70 static inline void 71 incref_and_lock(struct bufferevent *b) 72 { 73 struct bufferevent_pair *bevp; 74 bufferevent_incref_and_lock_(b); 75 bevp = upcast(b); 76 if (bevp->partner) 77 bufferevent_incref_and_lock_(downcast(bevp->partner)); 78 } 79 80 static inline void 81 decref_and_unlock(struct bufferevent *b) 82 { 83 struct bufferevent_pair *bevp = upcast(b); 84 if (bevp->partner) 85 bufferevent_decref_and_unlock_(downcast(bevp->partner)); 86 bufferevent_decref_and_unlock_(b); 87 } 88 89 /* XXX Handle close */ 90 91 static void be_pair_outbuf_cb(struct evbuffer *, 92 const struct evbuffer_cb_info *, void *); 93 94 static struct bufferevent_pair * 95 bufferevent_pair_elt_new(struct event_base *base, 96 int options) 97 { 98 struct bufferevent_pair *bufev; 99 if (! (bufev = mm_calloc(1, sizeof(struct bufferevent_pair)))) 100 return NULL; 101 if (bufferevent_init_common_(&bufev->bev, base, &bufferevent_ops_pair, 102 options)) { 103 mm_free(bufev); 104 return NULL; 105 } 106 if (!evbuffer_add_cb(bufev->bev.bev.output, be_pair_outbuf_cb, bufev)) { 107 bufferevent_free(downcast(bufev)); 108 return NULL; 109 } 110 111 bufferevent_init_generic_timeout_cbs_(&bufev->bev.bev); 112 113 return bufev; 114 } 115 116 int 117 bufferevent_pair_new(struct event_base *base, int options, 118 struct bufferevent *pair[2]) 119 { 120 struct bufferevent_pair *bufev1 = NULL, *bufev2 = NULL; 121 int tmp_options; 122 123 options |= BEV_OPT_DEFER_CALLBACKS; 124 tmp_options = options & ~BEV_OPT_THREADSAFE; 125 126 bufev1 = bufferevent_pair_elt_new(base, options); 127 if (!bufev1) 128 return -1; 129 bufev2 = bufferevent_pair_elt_new(base, tmp_options); 130 if (!bufev2) { 131 bufferevent_free(downcast(bufev1)); 132 return -1; 133 } 134 135 if (options & BEV_OPT_THREADSAFE) { 136 /*XXXX check return */ 137 bufferevent_enable_locking_(downcast(bufev2), bufev1->bev.lock); 138 } 139 140 bufev1->partner = bufev2; 141 bufev2->partner = bufev1; 142 143 evbuffer_freeze(downcast(bufev1)->input, 0); 144 evbuffer_freeze(downcast(bufev1)->output, 1); 145 evbuffer_freeze(downcast(bufev2)->input, 0); 146 evbuffer_freeze(downcast(bufev2)->output, 1); 147 148 pair[0] = downcast(bufev1); 149 pair[1] = downcast(bufev2); 150 151 return 0; 152 } 153 154 static void 155 be_pair_transfer(struct bufferevent *src, struct bufferevent *dst, 156 int ignore_wm) 157 { 158 size_t dst_size; 159 size_t n; 160 161 evbuffer_unfreeze(src->output, 1); 162 evbuffer_unfreeze(dst->input, 0); 163 164 if (dst->wm_read.high) { 165 dst_size = evbuffer_get_length(dst->input); 166 if (dst_size < dst->wm_read.high) { 167 n = dst->wm_read.high - dst_size; 168 evbuffer_remove_buffer(src->output, dst->input, n); 169 } else { 170 if (!ignore_wm) 171 goto done; 172 n = evbuffer_get_length(src->output); 173 evbuffer_add_buffer(dst->input, src->output); 174 } 175 } else { 176 n = evbuffer_get_length(src->output); 177 evbuffer_add_buffer(dst->input, src->output); 178 } 179 180 if (n) { 181 BEV_RESET_GENERIC_READ_TIMEOUT(dst); 182 183 if (evbuffer_get_length(dst->output)) 184 BEV_RESET_GENERIC_WRITE_TIMEOUT(dst); 185 else 186 BEV_DEL_GENERIC_WRITE_TIMEOUT(dst); 187 } 188 189 bufferevent_trigger_nolock_(dst, EV_READ, 0); 190 bufferevent_trigger_nolock_(src, EV_WRITE, 0); 191 done: 192 evbuffer_freeze(src->output, 1); 193 evbuffer_freeze(dst->input, 0); 194 } 195 196 static inline int 197 be_pair_wants_to_talk(struct bufferevent_pair *src, 198 struct bufferevent_pair *dst) 199 { 200 return (downcast(src)->enabled & EV_WRITE) && 201 (downcast(dst)->enabled & EV_READ) && 202 !dst->bev.read_suspended && 203 evbuffer_get_length(downcast(src)->output); 204 } 205 206 static void 207 be_pair_outbuf_cb(struct evbuffer *outbuf, 208 const struct evbuffer_cb_info *info, void *arg) 209 { 210 struct bufferevent_pair *bev_pair = arg; 211 struct bufferevent_pair *partner = bev_pair->partner; 212 213 incref_and_lock(downcast(bev_pair)); 214 215 if (info->n_added > info->n_deleted && partner) { 216 /* We got more data. If the other side's reading, then 217 hand it over. */ 218 if (be_pair_wants_to_talk(bev_pair, partner)) { 219 be_pair_transfer(downcast(bev_pair), downcast(partner), 0); 220 } 221 } 222 223 decref_and_unlock(downcast(bev_pair)); 224 } 225 226 static int 227 be_pair_enable(struct bufferevent *bufev, short events) 228 { 229 struct bufferevent_pair *bev_p = upcast(bufev); 230 struct bufferevent_pair *partner = bev_p->partner; 231 232 incref_and_lock(bufev); 233 234 if (events & EV_READ) { 235 BEV_RESET_GENERIC_READ_TIMEOUT(bufev); 236 } 237 if ((events & EV_WRITE) && evbuffer_get_length(bufev->output)) 238 BEV_RESET_GENERIC_WRITE_TIMEOUT(bufev); 239 240 /* We're starting to read! Does the other side have anything to write?*/ 241 if ((events & EV_READ) && partner && 242 be_pair_wants_to_talk(partner, bev_p)) { 243 be_pair_transfer(downcast(partner), bufev, 0); 244 } 245 /* We're starting to write! Does the other side want to read? */ 246 if ((events & EV_WRITE) && partner && 247 be_pair_wants_to_talk(bev_p, partner)) { 248 be_pair_transfer(bufev, downcast(partner), 0); 249 } 250 decref_and_unlock(bufev); 251 return 0; 252 } 253 254 static int 255 be_pair_disable(struct bufferevent *bev, short events) 256 { 257 if (events & EV_READ) { 258 BEV_DEL_GENERIC_READ_TIMEOUT(bev); 259 } 260 if (events & EV_WRITE) { 261 BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); 262 } 263 return 0; 264 } 265 266 static void 267 be_pair_unlink(struct bufferevent *bev) 268 { 269 struct bufferevent_pair *bev_p = upcast(bev); 270 271 if (bev_p->partner) { 272 bev_p->unlinked_partner = bev_p->partner; 273 bev_p->partner->partner = NULL; 274 bev_p->partner = NULL; 275 } 276 } 277 278 /* Free *shared* lock in the latest be (since we share it between two of them). */ 279 static void 280 be_pair_destruct(struct bufferevent *bev) 281 { 282 struct bufferevent_pair *bev_p = upcast(bev); 283 284 /* Transfer ownership of the lock into partner, otherwise we will use 285 * already free'd lock during freeing second bev, see next example: 286 * 287 * bev1->own_lock = 1 288 * bev2->own_lock = 0 289 * bev2->lock = bev1->lock 290 * 291 * bufferevent_free(bev1) # refcnt == 0 -> unlink 292 * bufferevent_free(bev2) # refcnt == 0 -> unlink 293 * 294 * event_base_free() -> finilizers -> EVTHREAD_FREE_LOCK(bev1->lock) 295 * -> BEV_LOCK(bev2->lock) <-- already freed 296 * 297 * Where bev1 == pair[0], bev2 == pair[1]. 298 */ 299 if (bev_p->unlinked_partner && bev_p->bev.own_lock) { 300 bev_p->unlinked_partner->bev.own_lock = 1; 301 bev_p->bev.own_lock = 0; 302 } 303 bev_p->unlinked_partner = NULL; 304 } 305 306 static int 307 be_pair_flush(struct bufferevent *bev, short iotype, 308 enum bufferevent_flush_mode mode) 309 { 310 struct bufferevent_pair *bev_p = upcast(bev); 311 struct bufferevent *partner; 312 incref_and_lock(bev); 313 if (!bev_p->partner) 314 return -1; 315 316 partner = downcast(bev_p->partner); 317 318 if (mode == BEV_NORMAL) 319 return 0; 320 321 if ((iotype & EV_READ) != 0) 322 be_pair_transfer(partner, bev, 1); 323 324 if ((iotype & EV_WRITE) != 0) 325 be_pair_transfer(bev, partner, 1); 326 327 if (mode == BEV_FINISHED) { 328 bufferevent_run_eventcb_(partner, iotype|BEV_EVENT_EOF, 0); 329 } 330 decref_and_unlock(bev); 331 return 0; 332 } 333 334 struct bufferevent * 335 bufferevent_pair_get_partner(struct bufferevent *bev) 336 { 337 struct bufferevent_pair *bev_p; 338 struct bufferevent *partner = NULL; 339 bev_p = upcast(bev); 340 if (! bev_p) 341 return NULL; 342 343 incref_and_lock(bev); 344 if (bev_p->partner) 345 partner = downcast(bev_p->partner); 346 decref_and_unlock(bev); 347 return partner; 348 } 349 350 const struct bufferevent_ops bufferevent_ops_pair = { 351 "pair_elt", 352 evutil_offsetof(struct bufferevent_pair, bev.bev), 353 be_pair_enable, 354 be_pair_disable, 355 be_pair_unlink, 356 be_pair_destruct, 357 bufferevent_generic_adj_timeouts_, 358 be_pair_flush, 359 NULL, /* ctrl */ 360 }; 361