xref: /openbsd-src/regress/sys/kern/pipe/test-kqueue.c (revision dcab91281821c69b4f0f4be3f497e3427a39a2b9)
1 /*	$OpenBSD: test-kqueue.c,v 1.5 2023/10/10 18:18:05 anton Exp $	*/
2 
3 /*
4  * Copyright (c) 2019 Anton Lindqvist <anton@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 #include <sys/types.h>
20 #include <sys/event.h>
21 #include <sys/time.h>
22 
23 #include <err.h>
24 #include <errno.h>
25 #include <fcntl.h>
26 #include <pthread.h>
27 #include <stdlib.h>
28 #include <unistd.h>
29 
30 #include "pipe.h"
31 
32 enum kqueue_mode {
33 	KQUEUE_READ,
34 	KQUEUE_READ_EOF,
35 	KQUEUE_WRITE,
36 	KQUEUE_WRITE_EOF,
37 };
38 
39 struct context {
40 	enum kqueue_mode c_mode;
41 	int c_alive;
42 
43 	int c_pipe[2];
44 	int c_kq;
45 
46 	char *c_buf;
47 	size_t c_bufsiz;
48 
49 	pthread_t c_th;
50 	pthread_mutex_t c_mtx;
51 };
52 
53 static void ctx_setup(struct context *, enum kqueue_mode, int);
54 static void ctx_teardown(struct context *);
55 static int ctx_thread_alive(struct context *);
56 static void ctx_thread_start(struct context *);
57 static void ctx_lock(struct context *);
58 static void ctx_unlock(struct context *);
59 
60 static void *kqueue_thread(void *);
61 
62 /*
63  * Verify kqueue read event.
64  */
65 int
66 test_kqueue_read(void)
67 {
68 	struct context ctx;
69 
70 	ctx_setup(&ctx, KQUEUE_READ, O_NONBLOCK);
71 	ctx_thread_start(&ctx);
72 
73 	while (ctx_thread_alive(&ctx)) {
74 		ssize_t n;
75 
76 		n = write(ctx.c_pipe[1], &ctx.c_buf[0], 1);
77 		if (n == -1) {
78 			if (errno == EAGAIN)
79 				continue;
80 			err(1, "write");
81 		}
82 		if (n != 1)
83 			errx(1, "write: %ld != 1", n);
84 	}
85 
86 	ctx_teardown(&ctx);
87 
88 	return 0;
89 }
90 
91 /*
92  * Verify kqueue read EOF event.
93  */
94 int
95 test_kqueue_read_eof(void)
96 {
97 	struct context ctx;
98 
99 	ctx_setup(&ctx, KQUEUE_READ_EOF, 0);
100 	ctx_thread_start(&ctx);
101 
102 	while (ctx_thread_alive(&ctx)) {
103 		if (ctx.c_pipe[1] == -1)
104 			continue;
105 
106 		close(ctx.c_pipe[1]);
107 		ctx.c_pipe[1] = -1;
108 	}
109 
110 	ctx_teardown(&ctx);
111 
112 	return 0;
113 }
114 
115 /*
116  * Verify kqueue write event.
117  */
118 int
119 test_kqueue_write(void)
120 {
121 	struct context ctx;
122 	ssize_t n;
123 
124 	ctx_setup(&ctx, KQUEUE_WRITE, 0);
125 
126 	n = write(ctx.c_pipe[1], ctx.c_buf, ctx.c_bufsiz);
127 	if (n == -1)
128 		err(1, "write");
129 	if ((size_t)n != ctx.c_bufsiz)
130 		errx(1, "write: %ld != %zu", n, ctx.c_bufsiz);
131 
132 	ctx_thread_start(&ctx);
133 
134 	while (ctx_thread_alive(&ctx)) {
135 		unsigned char c;
136 
137 		n = read(ctx.c_pipe[0], &c, 1);
138 		if (n == -1)
139 			err(1, "read");
140 		if (n == 0)
141 			break;
142 		if (n != 1)
143 			errx(1, "read: %ld != 1", n);
144 	}
145 
146 	ctx_teardown(&ctx);
147 
148 	return 0;
149 }
150 
151 /*
152  * XXX Verify kqueue write event.
153  */
154 int
155 test_kqueue_write_eof(void)
156 {
157 
158 	return 0;
159 }
160 
161 static void
162 ctx_setup(struct context *ctx, enum kqueue_mode mode, int flags)
163 {
164 	int error;
165 
166 	ctx->c_mode = mode;
167 	ctx->c_alive = 1;
168 
169 	if (flags) {
170 		if (pipe2(ctx->c_pipe, flags) == -1)
171 			err(1, "pipe");
172 	} else {
173 		if (pipe(ctx->c_pipe) == -1)
174 			err(1, "pipe");
175 	}
176 
177 	ctx->c_kq = kqueue();
178 	if (ctx->c_kq == -1)
179 		err(1, "kqueue");
180 
181 	ctx->c_bufsiz = PIPE_SIZE;
182 	ctx->c_buf = malloc(ctx->c_bufsiz);
183 	if (ctx->c_buf == NULL)
184 		err(1, NULL);
185 
186 	error = pthread_mutex_init(&ctx->c_mtx, NULL);
187 	if (error)
188 		errc(1, error, "pthread_mutex_init");
189 }
190 
191 static void
192 ctx_teardown(struct context *ctx)
193 {
194 	int error;
195 
196 	error = pthread_join(ctx->c_th, NULL);
197 	if (error)
198 		errc(1, error, "pthread_join");
199 
200 	error = pthread_mutex_destroy(&ctx->c_mtx);
201 	if (error)
202 		errc(1, error, "pthread_mutex_destroy");
203 
204 	free(ctx->c_buf);
205 
206 	close(ctx->c_pipe[0]);
207 	close(ctx->c_pipe[1]);
208 	close(ctx->c_kq);
209 }
210 
211 static int
212 ctx_thread_alive(struct context *ctx)
213 {
214 	int alive;
215 
216 	ctx_lock(ctx);
217 	alive = ctx->c_alive;
218 	ctx_unlock(ctx);
219 	return alive;
220 }
221 
222 static void
223 ctx_thread_start(struct context *ctx)
224 {
225 	int error;
226 
227 	error = pthread_create(&ctx->c_th, NULL, kqueue_thread, ctx);
228 	if (error)
229 		errc(1, error, "pthread_create");
230 }
231 
232 static void
233 ctx_lock(struct context *ctx)
234 {
235 	int error;
236 
237 	error = pthread_mutex_lock(&ctx->c_mtx);
238 	if (error)
239 		errc(1, error, "pthread_mutex_lock");
240 }
241 
242 static void
243 ctx_unlock(struct context *ctx)
244 {
245 	int error;
246 
247 	error = pthread_mutex_unlock(&ctx->c_mtx);
248 	if (error)
249 		errc(1, error, "pthread_mutex_unlock");
250 }
251 
252 static void *
253 kqueue_thread(void *arg)
254 {
255 	struct context *ctx = arg;
256 	struct kevent kev;
257 	int fd, filter, nevents;
258 
259 	switch (ctx->c_mode) {
260 	case KQUEUE_READ:
261 	case KQUEUE_READ_EOF:
262 		fd = ctx->c_pipe[0];
263 		filter = EVFILT_READ;
264 		break;
265 	case KQUEUE_WRITE:
266 	case KQUEUE_WRITE_EOF:
267 		fd = ctx->c_pipe[1];
268 		filter = EVFILT_WRITE;
269 		break;
270 	}
271 
272 	EV_SET(&kev, fd, filter, EV_ADD, 0, 0, NULL);
273 	nevents = kevent(ctx->c_kq, &kev, 1, NULL, 0, NULL);
274 	if (nevents == -1)
275 		err(1, "kevent");
276 	nevents = kevent(ctx->c_kq, NULL, 0, &kev, 1, NULL);
277 	if (nevents == -1)
278 		err(1, "kevent");
279 	if (nevents != 1)
280 		errx(1, "kevent: %d != 1", nevents);
281 
282 	if ((int)kev.ident != fd)
283 		errx(1, "kevent: ident");
284 	if (kev.filter != filter)
285 		errx(1, "kevent: filter");
286 
287 	switch (ctx->c_mode) {
288 	case KQUEUE_READ_EOF:
289 	case KQUEUE_WRITE_EOF:
290 		if ((kev.flags & EV_EOF) == 0)
291 			errx(1, "kevent: eof");
292 		break;
293 	default:
294 		break;
295 	}
296 
297 	ctx_lock(ctx);
298 	ctx->c_alive = 0;
299 	ctx_unlock(ctx);
300 
301 	close(fd);
302 
303 	return NULL;
304 }
305