xref: /openbsd-src/lib/libevent/evbuffer.c (revision a28daedfc357b214be5c701aa8ba8adb29a7f1c2)
1 /*	$OpenBSD: evbuffer.c,v 1.11 2008/05/02 18:26:42 brad Exp $	*/
2 
3 /*
4  * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  * 3. The name of the author may not be used to endorse or promote products
16  *    derived from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include <sys/types.h>
31 
32 #ifdef HAVE_CONFIG_H
33 #include "config.h"
34 #endif
35 
36 #ifdef HAVE_SYS_TIME_H
37 #include <sys/time.h>
38 #endif
39 
#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifdef HAVE_STDARG_H
#include <stdarg.h>
#endif
47 
48 #include "event.h"
49 
/* Prototypes for non-static functions defined further down in this file. */

void bufferevent_setwatermark(struct bufferevent *bufev, short events,
    size_t lowmark, size_t highmark);
void bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old,
    size_t now, void *arg);
54 
/*
 * Schedule an event, optionally with a timeout.
 *
 * A non-zero `timeout' is used as a number of whole seconds; a zero
 * `timeout' adds the event without any timeout at all.
 *
 * Returns the result of event_add().
 */
static int
bufferevent_add(struct event *ev, int timeout)
{
	struct timeval expiry;

	/* No timeout requested: pass a NULL timeval to event_add(). */
	if (timeout == 0)
		return (event_add(ev, NULL));

	timerclear(&expiry);
	expiry.tv_sec = timeout;

	return (event_add(ev, &expiry));
}
68 
69 /*
70  * This callback is executed when the size of the input buffer changes.
71  * We use it to apply back pressure on the reading side.
72  */
73 
74 void
75 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
76     void *arg) {
77 	struct bufferevent *bufev = arg;
78 	/*
79 	 * If we are below the watermark then reschedule reading if it's
80 	 * still enabled.
81 	 */
82 	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
83 		evbuffer_setcb(buf, NULL, NULL);
84 
85 		if (bufev->enabled & EV_READ)
86 			bufferevent_add(&bufev->ev_read, bufev->timeout_read);
87 	}
88 }
89 
90 static void
91 bufferevent_readcb(int fd, short event, void *arg)
92 {
93 	struct bufferevent *bufev = arg;
94 	int res = 0;
95 	short what = EVBUFFER_READ;
96 	size_t len;
97 	int howmuch = -1;
98 
99 	if (event == EV_TIMEOUT) {
100 		what |= EVBUFFER_TIMEOUT;
101 		goto error;
102 	}
103 
104 	/*
105 	 * If we have a high watermark configured then we don't want to
106 	 * read more data than would make us reach the watermark.
107 	 */
108 	if (bufev->wm_read.high != 0)
109 		howmuch = bufev->wm_read.high;
110 
111 	res = evbuffer_read(bufev->input, fd, howmuch);
112 	if (res == -1) {
113 		if (errno == EAGAIN || errno == EINTR)
114 			goto reschedule;
115 		/* error case */
116 		what |= EVBUFFER_ERROR;
117 	} else if (res == 0) {
118 		/* eof case */
119 		what |= EVBUFFER_EOF;
120 	}
121 
122 	if (res <= 0)
123 		goto error;
124 
125 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
126 
127 	/* See if this callbacks meets the water marks */
128 	len = EVBUFFER_LENGTH(bufev->input);
129 	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
130 		return;
131 	if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) {
132 		struct evbuffer *buf = bufev->input;
133 		event_del(&bufev->ev_read);
134 
135 		/* Now schedule a callback for us */
136 		evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
137 		return;
138 	}
139 
140 	/* Invoke the user callback - must always be called last */
141 	if (bufev->readcb != NULL)
142 		(*bufev->readcb)(bufev, bufev->cbarg);
143 	return;
144 
145  reschedule:
146 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
147 	return;
148 
149  error:
150 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
151 }
152 
153 static void
154 bufferevent_writecb(int fd, short event, void *arg)
155 {
156 	struct bufferevent *bufev = arg;
157 	int res = 0;
158 	short what = EVBUFFER_WRITE;
159 
160 	if (event == EV_TIMEOUT) {
161 		what |= EVBUFFER_TIMEOUT;
162 		goto error;
163 	}
164 
165 	if (EVBUFFER_LENGTH(bufev->output)) {
166 	    res = evbuffer_write(bufev->output, fd);
167 	    if (res == -1) {
168 #ifndef WIN32
169 /*todo. evbuffer uses WriteFile when WIN32 is set. WIN32 system calls do not
170  *set errno. thus this error checking is not portable*/
171 		    if (errno == EAGAIN ||
172 			errno == EINTR ||
173 			errno == EINPROGRESS)
174 			    goto reschedule;
175 		    /* error case */
176 		    what |= EVBUFFER_ERROR;
177 
178 #else
179 				goto reschedule;
180 #endif
181 
182 	    } else if (res == 0) {
183 		    /* eof case */
184 		    what |= EVBUFFER_EOF;
185 	    }
186 	    if (res <= 0)
187 		    goto error;
188 	}
189 
190 	if (EVBUFFER_LENGTH(bufev->output) != 0)
191 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
192 
193 	/*
194 	 * Invoke the user callback if our buffer is drained or below the
195 	 * low watermark.
196 	 */
197 	if (bufev->writecb != NULL &&
198 	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
199 		(*bufev->writecb)(bufev, bufev->cbarg);
200 
201 	return;
202 
203  reschedule:
204 	if (EVBUFFER_LENGTH(bufev->output) != 0)
205 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
206 	return;
207 
208  error:
209 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
210 }
211 
212 /*
213  * Create a new buffered event object.
214  *
215  * The read callback is invoked whenever we read new data.
216  * The write callback is invoked whenever the output buffer is drained.
217  * The error callback is invoked on a write/read error or on EOF.
218  *
219  * Both read and write callbacks maybe NULL.  The error callback is not
220  * allowed to be NULL and have to be provided always.
221  */
222 
223 struct bufferevent *
224 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
225     everrorcb errorcb, void *cbarg)
226 {
227 	struct bufferevent *bufev;
228 
229 	if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
230 		return (NULL);
231 
232 	if ((bufev->input = evbuffer_new()) == NULL) {
233 		free(bufev);
234 		return (NULL);
235 	}
236 
237 	if ((bufev->output = evbuffer_new()) == NULL) {
238 		evbuffer_free(bufev->input);
239 		free(bufev);
240 		return (NULL);
241 	}
242 
243 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
244 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
245 
246 	bufev->readcb = readcb;
247 	bufev->writecb = writecb;
248 	bufev->errorcb = errorcb;
249 
250 	bufev->cbarg = cbarg;
251 
252 	/*
253 	 * Set to EV_WRITE so that using bufferevent_write is going to
254 	 * trigger a callback.  Reading needs to be explicitly enabled
255 	 * because otherwise no data will be available.
256 	 */
257 	bufev->enabled = EV_WRITE;
258 
259 	return (bufev);
260 }
261 
262 int
263 bufferevent_priority_set(struct bufferevent *bufev, int priority)
264 {
265 	if (event_priority_set(&bufev->ev_read, priority) == -1)
266 		return (-1);
267 	if (event_priority_set(&bufev->ev_write, priority) == -1)
268 		return (-1);
269 
270 	return (0);
271 }
272 
273 /* Closing the file descriptor is the responsibility of the caller */
274 
275 void
276 bufferevent_free(struct bufferevent *bufev)
277 {
278 	event_del(&bufev->ev_read);
279 	event_del(&bufev->ev_write);
280 
281 	evbuffer_free(bufev->input);
282 	evbuffer_free(bufev->output);
283 
284 	free(bufev);
285 }
286 
287 /*
288  * Returns 0 on success;
289  *        -1 on failure.
290  */
291 
292 int
293 bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
294 {
295 	int res;
296 
297 	res = evbuffer_add(bufev->output, data, size);
298 
299 	if (res == -1)
300 		return (res);
301 
302 	/* If everything is okay, we need to schedule a write */
303 	if (size > 0 && (bufev->enabled & EV_WRITE))
304 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
305 
306 	return (res);
307 }
308 
309 int
310 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
311 {
312 	int res;
313 
314 	res = bufferevent_write(bufev, buf->buffer, buf->off);
315 	if (res != -1)
316 		evbuffer_drain(buf, buf->off);
317 
318 	return (res);
319 }
320 
321 size_t
322 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
323 {
324 	struct evbuffer *buf = bufev->input;
325 
326 	if (buf->off < size)
327 		size = buf->off;
328 
329 	/* Copy the available data to the user buffer */
330 	memcpy(data, buf->buffer, size);
331 
332 	if (size)
333 		evbuffer_drain(buf, size);
334 
335 	return (size);
336 }
337 
338 int
339 bufferevent_enable(struct bufferevent *bufev, short event)
340 {
341 	if (event & EV_READ) {
342 		if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
343 			return (-1);
344 	}
345 	if (event & EV_WRITE) {
346 		if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
347 			return (-1);
348 	}
349 
350 	bufev->enabled |= event;
351 	return (0);
352 }
353 
354 int
355 bufferevent_disable(struct bufferevent *bufev, short event)
356 {
357 	if (event & EV_READ) {
358 		if (event_del(&bufev->ev_read) == -1)
359 			return (-1);
360 	}
361 	if (event & EV_WRITE) {
362 		if (event_del(&bufev->ev_write) == -1)
363 			return (-1);
364 	}
365 
366 	bufev->enabled &= ~event;
367 	return (0);
368 }
369 
370 /*
371  * Sets the read and write timeout for a buffered event.
372  */
373 
374 void
375 bufferevent_settimeout(struct bufferevent *bufev,
376     int timeout_read, int timeout_write) {
377 	bufev->timeout_read = timeout_read;
378 	bufev->timeout_write = timeout_write;
379 }
380 
381 /*
382  * Sets the water marks
383  */
384 
385 void
386 bufferevent_setwatermark(struct bufferevent *bufev, short events,
387     size_t lowmark, size_t highmark)
388 {
389 	if (events & EV_READ) {
390 		bufev->wm_read.low = lowmark;
391 		bufev->wm_read.high = highmark;
392 	}
393 
394 	if (events & EV_WRITE) {
395 		bufev->wm_write.low = lowmark;
396 		bufev->wm_write.high = highmark;
397 	}
398 
399 	/* If the watermarks changed then see if we should call read again */
400 	bufferevent_read_pressure_cb(bufev->input,
401 	    0, EVBUFFER_LENGTH(bufev->input), bufev);
402 }
403 
404 int
405 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
406 {
407 	int res;
408 
409 	res = event_base_set(base, &bufev->ev_read);
410 	if (res == -1)
411 		return (res);
412 
413 	res = event_base_set(base, &bufev->ev_write);
414 	return (res);
415 }
416