xref: /openbsd-src/lib/libevent/evbuffer.c (revision daf88648c0e349d5c02e1504293082072c981640)
1 /*	$OpenBSD: evbuffer.c,v 1.9 2006/11/26 15:22:58 brad Exp $	*/
2 
3 /*
4  * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 1. Redistributions of source code must retain the above copyright
11  *    notice, this list of conditions and the following disclaimer.
12  * 2. Redistributions in binary form must reproduce the above copyright
13  *    notice, this list of conditions and the following disclaimer in the
14  *    documentation and/or other materials provided with the distribution.
15  * 3. The name of the author may not be used to endorse or promote products
16  *    derived from this software without specific prior written permission.
17  *
18  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20  * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21  * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22  * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23  * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27  * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28  */
29 
30 #include <sys/types.h>
31 
32 #ifdef HAVE_CONFIG_H
33 #include "config.h"
34 #endif
35 
36 #ifdef HAVE_SYS_TIME_H
37 #include <sys/time.h>
38 #endif
39 
40 #include <errno.h>
41 #include <stdio.h>
42 #include <stdlib.h>
43 #include <string.h>
44 #ifdef HAVE_STDARG_H
45 #include <stdarg.h>
46 #endif
47 
48 #include "event.h"
49 
50 /* prototypes */
51 
52 void bufferevent_setwatermark(struct bufferevent *, short, size_t, size_t);
53 void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
54 
55 static int
56 bufferevent_add(struct event *ev, int timeout)
57 {
58 	struct timeval tv, *ptv = NULL;
59 
60 	if (timeout) {
61 		timerclear(&tv);
62 		tv.tv_sec = timeout;
63 		ptv = &tv;
64 	}
65 
66 	return (event_add(ev, ptv));
67 }
68 
69 /*
70  * This callback is executed when the size of the input buffer changes.
71  * We use it to apply back pressure on the reading side.
72  */
73 
74 void
75 bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
76     void *arg) {
77 	struct bufferevent *bufev = arg;
78 	/*
79 	 * If we are below the watermark then reschedule reading if it's
80 	 * still enabled.
81 	 */
82 	if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
83 		evbuffer_setcb(buf, NULL, NULL);
84 
85 		if (bufev->enabled & EV_READ)
86 			bufferevent_add(&bufev->ev_read, bufev->timeout_read);
87 	}
88 }
89 
90 static void
91 bufferevent_readcb(int fd, short event, void *arg)
92 {
93 	struct bufferevent *bufev = arg;
94 	int res = 0;
95 	short what = EVBUFFER_READ;
96 	size_t len;
97 	int howmuch = -1;
98 
99 	if (event == EV_TIMEOUT) {
100 		what |= EVBUFFER_TIMEOUT;
101 		goto error;
102 	}
103 
104 	/*
105 	 * If we have a high watermark configured then we don't want to
106 	 * read more data than would make us reach the watermark.
107 	 */
108 	if (bufev->wm_read.high != 0)
109 		howmuch = bufev->wm_read.high;
110 
111 	res = evbuffer_read(bufev->input, fd, howmuch);
112 	if (res == -1) {
113 		if (errno == EAGAIN || errno == EINTR)
114 			goto reschedule;
115 		/* error case */
116 		what |= EVBUFFER_ERROR;
117 	} else if (res == 0) {
118 		/* eof case */
119 		what |= EVBUFFER_EOF;
120 	}
121 
122 	if (res <= 0)
123 		goto error;
124 
125 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
126 
127 	/* See if this callbacks meets the water marks */
128 	len = EVBUFFER_LENGTH(bufev->input);
129 	if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
130 		return;
131 	if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) {
132 		struct evbuffer *buf = bufev->input;
133 		event_del(&bufev->ev_read);
134 
135 		/* Now schedule a callback for us */
136 		evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
137 		return;
138 	}
139 
140 	/* Invoke the user callback - must always be called last */
141 	if (bufev->readcb != NULL)
142 		(*bufev->readcb)(bufev, bufev->cbarg);
143 	return;
144 
145  reschedule:
146 	bufferevent_add(&bufev->ev_read, bufev->timeout_read);
147 	return;
148 
149  error:
150 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
151 }
152 
153 static void
154 bufferevent_writecb(int fd, short event, void *arg)
155 {
156 	struct bufferevent *bufev = arg;
157 	int res = 0;
158 	short what = EVBUFFER_WRITE;
159 
160 	if (event == EV_TIMEOUT) {
161 		what |= EVBUFFER_TIMEOUT;
162 		goto error;
163 	}
164 
165 	if (EVBUFFER_LENGTH(bufev->output)) {
166 	    res = evbuffer_write(bufev->output, fd);
167 	    if (res == -1) {
168 		    if (errno == EAGAIN ||
169 			errno == EINTR ||
170 			errno == EINPROGRESS)
171 			    goto reschedule;
172 		    /* error case */
173 		    what |= EVBUFFER_ERROR;
174 	    } else if (res == 0) {
175 		    /* eof case */
176 		    what |= EVBUFFER_EOF;
177 	    }
178 	    if (res <= 0)
179 		    goto error;
180 	}
181 
182 	if (EVBUFFER_LENGTH(bufev->output) != 0)
183 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
184 
185 	/*
186 	 * Invoke the user callback if our buffer is drained or below the
187 	 * low watermark.
188 	 */
189 	if (bufev->writecb != NULL &&
190 	    EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
191 		(*bufev->writecb)(bufev, bufev->cbarg);
192 
193 	return;
194 
195  reschedule:
196 	if (EVBUFFER_LENGTH(bufev->output) != 0)
197 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
198 	return;
199 
200  error:
201 	(*bufev->errorcb)(bufev, what, bufev->cbarg);
202 }
203 
204 /*
205  * Create a new buffered event object.
206  *
207  * The read callback is invoked whenever we read new data.
208  * The write callback is invoked whenever the output buffer is drained.
209  * The error callback is invoked on a write/read error or on EOF.
210  *
211  * Both read and write callbacks maybe NULL.  The error callback is not
212  * allowed to be NULL and have to be provided always.
213  */
214 
215 struct bufferevent *
216 bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
217     everrorcb errorcb, void *cbarg)
218 {
219 	struct bufferevent *bufev;
220 
221 	if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
222 		return (NULL);
223 
224 	if ((bufev->input = evbuffer_new()) == NULL) {
225 		free(bufev);
226 		return (NULL);
227 	}
228 
229 	if ((bufev->output = evbuffer_new()) == NULL) {
230 		evbuffer_free(bufev->input);
231 		free(bufev);
232 		return (NULL);
233 	}
234 
235 	event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
236 	event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
237 
238 	bufev->readcb = readcb;
239 	bufev->writecb = writecb;
240 	bufev->errorcb = errorcb;
241 
242 	bufev->cbarg = cbarg;
243 
244 	/*
245 	 * Set to EV_WRITE so that using bufferevent_write is going to
246 	 * trigger a callback.  Reading needs to be explicitly enabled
247 	 * because otherwise no data will be available.
248 	 */
249 	bufev->enabled = EV_WRITE;
250 
251 	return (bufev);
252 }
253 
254 int
255 bufferevent_priority_set(struct bufferevent *bufev, int priority)
256 {
257 	if (event_priority_set(&bufev->ev_read, priority) == -1)
258 		return (-1);
259 	if (event_priority_set(&bufev->ev_write, priority) == -1)
260 		return (-1);
261 
262 	return (0);
263 }
264 
265 /* Closing the file descriptor is the responsibility of the caller */
266 
267 void
268 bufferevent_free(struct bufferevent *bufev)
269 {
270 	event_del(&bufev->ev_read);
271 	event_del(&bufev->ev_write);
272 
273 	evbuffer_free(bufev->input);
274 	evbuffer_free(bufev->output);
275 
276 	free(bufev);
277 }
278 
279 /*
280  * Returns 0 on success;
281  *        -1 on failure.
282  */
283 
284 int
285 bufferevent_write(struct bufferevent *bufev, void *data, size_t size)
286 {
287 	int res;
288 
289 	res = evbuffer_add(bufev->output, data, size);
290 
291 	if (res == -1)
292 		return (res);
293 
294 	/* If everything is okay, we need to schedule a write */
295 	if (size > 0 && (bufev->enabled & EV_WRITE))
296 		bufferevent_add(&bufev->ev_write, bufev->timeout_write);
297 
298 	return (res);
299 }
300 
301 int
302 bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
303 {
304 	int res;
305 
306 	res = bufferevent_write(bufev, buf->buffer, buf->off);
307 	if (res != -1)
308 		evbuffer_drain(buf, buf->off);
309 
310 	return (res);
311 }
312 
313 size_t
314 bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
315 {
316 	struct evbuffer *buf = bufev->input;
317 
318 	if (buf->off < size)
319 		size = buf->off;
320 
321 	/* Copy the available data to the user buffer */
322 	memcpy(data, buf->buffer, size);
323 
324 	if (size)
325 		evbuffer_drain(buf, size);
326 
327 	return (size);
328 }
329 
330 int
331 bufferevent_enable(struct bufferevent *bufev, short event)
332 {
333 	if (event & EV_READ) {
334 		if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
335 			return (-1);
336 	}
337 	if (event & EV_WRITE) {
338 		if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
339 			return (-1);
340 	}
341 
342 	bufev->enabled |= event;
343 	return (0);
344 }
345 
346 int
347 bufferevent_disable(struct bufferevent *bufev, short event)
348 {
349 	if (event & EV_READ) {
350 		if (event_del(&bufev->ev_read) == -1)
351 			return (-1);
352 	}
353 	if (event & EV_WRITE) {
354 		if (event_del(&bufev->ev_write) == -1)
355 			return (-1);
356 	}
357 
358 	bufev->enabled &= ~event;
359 	return (0);
360 }
361 
362 /*
363  * Sets the read and write timeout for a buffered event.
364  */
365 
366 void
367 bufferevent_settimeout(struct bufferevent *bufev,
368     int timeout_read, int timeout_write) {
369 	bufev->timeout_read = timeout_read;
370 	bufev->timeout_write = timeout_write;
371 }
372 
373 /*
374  * Sets the water marks
375  */
376 
377 void
378 bufferevent_setwatermark(struct bufferevent *bufev, short events,
379     size_t lowmark, size_t highmark)
380 {
381 	if (events & EV_READ) {
382 		bufev->wm_read.low = lowmark;
383 		bufev->wm_read.high = highmark;
384 	}
385 
386 	if (events & EV_WRITE) {
387 		bufev->wm_write.low = lowmark;
388 		bufev->wm_write.high = highmark;
389 	}
390 
391 	/* If the watermarks changed then see if we should call read again */
392 	bufferevent_read_pressure_cb(bufev->input,
393 	    0, EVBUFFER_LENGTH(bufev->input), bufev);
394 }
395 
396 int
397 bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
398 {
399 	int res;
400 
401 	res = event_base_set(base, &bufev->ev_read);
402 	if (res == -1)
403 		return (res);
404 
405 	res = event_base_set(base, &bufev->ev_write);
406 	return (res);
407 }
408