/* $OpenBSD: unsendrecvthr.c,v 1.1 2021/12/09 23:37:18 mvs Exp $ */

/*
 * Copyright (c) 2021 Vitaliy Makkoveev <mvs@openbsd.org>
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

/*
 * Create the pair of SOCK_SEQPACKET sockets and perform #count_of_cpus
 * simultaneous writes on each of them. In half of transmissions the
 * sockets will be re-locked in kernel space. Be sure there is no data
 * corruption or loss.
 */

#include <sys/types.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/sysctl.h>
#include <sys/un.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <err.h>
#include <pthread.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

/*
 * Taken by the therr*() helpers right before they report a failure.
 * It is never released: the verr*() family terminates the process, so
 * any other thread failing at the same moment simply blocks here and
 * only one diagnostic is printed.
 */
static pthread_mutex_t therr_mtx = PTHREAD_MUTEX_INITIALIZER;

/*
 * err(3) wrapper for use from threads: print "fmt: strerror(errno)" and
 * exit with status `eval'.
 */
static void
therr(int eval, const char *fmt, ...)
{
	va_list ap;

	pthread_mutex_lock(&therr_mtx);

	va_start(ap, fmt);
	verr(eval, fmt, ap);
	va_end(ap);
}

/*
 * errx(3) wrapper for use from threads: print the formatted message
 * (no errno text) and exit with status `eval'.
 */
static void
therrx(int eval, const char *fmt, ...)
{
	va_list ap;

	pthread_mutex_lock(&therr_mtx);

	va_start(ap, fmt);
	verrx(eval, fmt, ap);
	va_end(ap);
}

/*
 * errc(3) wrapper for use from threads: like therr(), but the error
 * text is taken from `code' instead of errno.  Used for pthread calls,
 * which return the error number directly.
 */
static void
therrc(int eval, int code, const char *fmt, ...)
{
	va_list ap;

	pthread_mutex_lock(&therr_mtx);

	va_start(ap, fmt);
	verrc(eval, code, fmt, ap);
	va_end(ap);
}

/* One datagram as it travels over the socket pair. */
struct data {
	int		id;	/* index of the sending thread */
	unsigned int	cnt;	/* per-sender sequence number, starts at 1 */
};

/* Argument of a transmitter thread. */
struct thr_tx_arg {
	int	s;	/* socket to send on */
	int	id;	/* value placed in data.id */
};

/* Receiver-side state kept per sender id. */
struct rx_data {
	unsigned int	cnt;	/* last sequence number seen, 0 initially */
};

/* Argument of a receiver thread. */
struct thr_rx_arg {
	int		 s;		/* socket to receive on */
	int		 rx_data_num;	/* number of entries in rx_data[] */
	struct rx_data	*rx_data;	/* indexed by data.id */
};

/*
 * Transmitter thread.  Sends `struct data' packets forever with a
 * sequence number that grows by one per packet.  Any send failure or
 * short send terminates the whole process.
 */
static void *
thr_tx(void *arg)
{
	const struct thr_tx_arg *tx = arg;
	struct data data;
	int s = tx->s;

	data.id = tx->id;
	data.cnt = 1;

	while (1) {
		ssize_t ret;

		if ((ret = send(s, &data, sizeof(data), 0)) < 0)
			therr(1, "send");
		if (ret != sizeof(data))
			therrx(1, "send: wrong data size");

		data.cnt++;
	}

	return NULL;
}

/*
 * Receiver thread.  Reads `struct data' packets forever and checks
 * that, for every sender id, sequence numbers arrive consecutively.
 * Any receive failure, short packet, unknown id or gap in the sequence
 * terminates the whole process.
 */
static void *
thr_rx(void *arg)
{
	const struct thr_rx_arg *rx = arg;
	int s = rx->s;
	int rx_data_num = rx->rx_data_num;
	struct rx_data *rx_data = rx->rx_data;

	while (1) {
		struct data data;
		ssize_t ret;

		if ((ret = recv(s, &data, sizeof(data), 0)) < 0)
			therr(1, "recv");
		if (ret != sizeof(data))
			therrx(1, "recv: wrong data size");

		/*
		 * data.id is signed and is about to be used as an array
		 * index: reject negative values as well as too large ones.
		 */
		if (data.id < 0 || data.id >= rx_data_num)
			therrx(1, "recv: wrong id");

		/* The cast keeps the comparison correct across wrap-around. */
		if (data.cnt != (unsigned int)(rx_data[data.id].cnt + 1)) {
			therrx(1, "recv: data loss %u -> %u",
			    rx_data[data.id].cnt, data.cnt);
		}
		rx_data[data.id].cnt = data.cnt;
	}

	return NULL;
}

/*
 * Set up one SOCK_SEQPACKET socket pair, start one receiver per socket
 * and `ncpu' transmitters per socket, then let them run for the test
 * time (60 seconds, or about ten years with "--infinite").  The worker
 * threads exit the process with status 1 on the first error; returning
 * from main() after the sleep ends the test successfully.
 */
int
main(int argc, char *argv[])
{
	struct timespec testtime = {
		.tv_sec = 60,
		.tv_nsec = 0,
	};

	int mib[2], ncpu;
	size_t len;

	struct rx_data *rx_data[2];
	struct thr_rx_arg rx_args[2];
	struct thr_tx_arg *tx_args[2];

	int s[2], i, j;

	if (argc == 2 && !strcmp(argv[1], "--infinite"))
		testtime.tv_sec = (10 * 365 * 86400);

	mib[0] = CTL_HW;
	mib[1] = HW_NCPUONLINE;
	len = sizeof(ncpu);

	if (sysctl(mib, 2, &ncpu, &len, NULL, 0) < 0)
		err(1, "sysctl");
	if (ncpu <= 0)
		errx(1, "Wrong number of CPUs online: %d", ncpu);

	if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, s) < 0)
		err(1, "socketpair");

	/*
	 * Per-socket receiver state, one slot per sender id.  calloc()
	 * returns zeroed memory, so every cnt starts at 0.  The element
	 * size is that of the pointed-to structure, not of the pointer
	 * stored in rx_data[].
	 */
	for (i = 0; i < 2; ++i) {
		if (!(rx_data[i] = calloc(ncpu, sizeof(*rx_data[i]))))
			err(1, "calloc");
	}

	for (i = 0; i < 2; ++i) {
		rx_args[i].s = s[i];
		rx_args[i].rx_data_num = ncpu;
		rx_args[i].rx_data = rx_data[i];
	}

	/* Per-socket transmitter arguments, ids 0 .. ncpu - 1. */
	for (i = 0; i < 2; ++i) {
		if (!(tx_args[i] = calloc(ncpu, sizeof(*tx_args[i]))))
			err(1, "calloc");

		for (j = 0; j < ncpu; ++j) {
			tx_args[i][j].s = s[i];
			tx_args[i][j].id = j;
		}
	}

	/* Receivers are started first, one per socket. */
	for (i = 0; i < 2; ++i) {
		pthread_t thr;
		int error;

		error = pthread_create(&thr, NULL, thr_rx, &rx_args[i]);
		if (error)
			therrc(1, error, "pthread_create");
	}

	for (i = 0; i < 2; ++i) {
		pthread_t thr;
		int error;

		for (j = 0; j < ncpu; ++j) {
			error = pthread_create(&thr, NULL,
			    thr_tx, &tx_args[i][j]);
			if (error)
				therrc(1, error, "pthread_create");
		}
	}

	/* Threads are never joined; the process ends when main() returns. */
	nanosleep(&testtime, NULL);

	return 0;
}