1*824b820dSbluhm /* $OpenBSD: evbuffer.c,v 1.17 2014/10/30 16:45:37 bluhm Exp $ */
2ff9272daSbrad
3ff9272daSbrad /*
4ff9272daSbrad * Copyright (c) 2002-2004 Niels Provos <provos@citi.umich.edu>
5ff9272daSbrad * All rights reserved.
6ff9272daSbrad *
7ff9272daSbrad * Redistribution and use in source and binary forms, with or without
8ff9272daSbrad * modification, are permitted provided that the following conditions
9ff9272daSbrad * are met:
10ff9272daSbrad * 1. Redistributions of source code must retain the above copyright
11ff9272daSbrad * notice, this list of conditions and the following disclaimer.
12ff9272daSbrad * 2. Redistributions in binary form must reproduce the above copyright
13ff9272daSbrad * notice, this list of conditions and the following disclaimer in the
14ff9272daSbrad * documentation and/or other materials provided with the distribution.
15ff9272daSbrad * 3. The name of the author may not be used to endorse or promote products
16ff9272daSbrad * derived from this software without specific prior written permission.
17ff9272daSbrad *
18ff9272daSbrad * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
19ff9272daSbrad * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
20ff9272daSbrad * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
21ff9272daSbrad * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
22ff9272daSbrad * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
23ff9272daSbrad * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24ff9272daSbrad * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25ff9272daSbrad * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26ff9272daSbrad * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
27ff9272daSbrad * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28ff9272daSbrad */
29ff9272daSbrad
#include <sys/types.h>
#include <sys/time.h>

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <stdarg.h>

#include "event.h"
40ff9272daSbrad
411085edd8Sbrad /* prototypes */
421085edd8Sbrad
431085edd8Sbrad void bufferevent_read_pressure_cb(struct evbuffer *, size_t, size_t, void *);
441085edd8Sbrad
/*
 * Add ev to the event loop with a timeout of `timeout' seconds.
 * A timeout of zero adds the event without any timeout.
 * Returns the result of event_add().
 */
static int
bufferevent_add(struct event *ev, int timeout)
{
	struct timeval expiry;

	/* No timeout requested: hand event_add() a NULL timeval. */
	if (timeout == 0)
		return (event_add(ev, NULL));

	timerclear(&expiry);
	expiry.tv_sec = timeout;

	return (event_add(ev, &expiry));
}
58ff9272daSbrad
59ff9272daSbrad /*
60ff9272daSbrad * This callback is executed when the size of the input buffer changes.
61ff9272daSbrad * We use it to apply back pressure on the reading side.
62ff9272daSbrad */
63ff9272daSbrad
64ff9272daSbrad void
bufferevent_read_pressure_cb(struct evbuffer * buf,size_t old,size_t now,void * arg)65ff9272daSbrad bufferevent_read_pressure_cb(struct evbuffer *buf, size_t old, size_t now,
66ff9272daSbrad void *arg) {
67ff9272daSbrad struct bufferevent *bufev = arg;
68ff9272daSbrad /*
69ca65f563Sbrad * If we are below the watermark then reschedule reading if it's
70ff9272daSbrad * still enabled.
71ff9272daSbrad */
72ff9272daSbrad if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
73ff9272daSbrad evbuffer_setcb(buf, NULL, NULL);
74ff9272daSbrad
75ff9272daSbrad if (bufev->enabled & EV_READ)
76ff9272daSbrad bufferevent_add(&bufev->ev_read, bufev->timeout_read);
77ff9272daSbrad }
78ff9272daSbrad }
79ff9272daSbrad
80ff9272daSbrad static void
bufferevent_readcb(int fd,short event,void * arg)81ff9272daSbrad bufferevent_readcb(int fd, short event, void *arg)
82ff9272daSbrad {
83ff9272daSbrad struct bufferevent *bufev = arg;
84ff9272daSbrad int res = 0;
85ff9272daSbrad short what = EVBUFFER_READ;
86ff9272daSbrad size_t len;
87b5b20efdSbrad int howmuch = -1;
88ff9272daSbrad
89ff9272daSbrad if (event == EV_TIMEOUT) {
90ff9272daSbrad what |= EVBUFFER_TIMEOUT;
91ff9272daSbrad goto error;
92ff9272daSbrad }
93ff9272daSbrad
94b5b20efdSbrad /*
95b5b20efdSbrad * If we have a high watermark configured then we don't want to
96b5b20efdSbrad * read more data than would make us reach the watermark.
97b5b20efdSbrad */
988ead113eSnicm if (bufev->wm_read.high != 0) {
998ead113eSnicm howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
1008ead113eSnicm /* we might have lowered the watermark, stop reading */
1018ead113eSnicm if (howmuch <= 0) {
1028ead113eSnicm struct evbuffer *buf = bufev->input;
1038ead113eSnicm event_del(&bufev->ev_read);
1048ead113eSnicm evbuffer_setcb(buf,
1058ead113eSnicm bufferevent_read_pressure_cb, bufev);
1068ead113eSnicm return;
1078ead113eSnicm }
1088ead113eSnicm }
109b5b20efdSbrad
110b5b20efdSbrad res = evbuffer_read(bufev->input, fd, howmuch);
111ff9272daSbrad if (res == -1) {
112ff9272daSbrad if (errno == EAGAIN || errno == EINTR)
113ff9272daSbrad goto reschedule;
114ff9272daSbrad /* error case */
115ff9272daSbrad what |= EVBUFFER_ERROR;
116ff9272daSbrad } else if (res == 0) {
117ff9272daSbrad /* eof case */
118ff9272daSbrad what |= EVBUFFER_EOF;
119ff9272daSbrad }
120ff9272daSbrad
121ff9272daSbrad if (res <= 0)
122ff9272daSbrad goto error;
123ff9272daSbrad
124ff9272daSbrad bufferevent_add(&bufev->ev_read, bufev->timeout_read);
125ff9272daSbrad
126ff9272daSbrad /* See if this callbacks meets the water marks */
127ff9272daSbrad len = EVBUFFER_LENGTH(bufev->input);
128ff9272daSbrad if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
129ff9272daSbrad return;
1308ead113eSnicm if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
131ff9272daSbrad struct evbuffer *buf = bufev->input;
132ff9272daSbrad event_del(&bufev->ev_read);
133ff9272daSbrad
1348ead113eSnicm /* Now schedule a callback for us when the buffer changes */
135ff9272daSbrad evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
136ff9272daSbrad }
137ff9272daSbrad
138ff9272daSbrad /* Invoke the user callback - must always be called last */
139c7727ffcSbrad if (bufev->readcb != NULL)
140ff9272daSbrad (*bufev->readcb)(bufev, bufev->cbarg);
141ff9272daSbrad return;
142ff9272daSbrad
143ff9272daSbrad reschedule:
144ff9272daSbrad bufferevent_add(&bufev->ev_read, bufev->timeout_read);
145ff9272daSbrad return;
146ff9272daSbrad
147ff9272daSbrad error:
148ff9272daSbrad (*bufev->errorcb)(bufev, what, bufev->cbarg);
149ff9272daSbrad }
150ff9272daSbrad
151ff9272daSbrad static void
bufferevent_writecb(int fd,short event,void * arg)152ff9272daSbrad bufferevent_writecb(int fd, short event, void *arg)
153ff9272daSbrad {
154ff9272daSbrad struct bufferevent *bufev = arg;
155ff9272daSbrad int res = 0;
156ff9272daSbrad short what = EVBUFFER_WRITE;
157ff9272daSbrad
158ff9272daSbrad if (event == EV_TIMEOUT) {
159ff9272daSbrad what |= EVBUFFER_TIMEOUT;
160ff9272daSbrad goto error;
161ff9272daSbrad }
162ff9272daSbrad
163ff9272daSbrad if (EVBUFFER_LENGTH(bufev->output)) {
164ff9272daSbrad res = evbuffer_write(bufev->output, fd);
165ff9272daSbrad if (res == -1) {
1664643be29Sbrad if (errno == EAGAIN ||
1674643be29Sbrad errno == EINTR ||
1684643be29Sbrad errno == EINPROGRESS)
169ff9272daSbrad goto reschedule;
170ff9272daSbrad /* error case */
171ff9272daSbrad what |= EVBUFFER_ERROR;
172ff9272daSbrad } else if (res == 0) {
173ff9272daSbrad /* eof case */
174ff9272daSbrad what |= EVBUFFER_EOF;
175ff9272daSbrad }
176ff9272daSbrad if (res <= 0)
177ff9272daSbrad goto error;
178ff9272daSbrad }
179ff9272daSbrad
180ff9272daSbrad if (EVBUFFER_LENGTH(bufev->output) != 0)
181ff9272daSbrad bufferevent_add(&bufev->ev_write, bufev->timeout_write);
182ff9272daSbrad
183ff9272daSbrad /*
184ff9272daSbrad * Invoke the user callback if our buffer is drained or below the
185ff9272daSbrad * low watermark.
186ff9272daSbrad */
187c7727ffcSbrad if (bufev->writecb != NULL &&
188c7727ffcSbrad EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
189ff9272daSbrad (*bufev->writecb)(bufev, bufev->cbarg);
190ff9272daSbrad
191ff9272daSbrad return;
192ff9272daSbrad
193ff9272daSbrad reschedule:
194ff9272daSbrad if (EVBUFFER_LENGTH(bufev->output) != 0)
195ff9272daSbrad bufferevent_add(&bufev->ev_write, bufev->timeout_write);
196ff9272daSbrad return;
197ff9272daSbrad
198ff9272daSbrad error:
199ff9272daSbrad (*bufev->errorcb)(bufev, what, bufev->cbarg);
200ff9272daSbrad }
201ff9272daSbrad
202ff9272daSbrad /*
203ff9272daSbrad * Create a new buffered event object.
204ff9272daSbrad *
205ff9272daSbrad * The read callback is invoked whenever we read new data.
206ff9272daSbrad * The write callback is invoked whenever the output buffer is drained.
207ff9272daSbrad * The error callback is invoked on a write/read error or on EOF.
208c7727ffcSbrad *
209c7727ffcSbrad * Both read and write callbacks maybe NULL. The error callback is not
210c7727ffcSbrad * allowed to be NULL and have to be provided always.
211ff9272daSbrad */
212ff9272daSbrad
213ff9272daSbrad struct bufferevent *
bufferevent_new(int fd,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)214ff9272daSbrad bufferevent_new(int fd, evbuffercb readcb, evbuffercb writecb,
215ff9272daSbrad everrorcb errorcb, void *cbarg)
216ff9272daSbrad {
217ff9272daSbrad struct bufferevent *bufev;
218ff9272daSbrad
219ff9272daSbrad if ((bufev = calloc(1, sizeof(struct bufferevent))) == NULL)
220ff9272daSbrad return (NULL);
221ff9272daSbrad
222ff9272daSbrad if ((bufev->input = evbuffer_new()) == NULL) {
223ff9272daSbrad free(bufev);
224ff9272daSbrad return (NULL);
225ff9272daSbrad }
226ff9272daSbrad
227ff9272daSbrad if ((bufev->output = evbuffer_new()) == NULL) {
228ff9272daSbrad evbuffer_free(bufev->input);
229ff9272daSbrad free(bufev);
230ff9272daSbrad return (NULL);
231ff9272daSbrad }
232ff9272daSbrad
233ff9272daSbrad event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
234ff9272daSbrad event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
235ff9272daSbrad
2368ead113eSnicm bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
237ff9272daSbrad
238b5b20efdSbrad /*
239b5b20efdSbrad * Set to EV_WRITE so that using bufferevent_write is going to
240b5b20efdSbrad * trigger a callback. Reading needs to be explicitly enabled
241b5b20efdSbrad * because otherwise no data will be available.
242b5b20efdSbrad */
243b5b20efdSbrad bufev->enabled = EV_WRITE;
244ff9272daSbrad
245ff9272daSbrad return (bufev);
246ff9272daSbrad }
247ff9272daSbrad
2488ead113eSnicm void
bufferevent_setcb(struct bufferevent * bufev,evbuffercb readcb,evbuffercb writecb,everrorcb errorcb,void * cbarg)2498ead113eSnicm bufferevent_setcb(struct bufferevent *bufev,
2508ead113eSnicm evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg)
2518ead113eSnicm {
2528ead113eSnicm bufev->readcb = readcb;
2538ead113eSnicm bufev->writecb = writecb;
2548ead113eSnicm bufev->errorcb = errorcb;
2558ead113eSnicm
2568ead113eSnicm bufev->cbarg = cbarg;
2578ead113eSnicm }
2588ead113eSnicm
2598ead113eSnicm void
bufferevent_setfd(struct bufferevent * bufev,int fd)2608ead113eSnicm bufferevent_setfd(struct bufferevent *bufev, int fd)
2618ead113eSnicm {
2628ead113eSnicm event_del(&bufev->ev_read);
2638ead113eSnicm event_del(&bufev->ev_write);
2648ead113eSnicm
2658ead113eSnicm event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
2668ead113eSnicm event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
2678ead113eSnicm if (bufev->ev_base != NULL) {
2688ead113eSnicm event_base_set(bufev->ev_base, &bufev->ev_read);
2698ead113eSnicm event_base_set(bufev->ev_base, &bufev->ev_write);
2708ead113eSnicm }
2718ead113eSnicm
2728ead113eSnicm /* might have to manually trigger event registration */
2738ead113eSnicm }
2748ead113eSnicm
2754643be29Sbrad int
bufferevent_priority_set(struct bufferevent * bufev,int priority)2764643be29Sbrad bufferevent_priority_set(struct bufferevent *bufev, int priority)
2774643be29Sbrad {
2784643be29Sbrad if (event_priority_set(&bufev->ev_read, priority) == -1)
2794643be29Sbrad return (-1);
2804643be29Sbrad if (event_priority_set(&bufev->ev_write, priority) == -1)
2814643be29Sbrad return (-1);
2824643be29Sbrad
2834643be29Sbrad return (0);
2844643be29Sbrad }
2854643be29Sbrad
2861085edd8Sbrad /* Closing the file descriptor is the responsibility of the caller */
2871085edd8Sbrad
288ff9272daSbrad void
bufferevent_free(struct bufferevent * bufev)289ff9272daSbrad bufferevent_free(struct bufferevent *bufev)
290ff9272daSbrad {
291ff9272daSbrad event_del(&bufev->ev_read);
292ff9272daSbrad event_del(&bufev->ev_write);
293ff9272daSbrad
294ff9272daSbrad evbuffer_free(bufev->input);
295ff9272daSbrad evbuffer_free(bufev->output);
296ff9272daSbrad
297ff9272daSbrad free(bufev);
298ff9272daSbrad }
299ff9272daSbrad
300ff9272daSbrad /*
301ff9272daSbrad * Returns 0 on success;
302ff9272daSbrad * -1 on failure.
303ff9272daSbrad */
304ff9272daSbrad
305ff9272daSbrad int
bufferevent_write(struct bufferevent * bufev,const void * data,size_t size)306171772baSbrad bufferevent_write(struct bufferevent *bufev, const void *data, size_t size)
307ff9272daSbrad {
308ff9272daSbrad int res;
309ff9272daSbrad
310ff9272daSbrad res = evbuffer_add(bufev->output, data, size);
311ff9272daSbrad
312ff9272daSbrad if (res == -1)
313ff9272daSbrad return (res);
314ff9272daSbrad
315ff9272daSbrad /* If everything is okay, we need to schedule a write */
316ff9272daSbrad if (size > 0 && (bufev->enabled & EV_WRITE))
317ff9272daSbrad bufferevent_add(&bufev->ev_write, bufev->timeout_write);
318ff9272daSbrad
319ff9272daSbrad return (res);
320ff9272daSbrad }
321ff9272daSbrad
322ff9272daSbrad int
bufferevent_write_buffer(struct bufferevent * bufev,struct evbuffer * buf)323ff9272daSbrad bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf)
324ff9272daSbrad {
325ff9272daSbrad int res;
326ff9272daSbrad
327ff9272daSbrad res = bufferevent_write(bufev, buf->buffer, buf->off);
328ff9272daSbrad if (res != -1)
329ff9272daSbrad evbuffer_drain(buf, buf->off);
330ff9272daSbrad
331ff9272daSbrad return (res);
332ff9272daSbrad }
333ff9272daSbrad
334ff9272daSbrad size_t
bufferevent_read(struct bufferevent * bufev,void * data,size_t size)335ff9272daSbrad bufferevent_read(struct bufferevent *bufev, void *data, size_t size)
336ff9272daSbrad {
337ff9272daSbrad struct evbuffer *buf = bufev->input;
338ff9272daSbrad
339ff9272daSbrad if (buf->off < size)
340ff9272daSbrad size = buf->off;
341ff9272daSbrad
342ff9272daSbrad /* Copy the available data to the user buffer */
343ff9272daSbrad memcpy(data, buf->buffer, size);
344ff9272daSbrad
345ff9272daSbrad if (size)
346ff9272daSbrad evbuffer_drain(buf, size);
347ff9272daSbrad
348ff9272daSbrad return (size);
349ff9272daSbrad }
350ff9272daSbrad
351ff9272daSbrad int
bufferevent_enable(struct bufferevent * bufev,short event)352ff9272daSbrad bufferevent_enable(struct bufferevent *bufev, short event)
353ff9272daSbrad {
354ff9272daSbrad if (event & EV_READ) {
355ff9272daSbrad if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
356ff9272daSbrad return (-1);
357ff9272daSbrad }
358ff9272daSbrad if (event & EV_WRITE) {
359ff9272daSbrad if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
360ff9272daSbrad return (-1);
361ff9272daSbrad }
362ff9272daSbrad
363ff9272daSbrad bufev->enabled |= event;
364ff9272daSbrad return (0);
365ff9272daSbrad }
366ff9272daSbrad
367ff9272daSbrad int
bufferevent_disable(struct bufferevent * bufev,short event)368ff9272daSbrad bufferevent_disable(struct bufferevent *bufev, short event)
369ff9272daSbrad {
370ff9272daSbrad if (event & EV_READ) {
371ff9272daSbrad if (event_del(&bufev->ev_read) == -1)
372ff9272daSbrad return (-1);
373ff9272daSbrad }
374ff9272daSbrad if (event & EV_WRITE) {
375ff9272daSbrad if (event_del(&bufev->ev_write) == -1)
376ff9272daSbrad return (-1);
377ff9272daSbrad }
378ff9272daSbrad
379ff9272daSbrad bufev->enabled &= ~event;
380ff9272daSbrad return (0);
381ff9272daSbrad }
382ff9272daSbrad
383ff9272daSbrad /*
384ff9272daSbrad * Sets the read and write timeout for a buffered event.
385ff9272daSbrad */
386ff9272daSbrad
387ff9272daSbrad void
bufferevent_settimeout(struct bufferevent * bufev,int timeout_read,int timeout_write)388ff9272daSbrad bufferevent_settimeout(struct bufferevent *bufev,
389ff9272daSbrad int timeout_read, int timeout_write) {
390ff9272daSbrad bufev->timeout_read = timeout_read;
391ff9272daSbrad bufev->timeout_write = timeout_write;
3928ead113eSnicm
3938ead113eSnicm if (event_pending(&bufev->ev_read, EV_READ, NULL))
3948ead113eSnicm bufferevent_add(&bufev->ev_read, timeout_read);
3958ead113eSnicm if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
3968ead113eSnicm bufferevent_add(&bufev->ev_write, timeout_write);
397ff9272daSbrad }
398ff9272daSbrad
399ff9272daSbrad /*
400ff9272daSbrad * Sets the water marks
401ff9272daSbrad */
402ff9272daSbrad
403ff9272daSbrad void
bufferevent_setwatermark(struct bufferevent * bufev,short events,size_t lowmark,size_t highmark)404ff9272daSbrad bufferevent_setwatermark(struct bufferevent *bufev, short events,
405ff9272daSbrad size_t lowmark, size_t highmark)
406ff9272daSbrad {
407ff9272daSbrad if (events & EV_READ) {
408ff9272daSbrad bufev->wm_read.low = lowmark;
409ff9272daSbrad bufev->wm_read.high = highmark;
410ff9272daSbrad }
411ff9272daSbrad
412ff9272daSbrad if (events & EV_WRITE) {
413ff9272daSbrad bufev->wm_write.low = lowmark;
414ff9272daSbrad bufev->wm_write.high = highmark;
415ff9272daSbrad }
416ff9272daSbrad
417ff9272daSbrad /* If the watermarks changed then see if we should call read again */
418ff9272daSbrad bufferevent_read_pressure_cb(bufev->input,
419ff9272daSbrad 0, EVBUFFER_LENGTH(bufev->input), bufev);
420ff9272daSbrad }
421b5b20efdSbrad
422b5b20efdSbrad int
bufferevent_base_set(struct event_base * base,struct bufferevent * bufev)423b5b20efdSbrad bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
424b5b20efdSbrad {
425b5b20efdSbrad int res;
426b5b20efdSbrad
4278ead113eSnicm bufev->ev_base = base;
4288ead113eSnicm
429b5b20efdSbrad res = event_base_set(base, &bufev->ev_read);
430b5b20efdSbrad if (res == -1)
431b5b20efdSbrad return (res);
432b5b20efdSbrad
433b5b20efdSbrad res = event_base_set(base, &bufev->ev_write);
434b5b20efdSbrad return (res);
435b5b20efdSbrad }
436