1 /* $OpenBSD: unsendrecvthr.c,v 1.2 2023/07/09 09:33:30 bluhm Exp $ */
2
3 /*
4 * Copyright (c) 2021 Vitaliy Makkoveev <mvs@openbsd.org>
5 *
6 * Permission to use, copy, modify, and distribute this software for any
7 * purpose with or without fee is hereby granted, provided that the above
8 * copyright notice and this permission notice appear in all copies.
9 *
10 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18
19 /*
20 * Create the pair of SOCK_SEQPACKET sockets and perform #count_of_cpus
21 * simultaneous writes on each of them. In half of transmissions the
22 * sockets will be re-locked in kernel space. Be sure no data corruption
23 * or loss.
24 */
25
26 #include <sys/types.h>
27 #include <sys/select.h>
28 #include <sys/socket.h>
29 #include <sys/sysctl.h>
30 #include <sys/un.h>
31 #include <stdarg.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <err.h>
35 #include <pthread.h>
36 #include <string.h>
37 #include <time.h>
38 #include <unistd.h>
39
40 static pthread_mutex_t therr_mtx = PTHREAD_MUTEX_INITIALIZER;
41
42 static void
therr(int eval,const char * fmt,...)43 therr(int eval, const char *fmt, ...)
44 {
45 va_list ap;
46
47 pthread_mutex_lock(&therr_mtx);
48
49 va_start(ap, fmt);
50 verr(eval, fmt, ap);
51 va_end(ap);
52 }
53
54 static void
therrx(int eval,const char * fmt,...)55 therrx(int eval, const char *fmt, ...)
56 {
57 va_list ap;
58
59 pthread_mutex_lock(&therr_mtx);
60
61 va_start(ap, fmt);
62 verrx(eval, fmt, ap);
63 va_end(ap);
64 }
65
66 static void
therrc(int eval,int code,const char * fmt,...)67 therrc(int eval, int code, const char *fmt, ...)
68 {
69 va_list ap;
70
71 pthread_mutex_lock(&therr_mtx);
72
73 va_start(ap, fmt);
74 verrc(eval, code, fmt, ap);
75 va_end(ap);
76 }
77
78 struct data {
79 int id;
80 unsigned int cnt;
81 };
82
83 struct thr_tx_arg {
84 int s;
85 int id;
86 };
87
88 struct rx_data {
89 unsigned int cnt;
90 };
91
92 struct thr_rx_arg {
93 int s;
94 int rx_data_num;
95 struct rx_data *rx_data;
96 };
97
98 static void *
thr_tx(void * arg)99 thr_tx(void *arg)
100 {
101 struct data data;
102 int s = ((struct thr_tx_arg *)arg)->s;
103
104 data.id = ((struct thr_tx_arg *)arg)->id;
105 data.cnt = 1;
106
107 while (1) {
108 ssize_t ret;
109
110 if ((ret = send(s, &data, sizeof(data), 0)) < 0)
111 therr(1, "send");
112 if (ret != sizeof(data))
113 therrx(1, "send: wrong data size");
114
115 data.cnt++;
116 }
117
118 return NULL;
119 }
120
121 static void *
thr_rx(void * arg)122 thr_rx(void *arg)
123 {
124 int s = ((struct thr_rx_arg *)arg)->s;
125 int rx_data_num = ((struct thr_rx_arg *)arg)->rx_data_num;
126 struct rx_data *rx_data = ((struct thr_rx_arg *)arg)->rx_data;
127
128 while (1) {
129 struct data data;
130 ssize_t ret;
131
132 if ((ret = recv(s, &data, sizeof(data), 0)) < 0)
133 therr(1, "recv");
134 if (ret != sizeof(data))
135 therrx(1, "recv: wrong data size");
136
137 if (data.id >= rx_data_num)
138 therrx(1, "recv: wrong id");
139
140 if (data.cnt != (unsigned int)(rx_data[data.id].cnt + 1)) {
141 therrx(1, "recv: data loss %d -> %d",
142 rx_data[data.id].cnt, data.cnt);
143 }
144 rx_data[data.id].cnt = data.cnt;
145 }
146
147 return NULL;
148 }
149
150 int
main(int argc,char * argv[])151 main(int argc, char *argv[])
152 {
153 struct timespec testtime = {
154 .tv_sec = 60,
155 .tv_nsec = 0,
156 };
157
158 int mib[2], ncpu;
159 size_t len;
160
161 struct rx_data *rx_data[2];
162 struct thr_rx_arg rx_args[2];
163 struct thr_tx_arg *tx_args[2];
164
165 int s[2], i, j;
166
167 if (argc == 2 && !strcmp(argv[1], "--infinite"))
168 testtime.tv_sec = (10 * 365 * 86400);
169
170 mib[0] = CTL_HW;
171 mib[1] = HW_NCPUONLINE;
172 len = sizeof(ncpu);
173
174 if (sysctl(mib, 2, &ncpu, &len, NULL, 0) < 0)
175 err(1, "sysctl");
176 if (ncpu <= 0)
177 errx(1, "Wrong number of CPUs online: %d", ncpu);
178
179 if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, s) < 0)
180 err(1, "socketpair");
181
182 for (i = 0; i < 2; ++i) {
183 if (!(rx_data[i] = calloc(ncpu, sizeof(struct rx_data))))
184 err(1, "calloc");
185
186 for (j = 0; j < ncpu; ++j)
187 rx_data[i][j].cnt = 0;
188 }
189
190 for (i = 0; i < 2; ++i) {
191 rx_args[i].s = s[i];
192 rx_args[i].rx_data_num = ncpu;
193 rx_args[i].rx_data = rx_data[i];
194 }
195
196 for (i = 0; i < 2; ++i) {
197 if (!(tx_args[i] = calloc(ncpu, sizeof(struct thr_tx_arg))))
198 err(1, "calloc");
199
200 for (j = 0; j < ncpu; ++j) {
201 tx_args[i][j].s = s[i];
202 tx_args[i][j].id = j;
203 }
204 }
205
206 for (i = 0; i < 2; ++i) {
207 pthread_t thr;
208 int error;
209
210 error = pthread_create(&thr, NULL, thr_rx, &rx_args[i]);
211 if (error)
212 therrc(1, error, "pthread_create");
213 }
214
215 for (i = 0; i < 2; ++i) {
216 pthread_t thr;
217 int error;
218
219 for (j = 0; j < ncpu; ++j) {
220 error = pthread_create(&thr, NULL,
221 thr_tx, &tx_args[i][j]);
222 if (error)
223 therrc(1, error, "pthread_create");
224 }
225 }
226
227 nanosleep(&testtime, NULL);
228
229 return 0;
230 }
231