1 /* $OpenBSD: evbuffer.c,v 1.9 2006/11/26 15:22:58 brad Exp $ */ 2 3 /* 4 * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu> 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 3. The name of the author may not be used to endorse or promote products 16 * derived from this software without specific prior written permission. 17 * 18 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 19 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES 20 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 21 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, 22 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT 23 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF 27 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 */ 29 30 #include <sys/types.h> 31 32 #ifdef HAVE_CONFIG_H 33 #include "config.h" 34 #endif 35 36 #ifdef HAVE_SYS_TIME_H 37 #include <sys/time.h> 38 #endif 39 40 #include <errno.h> 41 #include <stdio.h> 42 #include <stdlib.h> 43 #include <string.h> 44 #ifdef HAVE_STDARG_H 45 #include <stdarg.h> 46 #endif 47 48 #include "event.h" 49 50 /* prototypes */ 51 52 void bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t); 53 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *); 54 55 static int 56 bufferevent_add(struct event *ev, int timeout) 57 { 58 struct timeval tv, *ptv = NULL; 59 60 if (timeout) { 61 timerclear(&tv); 62 tv.tv_sec = timeout; 63 ptv = &tv; 64 } 65 66 return (event_add(ev, ptv)); 67 } 68 69 /* 70 * This callback is executed when the size of the input buffer changes. 71 * We use it to apply back pressure on the reading side. 72 */ 73 74 void 75 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now, 76 void *arg) { 77 struct bufferevent *bufev = arg; 78 /* 79 * If we are below the watermark then reschedule reading if it's 80 * still enabled. 81 */ 82 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { 83 evbuffer_setcb(buf, NULL, NULL); 84 85 if (bufev->enabled & EV_READ) 86 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 87 } 88 } 89 90 static void 91 bufferevent_readcb(int fd, short event, void *arg) 92 { 93 struct bufferevent *bufev = arg; 94 int res = 0; 95 short what = EVBUFFER_READ; 96 size_t len; 97 int howmuch = -1; 98 99 if (event == EV_TIMEOUT) { 100 what |= EVBUFFER_TIMEOUT; 101 goto error; 102 } 103 104 /* 105 * If we have a high watermark configured then we don't want to 106 * read more data than would make us reach the watermark. 107 */ 108 if (bufev->wm_read.high != 0) 109 howmuch = bufev->wm_read.high; 110 111 res = evbuffer_read(bufev->input, fd, howmuch); 112 if (res == -1) { 113 if (errno == EAGAIN || errno == EINTR) 114 goto reschedule; 115 /* error case */ 116 what |= EVBUFFER_ERROR; 117 } else if (res == 0) { 118 /* eof case */ 119 what |= EVBUFFER_EOF; 120 } 121 122 if (res <= 0) 123 goto error; 124 125 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 126 127 /* See if this callbacks meets the water marks */ 128 len = EVBUFFER_LENGTH(bufev->input); 129 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) 130 return; 131 if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) { 132 struct evbuffer *buf = bufev->input; 133 event_del(&bufev->ev_read); 134 135 /* Now schedule a callback for us */ 136 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); 137 return; 138 } 139 140 /* Invoke the user callback - must always be called last */ 141 if (bufev->readcb != NULL) 142 (*bufev->readcb)(bufev, bufev->cbarg); 143 return; 144 145 reschedule: 146 bufferevent_add(&bufev->ev_read, bufev->timeout_read); 147 return; 148 149 error: 150 (*bufev->errorcb)(bufev, what, bufev->cbarg); 151 } 152 153 static void 154 bufferevent_writecb(int fd, short event, void *arg) 155 { 156 struct bufferevent *bufev = arg; 157 int res = 0; 158 short what = EVBUFFER_WRITE; 159 160 if (event == EV_TIMEOUT) { 161 what |= EVBUFFER_TIMEOUT; 162 goto error; 163 } 164 165 if (EVBUFFER_LENGTH(bufev->output)) { 166 res = evbuffer_write(bufev->output, fd); 167 if (res == -1) { 168 if (errno == EAGAIN || 169 errno == EINTR || 170 errno == EINPROGRESS) 171 goto reschedule; 172 /* error case */ 173 what |= EVBUFFER_ERROR; 174 } else if (res == 0) { 175 /* eof case */ 176 what |= EVBUFFER_EOF; 177 } 178 if (res <= 0) 179 goto error; 180 } 181 182 if (EVBUFFER_LENGTH(bufev->output) != 0) 183 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 184 185 /* 186 * Invoke the user callback if our buffer is drained or below the 187 * low watermark. 188 */ 189 if (bufev->writecb != NULL && 190 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) 191 (*bufev->writecb)(bufev, bufev->cbarg); 192 193 return; 194 195 reschedule: 196 if (EVBUFFER_LENGTH(bufev->output) != 0) 197 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 198 return; 199 200 error: 201 (*bufev->errorcb)(bufev, what, bufev->cbarg); 202 } 203 204 /* 205 * Create a new buffered event object. 206 * 207 * The read callback is invoked whenever we read new data. 208 * The write callback is invoked whenever the output buffer is drained. 209 * The error callback is invoked on a write/read error or on EOF. 210 * 211 * Both read and write callbacks maybe NULL. The error callback is not 212 * allowed to be NULL and have to be provided always. 213 */ 214 215 struct bufferevent * 216 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb, 217 everrorcb errorcb, void *cbarg) 218 { 219 struct bufferevent *bufev; 220 221 if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL) 222 return (NULL); 223 224 if ((bufev->input = evbuffer_new()) == NULL) { 225 free(bufev); 226 return (NULL); 227 } 228 229 if ((bufev->output = evbuffer_new()) == NULL) { 230 evbuffer_free(bufev->input); 231 free(bufev); 232 return (NULL); 233 } 234 235 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); 236 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); 237 238 bufev->readcb = readcb; 239 bufev->writecb = writecb; 240 bufev->errorcb = errorcb; 241 242 bufev->cbarg = cbarg; 243 244 /* 245 * Set to EV_WRITE so that using bufferevent_write is going to 246 * trigger a callback. Reading needs to be explicitly enabled 247 * because otherwise no data will be available. 248 */ 249 bufev->enabled = EV_WRITE; 250 251 return (bufev); 252 } 253 254 int 255 bufferevent_priority_set(struct bufferevent *bufev, int priority) 256 { 257 if (event_priority_set(&bufev->ev_read, priority) == -1) 258 return (-1); 259 if (event_priority_set(&bufev->ev_write, priority) == -1) 260 return (-1); 261 262 return (0); 263 } 264 265 /* Closing the file descriptor is the responsibility of the caller */ 266 267 void 268 bufferevent_free(struct bufferevent *bufev) 269 { 270 event_del(&bufev->ev_read); 271 event_del(&bufev->ev_write); 272 273 evbuffer_free(bufev->input); 274 evbuffer_free(bufev->output); 275 276 free(bufev); 277 } 278 279 /* 280 * Returns 0 on success; 281 * -1 on failure. 282 */ 283 284 int 285 bufferevent_write(struct bufferevent *bufev, void *data, size_t size) 286 { 287 int res; 288 289 res = evbuffer_add(bufev->output, data, size); 290 291 if (res == -1) 292 return (res); 293 294 /* If everything is okay, we need to schedule a write */ 295 if (size > 0 && (bufev->enabled & EV_WRITE)) 296 bufferevent_add(&bufev->ev_write, bufev->timeout_write); 297 298 return (res); 299 } 300 301 int 302 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf) 303 { 304 int res; 305 306 res = bufferevent_write(bufev, buf->buffer, buf->off); 307 if (res != -1) 308 evbuffer_drain(buf, buf->off); 309 310 return (res); 311 } 312 313 size_t 314 bufferevent_read(struct bufferevent *bufev, void *data, size_t size) 315 { 316 struct evbuffer *buf = bufev->input; 317 318 if (buf->off < size) 319 size = buf->off; 320 321 /* Copy the available data to the user buffer */ 322 memcpy(data, buf->buffer, size); 323 324 if (size) 325 evbuffer_drain(buf, size); 326 327 return (size); 328 } 329 330 int 331 bufferevent_enable(struct bufferevent *bufev, short event) 332 { 333 if (event & EV_READ) { 334 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1) 335 return (-1); 336 } 337 if (event & EV_WRITE) { 338 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1) 339 return (-1); 340 } 341 342 bufev->enabled |= event; 343 return (0); 344 } 345 346 int 347 bufferevent_disable(struct bufferevent *bufev, short event) 348 { 349 if (event & EV_READ) { 350 if (event_del(&bufev->ev_read) == -1) 351 return (-1); 352 } 353 if (event & EV_WRITE) { 354 if (event_del(&bufev->ev_write) == -1) 355 return (-1); 356 } 357 358 bufev->enabled &= ~event; 359 return (0); 360 } 361 362 /* 363 * Sets the read and write timeout for a buffered event. 364 */ 365 366 void 367 bufferevent_settimeout(struct bufferevent *bufev, 368 int timeout_read, int timeout_write) { 369 bufev->timeout_read = timeout_read; 370 bufev->timeout_write = timeout_write; 371 } 372 373 /* 374 * Sets the water marks 375 */ 376 377 void 378 bufferevent_setwatermark(struct bufferevent *bufev, short events, 379 size_t lowmark, size_t highmark) 380 { 381 if (events & EV_READ) { 382 bufev->wm_read.low = lowmark; 383 bufev->wm_read.high = highmark; 384 } 385 386 if (events & EV_WRITE) { 387 bufev->wm_write.low = lowmark; 388 bufev->wm_write.high = highmark; 389 } 390 391 /* If the watermarks changed then see if we should call read again */ 392 bufferevent_read_pressure_cb(bufev->input, 393 0, EVBUFFER_LENGTH(bufev->input), bufev); 394 } 395 396 int 397 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev) 398 { 399 int res; 400 401 res = event_base_set(base, &bufev->ev_read); 402 if (res == -1) 403 return (res); 404 405 res = event_base_set(base, &bufev->ev_write); 406 return (res); 407 } 408