xref: /openbsd-src/regress/sys/kern/unixsockets/unsendrecvthr.c (revision 63cd6ed3906b495e90f1710ea58c012bc61429dc)
1 /* $OpenBSD: unsendrecvthr.c,v 1.2 2023/07/09 09:33:30 bluhm Exp $ */
2 
3 /*
4  * Copyright (c) 2021 Vitaliy Makkoveev <mvs@openbsd.org>
5  *
6  * Permission to use, copy, modify, and distribute this software for any
7  * purpose with or without fee is hereby granted, provided that the above
8  * copyright notice and this permission notice appear in all copies.
9  *
10  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17  */
18 
19 /*
20  * Create the pair of SOCK_SEQPACKET sockets and perform #count_of_cpus
21  * simultaneous writes on each of them. In half of transmissions the
22  * sockets will be re-locked in kernel space. Be sure no data corruption
23  * or loss.
24  */
25 
26 #include <sys/types.h>
27 #include <sys/select.h>
28 #include <sys/socket.h>
29 #include <sys/sysctl.h>
30 #include <sys/un.h>
31 #include <stdarg.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <err.h>
35 #include <pthread.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39 
40 static pthread_mutex_t therr_mtx = PTHREAD_MUTEX_INITIALIZER;
41 
42 static void
therr(int eval,const char * fmt,...)43 therr(int eval, const char *fmt, ...)
44 {
45 	va_list ap;
46 
47 	pthread_mutex_lock(&therr_mtx);
48 
49 	va_start(ap, fmt);
50 	verr(eval, fmt, ap);
51 	va_end(ap);
52 }
53 
54 static void
therrx(int eval,const char * fmt,...)55 therrx(int eval, const char *fmt, ...)
56 {
57 	va_list ap;
58 
59 	pthread_mutex_lock(&therr_mtx);
60 
61 	va_start(ap, fmt);
62 	verrx(eval, fmt, ap);
63 	va_end(ap);
64 }
65 
66 static void
therrc(int eval,int code,const char * fmt,...)67 therrc(int eval, int code, const char *fmt, ...)
68 {
69 	va_list ap;
70 
71 	pthread_mutex_lock(&therr_mtx);
72 
73 	va_start(ap, fmt);
74 	verrc(eval, code, fmt, ap);
75 	va_end(ap);
76 }
77 
78 struct data {
79 	int id;
80 	unsigned int cnt;
81 };
82 
83 struct thr_tx_arg {
84 	int s;
85 	int id;
86 };
87 
88 struct rx_data {
89 	unsigned int cnt;
90 };
91 
92 struct thr_rx_arg {
93 	int s;
94 	int rx_data_num;
95 	struct rx_data *rx_data;
96 };
97 
98 static void *
thr_tx(void * arg)99 thr_tx(void *arg)
100 {
101 	struct data data;
102 	int s = ((struct thr_tx_arg *)arg)->s;
103 
104 	data.id = ((struct thr_tx_arg *)arg)->id;
105 	data.cnt = 1;
106 
107 	while (1) {
108 		ssize_t ret;
109 
110 		if ((ret = send(s, &data, sizeof(data), 0)) < 0)
111 			therr(1, "send");
112 		if (ret != sizeof(data))
113 			therrx(1, "send: wrong data size");
114 
115 		data.cnt++;
116 	}
117 
118 	return NULL;
119 }
120 
121 static void *
thr_rx(void * arg)122 thr_rx(void *arg)
123 {
124 	int s = ((struct thr_rx_arg *)arg)->s;
125 	int rx_data_num = ((struct thr_rx_arg *)arg)->rx_data_num;
126 	struct rx_data *rx_data = ((struct thr_rx_arg *)arg)->rx_data;
127 
128 	while (1) {
129 		struct data data;
130 		ssize_t ret;
131 
132 		if ((ret = recv(s, &data, sizeof(data), 0)) < 0)
133 			therr(1, "recv");
134 		if (ret != sizeof(data))
135 			therrx(1, "recv: wrong data size");
136 
137 		if (data.id >= rx_data_num)
138 			therrx(1, "recv: wrong id");
139 
140 		if (data.cnt != (unsigned int)(rx_data[data.id].cnt + 1)) {
141 			therrx(1, "recv: data loss %d -> %d",
142 			    rx_data[data.id].cnt, data.cnt);
143 		}
144 		rx_data[data.id].cnt = data.cnt;
145 	}
146 
147 	return NULL;
148 }
149 
150 int
main(int argc,char * argv[])151 main(int argc, char *argv[])
152 {
153 	struct timespec testtime = {
154 		.tv_sec = 60,
155 		.tv_nsec = 0,
156 	};
157 
158 	int mib[2], ncpu;
159 	size_t len;
160 
161 	struct rx_data *rx_data[2];
162 	struct thr_rx_arg rx_args[2];
163 	struct thr_tx_arg *tx_args[2];
164 
165 	int s[2], i, j;
166 
167 	if (argc == 2 && !strcmp(argv[1], "--infinite"))
168 		testtime.tv_sec = (10 * 365 * 86400);
169 
170 	mib[0] = CTL_HW;
171 	mib[1] = HW_NCPUONLINE;
172 	len = sizeof(ncpu);
173 
174 	if (sysctl(mib, 2, &ncpu, &len, NULL, 0) < 0)
175 		err(1, "sysctl");
176 	if (ncpu <= 0)
177 		errx(1, "Wrong number of CPUs online: %d", ncpu);
178 
179 	if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, s) < 0)
180 		err(1, "socketpair");
181 
182 	for (i = 0; i < 2; ++i) {
183 		if (!(rx_data[i] = calloc(ncpu, sizeof(struct rx_data))))
184 			err(1, "calloc");
185 
186 		for (j = 0; j < ncpu; ++j)
187 			rx_data[i][j].cnt = 0;
188 	}
189 
190 	for (i = 0; i < 2; ++i) {
191 		rx_args[i].s = s[i];
192 		rx_args[i].rx_data_num = ncpu;
193 		rx_args[i].rx_data = rx_data[i];
194 	}
195 
196 	for (i = 0; i < 2; ++i) {
197 		if (!(tx_args[i] = calloc(ncpu, sizeof(struct thr_tx_arg))))
198 			err(1, "calloc");
199 
200 		for (j = 0; j < ncpu; ++j) {
201 			tx_args[i][j].s = s[i];
202 			tx_args[i][j].id = j;
203 		}
204 	}
205 
206 	for (i = 0; i < 2; ++i) {
207 		pthread_t thr;
208 		int error;
209 
210 		error = pthread_create(&thr, NULL, thr_rx, &rx_args[i]);
211 		if (error)
212 			therrc(1, error, "pthread_create");
213 	}
214 
215 	for (i = 0; i < 2; ++i) {
216 		pthread_t thr;
217 		int error;
218 
219 		for (j = 0; j < ncpu; ++j) {
220 			error = pthread_create(&thr, NULL,
221 			    thr_tx, &tx_args[i][j]);
222 			if (error)
223 				therrc(1, error, "pthread_create");
224 		}
225 	}
226 
227 	nanosleep(&testtime, NULL);
228 
229 	return 0;
230 }
231