/* $OpenBSD: test-kqueue.c,v 1.6 2023/10/14 13:05:43 anton Exp $ */

/*
 * Copyright (c) 2019 Anton Lindqvist <anton@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

#include <err.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>

#include "pipe.h"

enum kqueue_mode {
        KQUEUE_READ,
        KQUEUE_READ_EOF,
        KQUEUE_WRITE,
        KQUEUE_WRITE_EOF,
};

struct context {
        enum kqueue_mode c_mode;
        int c_alive;

        int c_pipe[2];
        int c_kq;

        char *c_buf;
        size_t c_bufsiz;

        pthread_t c_th;
        pthread_mutex_t c_mtx;
};

static void ctx_setup(struct context *, enum kqueue_mode, int);
static void ctx_teardown(struct context *);
static int ctx_thread_alive(struct context *);
static void ctx_thread_start(struct context *);
static void ctx_lock(struct context *);
static void ctx_unlock(struct context *);

static void *kqueue_thread(void *);

/*
 * Verify kqueue read event.
 */
int
test_kqueue_read(void)
{
        struct context ctx;

        ctx_setup(&ctx, KQUEUE_READ, O_NONBLOCK);
        ctx_thread_start(&ctx);

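        /*
         * Keep writing single bytes into the non-blocking write end until
         * the kqueue thread has observed the EVFILT_READ event and cleared
         * c_alive. EAGAIN just means the pipe is full; EPIPE means the
         * thread already closed the read end.
         */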
        while (ctx_thread_alive(&ctx)) {
                ssize_t n;

                n = write(ctx.c_pipe[1], &ctx.c_buf[0], 1);
                if (n == -1) {
                        if (errno == EPIPE)
                                break;
                        if (errno == EAGAIN)
                                continue;
                        err(1, "write");
                }
                if (n != 1)
                        errx(1, "write: %ld != 1", n);
        }

        ctx_teardown(&ctx);

        return 0;
}

/*
 * Verify kqueue read EOF event.
 */
int
test_kqueue_read_eof(void)
{
        struct context ctx;

        ctx_setup(&ctx, KQUEUE_READ_EOF, 0);
        ctx_thread_start(&ctx);

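        /*
         * Close the write end exactly once; the kqueue thread should then
         * see EVFILT_READ with EV_EOF set. Spin until it flags completion.
         */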
        while (ctx_thread_alive(&ctx)) {
                if (ctx.c_pipe[1] == -1)
                        continue;

                close(ctx.c_pipe[1]);
                ctx.c_pipe[1] = -1;
        }

        ctx_teardown(&ctx);

        return 0;
}

/*
 * Verify kqueue write event.
 */
int
test_kqueue_write(void)
{
        struct context ctx;
        ssize_t n;

        ctx_setup(&ctx, KQUEUE_WRITE, 0);

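        /*
         * Write c_bufsiz (PIPE_SIZE) bytes up front, intended to fill the
         * pipe so that EVFILT_WRITE does not fire until the read end below
         * starts draining it.
         */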
        n = write(ctx.c_pipe[1], ctx.c_buf, ctx.c_bufsiz);
        if (n == -1)
                err(1, "write");
        if ((size_t)n != ctx.c_bufsiz)
                errx(1, "write: %ld != %zu", n, ctx.c_bufsiz);

        ctx_thread_start(&ctx);

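        /*
         * Drain the pipe one byte at a time until the kqueue thread has
         * observed the EVFILT_WRITE event and cleared c_alive.
         */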
        while (ctx_thread_alive(&ctx)) {
                unsigned char c;

                n = read(ctx.c_pipe[0], &c, 1);
                if (n == -1)
                        err(1, "read");
                if (n == 0)
                        break;
                if (n != 1)
                        errx(1, "read: %ld != 1", n);
        }

        ctx_teardown(&ctx);

        return 0;
}

/*
 * XXX Verify kqueue write EOF event; not implemented yet.
 */
int
test_kqueue_write_eof(void)
{

        return 0;
}

static void
ctx_setup(struct context *ctx, enum kqueue_mode mode, int flags)
{
        int error;

        ctx->c_mode = mode;
        ctx->c_alive = 1;

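        /*
         * Create the pipe, using pipe2(2) when the caller requested
         * descriptor flags such as O_NONBLOCK.
         */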
        if (flags) {
                if (pipe2(ctx->c_pipe, flags) == -1)
                        err(1, "pipe");
        } else {
                if (pipe(ctx->c_pipe) == -1)
                        err(1, "pipe");
        }

        ctx->c_kq = kqueue();
        if (ctx->c_kq == -1)
                err(1, "kqueue");

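        /*
         * Scratch buffer of PIPE_SIZE bytes (defined in pipe.h), used by
         * the tests to fill and drain the pipe.
         */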
        ctx->c_bufsiz = PIPE_SIZE;
        ctx->c_buf = malloc(ctx->c_bufsiz);
        if (ctx->c_buf == NULL)
                err(1, NULL);

        error = pthread_mutex_init(&ctx->c_mtx, NULL);
        if (error)
                errc(1, error, "pthread_mutex_init");
}

static void
ctx_teardown(struct context *ctx)
{
        int error;

        error = pthread_join(ctx->c_th, NULL);
        if (error)
                errc(1, error, "pthread_join");

        error = pthread_mutex_destroy(&ctx->c_mtx);
        if (error)
                errc(1, error, "pthread_mutex_destroy");

        free(ctx->c_buf);

        close(ctx->c_pipe[0]);
        close(ctx->c_pipe[1]);
        close(ctx->c_kq);
}

static int
ctx_thread_alive(struct context *ctx)
{
        int alive;

        ctx_lock(ctx);
        alive = ctx->c_alive;
        ctx_unlock(ctx);
        return alive;
}

static void
ctx_thread_start(struct context *ctx)
{
        int error;

        error = pthread_create(&ctx->c_th, NULL, kqueue_thread, ctx);
        if (error)
                errc(1, error, "pthread_create");
}

static void
ctx_lock(struct context *ctx)
{
        int error;

        error = pthread_mutex_lock(&ctx->c_mtx);
        if (error)
                errc(1, error, "pthread_mutex_lock");
}

static void
ctx_unlock(struct context *ctx)
{
        int error;

        error = pthread_mutex_unlock(&ctx->c_mtx);
        if (error)
                errc(1, error, "pthread_mutex_unlock");
}

static void *
kqueue_thread(void *arg)
{
        struct context *ctx = arg;
        struct kevent kev;
        int fd, filter, nevents;

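        /* Pick the pipe end and kevent filter matching the requested mode. */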
        switch (ctx->c_mode) {
        case KQUEUE_READ:
        case KQUEUE_READ_EOF:
                fd = ctx->c_pipe[0];
                filter = EVFILT_READ;
                break;
        case KQUEUE_WRITE:
        case KQUEUE_WRITE_EOF:
                fd = ctx->c_pipe[1];
                filter = EVFILT_WRITE;
                break;
        }

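        /*
         * Register the event, then block in a second kevent(2) call until
         * the filter fires, and sanity check the returned event.
         */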
        EV_SET(&kev, fd, filter, EV_ADD, 0, 0, NULL);
        nevents = kevent(ctx->c_kq, &kev, 1, NULL, 0, NULL);
        if (nevents == -1)
                err(1, "kevent");
        nevents = kevent(ctx->c_kq, NULL, 0, &kev, 1, NULL);
        if (nevents == -1)
                err(1, "kevent");
        if (nevents != 1)
                errx(1, "kevent: %d != 1", nevents);

        if ((int)kev.ident != fd)
                errx(1, "kevent: ident");
        if (kev.filter != filter)
                errx(1, "kevent: filter");

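        /* The EOF variants additionally expect EV_EOF to be reported. */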
        switch (ctx->c_mode) {
        case KQUEUE_READ_EOF:
        case KQUEUE_WRITE_EOF:
                if ((kev.flags & EV_EOF) == 0)
                        errx(1, "kevent: eof");
                break;
        default:
                break;
        }

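        /* Let the main thread's ctx_thread_alive() loop terminate. */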
        ctx_lock(ctx);
        ctx->c_alive = 0;
        ctx_unlock(ctx);

        close(fd);

        return NULL;
}