1 /* $OpenBSD: evbuffer.c,v 1.11 2008/05/02 18:26:42 brad Exp $ */ 2 3 /* 4 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu> 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include <sys/types.h> 31 32 #ifdef HAVE_CONFIG_H 33 #include "config.h" 34 #endif 35 36 #ifdef HAVE_SYS_TIME_H 37 #include <sys/time.h> 38 #endif 39 40 #include <errno.h> 41 #include <stdio.h> 42 #include <stdlib.h> 43 #include <string.h> 44 #ifdef HAVE_STDARG_H 45 #include <stdarg.h> 46 #endif 47 48 #include "event.h" 49 50 /* prototypes */ 51 52 void bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t); 53 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *); 54 55 static int 56 bufferevent_add(struct event *ev, int timeout) 57 { 58 struct timeval tv, *ptv = NULL; 59 60 if (timeout) { 61 timerclear(&tv); 62 tv.tv_sec = timeout; 63 ptv = &tv; 64 } 65 66 return (event_add(ev, ptv)); 67 } 68 69 /* 70 * This callback is executed when the size of the input buffer changes. 71 * We use it to apply back pressure on the reading side. 72 */ 73 74 void 75 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, 76 void *arg) { 77 struct bufferevent *bufev = arg; 78 /* 79 * If we are below the watermark then reschedule reading if it's 80 * still enabled. 81 */ 82 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { 83 evbuffer_setcb(buf, NULL, NULL); 84 85 if (bufev->enabled & EV_READ) 86 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 87 } 88 } 89 90 static void 91 bufferevent_readcb(int fd, short event, void *arg) 92 { 93 struct bufferevent *bufev = arg; 94 int res = 0; 95 short what = EVBUFFER_READ; 96 size_t len; 97 int howmuch = -1; 98 99 if (event == EV_TIMEOUT) { 100 what |= EVBUFFER_TIMEOUT; 101 goto error; 102 } 103 104 /* 105 * If we have a high watermark configured then we don't want to 106 * read more data than would make us reach the watermark. 107 */ 108 if (bufev->wm_read.high != 0) 109 howmuch = bufev->wm_read.high; 110 111 res = evbuffer_read(bufev->input, fd, howmuch); 112 if (res == -1) { 113 if (errno == EAGAIN || errno == EINTR) 114 goto reschedule; 115 /* error case */ 116 what |= EVBUFFER_ERROR; 117 } else if (res == 0) { 118 /* eof case */ 119 what |= EVBUFFER_EOF; 120 } 121 122 if (res <= 0) 123 goto error; 124 125 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 126 127 /* See if this callbacks meets the water marks */ 128 len = EVBUFFER_LENGTH(bufev->input); 129 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) 130 return; 131 if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) { 132 struct evbuffer *buf = bufev->input; 133 event_del(&bufev->ev_read); 134 135 /* Now schedule a callback for us */ 136 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); 137 return; 138 } 139 140 /* Invoke the user callback - must always be called last */ 141 if (bufev->readcb != NULL) 142 (*bufev->readcb)(bufev, bufev->cbarg); 143 return; 144 145 reschedule: 146 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 147 return; 148 149 error: 150 (*bufev->errorcb)(bufev, what, bufev->cbarg); 151 } 152 153 static void 154 bufferevent_writecb(int fd, short event, void *arg) 155 { 156 struct bufferevent *bufev = arg; 157 int res = 0; 158 short what = EVBUFFER_WRITE; 159 160 if (event == EV_TIMEOUT) { 161 what |= EVBUFFER_TIMEOUT; 162 goto error; 163 } 164 165 if (EVBUFFER_LENGTH(bufev->output)) { 166 res = evbuffer_write(bufev->output, fd); 167 if (res == -1) { 168 #ifndef WIN32 169 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not 170 *set errno. thus this error checking is not portable*/ 171 if (errno == EAGAIN || 172 errno == EINTR || 173 errno == EINPROGRESS) 174 goto reschedule; 175 /* error case */ 176 what |= EVBUFFER_ERROR; 177 178 #else 179 goto reschedule; 180 #endif 181 182 } else if (res == 0) { 183 /* eof case */ 184 what |= EVBUFFER_EOF; 185 } 186 if (res <= 0) 187 goto error; 188 } 189 190 if (EVBUFFER_LENGTH(bufev->output) != 0) 191 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 192 193 /* 194 * Invoke the user callback if our buffer is drained or below the 195 * low watermark. 196 */ 197 if (bufev->writecb != NULL && 198 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) 199 (*bufev->writecb)(bufev, bufev->cbarg); 200 201 return; 202 203 reschedule: 204 if (EVBUFFER_LENGTH(bufev->output) != 0) 205 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 206 return; 207 208 error: 209 (*bufev->errorcb)(bufev, what, bufev->cbarg); 210 } 211 212 /* 213 * Create a new buffered event object. 214 * 215 * The read callback is invoked whenever we read new data. 216 * The write callback is invoked whenever the output buffer is drained. 217 * The error callback is invoked on a write/read error or on EOF. 218 * 219 * Both read and write callbacks maybe NULL. The error callback is not 220 * allowed to be NULL and have to be provided always. 221 */ 222 223 struct bufferevent * 224 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, 225 everrorcb errorcb, void *cbarg) 226 { 227 struct bufferevent *bufev; 228 229 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL) 230 return (NULL); 231 232 if ((bufev->input = evbuffer_new()) == NULL) { 233 free(bufev); 234 return (NULL); 235 } 236 237 if ((bufev->output = evbuffer_new()) == NULL) { 238 evbuffer_free(bufev->input); 239 free(bufev); 240 return (NULL); 241 } 242 243 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); 244 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); 245 246 bufev->readcb = readcb; 247 bufev->writecb = writecb; 248 bufev->errorcb = errorcb; 249 250 bufev->cbarg = cbarg; 251 252 /* 253 * Set to EV_WRITE so that using bufferevent_write is going to 254 * trigger a callback. Reading needs to be explicitly enabled 255 * because otherwise no data will be available. 256 */ 257 bufev->enabled = EV_WRITE; 258 259 return (bufev); 260 } 261 262 int 263 bufferevent_priority_set(struct bufferevent *bufev, int priority) 264 { 265 if (event_priority_set(&bufev->ev_read, priority) == -1) 266 return (-1); 267 if (event_priority_set(&bufev->ev_write, priority) == -1) 268 return (-1); 269 270 return (0); 271 } 272 273 /* Closing the file descriptor is the responsibility of the caller */ 274 275 void 276 bufferevent_free(struct bufferevent *bufev) 277 { 278 event_del(&bufev->ev_read); 279 event_del(&bufev->ev_write); 280 281 evbuffer_free(bufev->input); 282 evbuffer_free(bufev->output); 283 284 free(bufev); 285 } 286 287 /* 288 * Returns 0 on success; 289 * -1 on failure. 290 */ 291 292 int 293 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) 294 { 295 int res; 296 297 res = evbuffer_add(bufev->output, data, size); 298 299 if (res == -1) 300 return (res); 301 302 /* If everything is okay, we need to schedule a write */ 303 if (size > 0 && (bufev->enabled & EV_WRITE)) 304 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 305 306 return (res); 307 } 308 309 int 310 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 311 { 312 int res; 313 314 res = bufferevent_write(bufev, buf->buffer, buf->off); 315 if (res != -1) 316 evbuffer_drain(buf, buf->off); 317 318 return (res); 319 } 320 321 size_t 322 bufferevent_read(struct bufferevent *bufev, void *data, size_t size) 323 { 324 struct evbuffer *buf = bufev->input; 325 326 if (buf->off < size) 327 size = buf->off; 328 329 /* Copy the available data to the user buffer */ 330 memcpy(data, buf->buffer, size); 331 332 if (size) 333 evbuffer_drain(buf, size); 334 335 return (size); 336 } 337 338 int 339 bufferevent_enable(struct bufferevent *bufev, short event) 340 { 341 if (event & EV_READ) { 342 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1) 343 return (-1); 344 } 345 if (event & EV_WRITE) { 346 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1) 347 return (-1); 348 } 349 350 bufev->enabled |= event; 351 return (0); 352 } 353 354 int 355 bufferevent_disable(struct bufferevent *bufev, short event) 356 { 357 if (event & EV_READ) { 358 if (event_del(&bufev->ev_read) == -1) 359 return (-1); 360 } 361 if (event & EV_WRITE) { 362 if (event_del(&bufev->ev_write) == -1) 363 return (-1); 364 } 365 366 bufev->enabled &= ~event; 367 return (0); 368 } 369 370 /* 371 * Sets the read and write timeout for a buffered event. 372 */ 373 374 void 375 bufferevent_settimeout(struct bufferevent *bufev, 376 int timeout_read, int timeout_write) { 377 bufev->timeout_read = timeout_read; 378 bufev->timeout_write = timeout_write; 379 } 380 381 /* 382 * Sets the water marks 383 */ 384 385 void 386 bufferevent_setwatermark(struct bufferevent *bufev, short events, 387 size_t lowmark, size_t highmark) 388 { 389 if (events & EV_READ) { 390 bufev->wm_read.low = lowmark; 391 bufev->wm_read.high = highmark; 392 } 393 394 if (events & EV_WRITE) { 395 bufev->wm_write.low = lowmark; 396 bufev->wm_write.high = highmark; 397 } 398 399 /* If the watermarks changed then see if we should call read again */ 400 bufferevent_read_pressure_cb(bufev->input, 401 0, EVBUFFER_LENGTH(bufev->input), bufev); 402 } 403 404 int 405 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) 406 { 407 int res; 408 409 res = event_base_set(base, &bufev->ev_read); 410 if (res == -1) 411 return (res); 412 413 res = event_base_set(base, &bufev->ev_write); 414 return (res); 415 } 416