1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22 #include "task.h" 23 #include "uv.h" 24 25 #define IPC_PIPE_NAME TEST_PIPENAME 26 #define NUM_CONNECTS (250 * 1000) 27 28 union stream_handle { 29 uv_pipe_t pipe; 30 uv_tcp_t tcp; 31 }; 32 33 /* Use as (uv_stream_t *) &handle_storage -- it's kind of clunky but it 34 * avoids aliasing warnings. 35 */ 36 typedef unsigned char handle_storage_t[sizeof(union stream_handle)]; 37 38 /* Used for passing around the listen handle, not part of the benchmark proper. 39 * We have an overabundance of server types here. It works like this: 40 * 41 * 1. The main thread starts an IPC pipe server. 42 * 2. The worker threads connect to the IPC server and obtain a listen handle. 43 * 3. The worker threads start accepting requests on the listen handle. 44 * 4. The main thread starts connecting repeatedly. 45 * 46 * Step #4 should perhaps be farmed out over several threads. 47 */ 48 struct ipc_server_ctx { 49 handle_storage_t server_handle; 50 unsigned int num_connects; 51 uv_pipe_t ipc_pipe; 52 }; 53 54 struct ipc_peer_ctx { 55 handle_storage_t peer_handle; 56 uv_write_t write_req; 57 }; 58 59 struct ipc_client_ctx { 60 uv_connect_t connect_req; 61 uv_stream_t* server_handle; 62 uv_pipe_t ipc_pipe; 63 char scratch[16]; 64 }; 65 66 /* Used in the actual benchmark. */ 67 struct server_ctx { 68 handle_storage_t server_handle; 69 unsigned int num_connects; 70 uv_async_t async_handle; 71 uv_thread_t thread_id; 72 uv_sem_t semaphore; 73 }; 74 75 struct client_ctx { 76 handle_storage_t client_handle; 77 unsigned int num_connects; 78 uv_connect_t connect_req; 79 uv_idle_t idle_handle; 80 }; 81 82 static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status); 83 static void ipc_write_cb(uv_write_t* req, int status); 84 static void ipc_close_cb(uv_handle_t* handle); 85 static void ipc_connect_cb(uv_connect_t* req, int status); 86 static void ipc_read_cb(uv_stream_t* handle, 87 ssize_t nread, 88 const uv_buf_t* buf); 89 static void ipc_alloc_cb(uv_handle_t* handle, 90 size_t suggested_size, 91 uv_buf_t* buf); 92 93 static void sv_async_cb(uv_async_t* handle); 94 static void sv_connection_cb(uv_stream_t* server_handle, int status); 95 static void sv_read_cb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf); 96 static void sv_alloc_cb(uv_handle_t* handle, 97 size_t suggested_size, 98 uv_buf_t* buf); 99 100 static void cl_connect_cb(uv_connect_t* req, int status); 101 static void cl_idle_cb(uv_idle_t* handle); 102 static void cl_close_cb(uv_handle_t* handle); 103 104 static struct sockaddr_in listen_addr; 105 106 107 static void ipc_connection_cb(uv_stream_t* ipc_pipe, int status) { 108 struct ipc_server_ctx* sc; 109 struct ipc_peer_ctx* pc; 110 uv_loop_t* loop; 111 uv_buf_t buf; 112 113 loop = ipc_pipe->loop; 114 buf = uv_buf_init("PING", 4); 115 sc = container_of(ipc_pipe, struct ipc_server_ctx, ipc_pipe); 116 pc = calloc(1, sizeof(*pc)); 117 ASSERT(pc != NULL); 118 119 if (ipc_pipe->type == UV_TCP) 120 ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &pc->peer_handle)); 121 else if (ipc_pipe->type == UV_NAMED_PIPE) 122 ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) &pc->peer_handle, 1)); 123 else 124 ASSERT(0); 125 126 ASSERT(0 == uv_accept(ipc_pipe, (uv_stream_t*) &pc->peer_handle)); 127 ASSERT(0 == uv_write2(&pc->write_req, 128 (uv_stream_t*) &pc->peer_handle, 129 &buf, 130 1, 131 (uv_stream_t*) &sc->server_handle, 132 ipc_write_cb)); 133 134 if (--sc->num_connects == 0) 135 uv_close((uv_handle_t*) ipc_pipe, NULL); 136 } 137 138 139 static void ipc_write_cb(uv_write_t* req, int status) { 140 struct ipc_peer_ctx* ctx; 141 ctx = container_of(req, struct ipc_peer_ctx, write_req); 142 uv_close((uv_handle_t*) &ctx->peer_handle, ipc_close_cb); 143 } 144 145 146 static void ipc_close_cb(uv_handle_t* handle) { 147 struct ipc_peer_ctx* ctx; 148 ctx = container_of(handle, struct ipc_peer_ctx, peer_handle); 149 free(ctx); 150 } 151 152 153 static void ipc_connect_cb(uv_connect_t* req, int status) { 154 struct ipc_client_ctx* ctx; 155 ctx = container_of(req, struct ipc_client_ctx, connect_req); 156 ASSERT(0 == status); 157 ASSERT(0 == uv_read_start((uv_stream_t*) &ctx->ipc_pipe, 158 ipc_alloc_cb, 159 ipc_read_cb)); 160 } 161 162 163 static void ipc_alloc_cb(uv_handle_t* handle, 164 size_t suggested_size, 165 uv_buf_t* buf) { 166 struct ipc_client_ctx* ctx; 167 ctx = container_of(handle, struct ipc_client_ctx, ipc_pipe); 168 buf->base = ctx->scratch; 169 buf->len = sizeof(ctx->scratch); 170 } 171 172 173 static void ipc_read_cb(uv_stream_t* handle, 174 ssize_t nread, 175 const uv_buf_t* buf) { 176 struct ipc_client_ctx* ctx; 177 uv_loop_t* loop; 178 uv_handle_type type; 179 uv_pipe_t* ipc_pipe; 180 181 ipc_pipe = (uv_pipe_t*) handle; 182 ctx = container_of(ipc_pipe, struct ipc_client_ctx, ipc_pipe); 183 loop = ipc_pipe->loop; 184 185 ASSERT(1 == uv_pipe_pending_count(ipc_pipe)); 186 type = uv_pipe_pending_type(ipc_pipe); 187 if (type == UV_TCP) 188 ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) ctx->server_handle)); 189 else if (type == UV_NAMED_PIPE) 190 ASSERT(0 == uv_pipe_init(loop, (uv_pipe_t*) ctx->server_handle, 0)); 191 else 192 ASSERT(0); 193 194 ASSERT(0 == uv_accept(handle, ctx->server_handle)); 195 uv_close((uv_handle_t*) &ctx->ipc_pipe, NULL); 196 } 197 198 199 /* Set up an IPC pipe server that hands out listen sockets to the worker 200 * threads. It's kind of cumbersome for such a simple operation, maybe we 201 * should revive uv_import() and uv_export(). 202 */ 203 static void send_listen_handles(uv_handle_type type, 204 unsigned int num_servers, 205 struct server_ctx* servers) { 206 struct ipc_server_ctx ctx; 207 uv_loop_t* loop; 208 unsigned int i; 209 210 loop = uv_default_loop(); 211 ctx.num_connects = num_servers; 212 213 if (type == UV_TCP) { 214 ASSERT(0 == uv_tcp_init(loop, (uv_tcp_t*) &ctx.server_handle)); 215 ASSERT(0 == uv_tcp_bind((uv_tcp_t*) &ctx.server_handle, 216 (const struct sockaddr*) &listen_addr, 217 0)); 218 } 219 else 220 ASSERT(0); 221 /* We need to initialize this pipe with ipc=0 - this is not a uv_pipe we'll 222 * be sending handles over, it's just for listening for new connections. 223 * If we accept a connection then the connected pipe must be initialized 224 * with ipc=1. 225 */ 226 ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 0)); 227 ASSERT(0 == uv_pipe_bind(&ctx.ipc_pipe, IPC_PIPE_NAME)); 228 ASSERT(0 == uv_listen((uv_stream_t*) &ctx.ipc_pipe, 128, ipc_connection_cb)); 229 230 for (i = 0; i < num_servers; i++) 231 uv_sem_post(&servers[i].semaphore); 232 233 ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT)); 234 uv_close((uv_handle_t*) &ctx.server_handle, NULL); 235 ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT)); 236 237 for (i = 0; i < num_servers; i++) 238 uv_sem_wait(&servers[i].semaphore); 239 } 240 241 242 static void get_listen_handle(uv_loop_t* loop, uv_stream_t* server_handle) { 243 struct ipc_client_ctx ctx; 244 245 ctx.server_handle = server_handle; 246 ctx.server_handle->data = "server handle"; 247 248 ASSERT(0 == uv_pipe_init(loop, &ctx.ipc_pipe, 1)); 249 uv_pipe_connect(&ctx.connect_req, 250 &ctx.ipc_pipe, 251 IPC_PIPE_NAME, 252 ipc_connect_cb); 253 ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT)); 254 } 255 256 257 static void server_cb(void *arg) { 258 struct server_ctx *ctx; 259 uv_loop_t loop; 260 261 ctx = arg; 262 ASSERT(0 == uv_loop_init(&loop)); 263 264 ASSERT(0 == uv_async_init(&loop, &ctx->async_handle, sv_async_cb)); 265 uv_unref((uv_handle_t*) &ctx->async_handle); 266 267 /* Wait until the main thread is ready. */ 268 uv_sem_wait(&ctx->semaphore); 269 get_listen_handle(&loop, (uv_stream_t*) &ctx->server_handle); 270 uv_sem_post(&ctx->semaphore); 271 272 /* Now start the actual benchmark. */ 273 ASSERT(0 == uv_listen((uv_stream_t*) &ctx->server_handle, 274 128, 275 sv_connection_cb)); 276 ASSERT(0 == uv_run(&loop, UV_RUN_DEFAULT)); 277 278 uv_loop_close(&loop); 279 } 280 281 282 static void sv_async_cb(uv_async_t* handle) { 283 struct server_ctx* ctx; 284 ctx = container_of(handle, struct server_ctx, async_handle); 285 uv_close((uv_handle_t*) &ctx->server_handle, NULL); 286 uv_close((uv_handle_t*) &ctx->async_handle, NULL); 287 } 288 289 290 static void sv_connection_cb(uv_stream_t* server_handle, int status) { 291 handle_storage_t* storage; 292 struct server_ctx* ctx; 293 294 ctx = container_of(server_handle, struct server_ctx, server_handle); 295 ASSERT(status == 0); 296 297 storage = malloc(sizeof(*storage)); 298 ASSERT(storage != NULL); 299 300 if (server_handle->type == UV_TCP) 301 ASSERT(0 == uv_tcp_init(server_handle->loop, (uv_tcp_t*) storage)); 302 else if (server_handle->type == UV_NAMED_PIPE) 303 ASSERT(0 == uv_pipe_init(server_handle->loop, (uv_pipe_t*) storage, 0)); 304 else 305 ASSERT(0); 306 307 ASSERT(0 == uv_accept(server_handle, (uv_stream_t*) storage)); 308 ASSERT(0 == uv_read_start((uv_stream_t*) storage, sv_alloc_cb, sv_read_cb)); 309 ctx->num_connects++; 310 } 311 312 313 static void sv_alloc_cb(uv_handle_t* handle, 314 size_t suggested_size, 315 uv_buf_t* buf) { 316 static char slab[32]; 317 buf->base = slab; 318 buf->len = sizeof(slab); 319 } 320 321 322 static void sv_read_cb(uv_stream_t* handle, 323 ssize_t nread, 324 const uv_buf_t* buf) { 325 ASSERT(nread == UV_EOF); 326 uv_close((uv_handle_t*) handle, (uv_close_cb) free); 327 } 328 329 330 static void cl_connect_cb(uv_connect_t* req, int status) { 331 struct client_ctx* ctx = container_of(req, struct client_ctx, connect_req); 332 uv_idle_start(&ctx->idle_handle, cl_idle_cb); 333 ASSERT(0 == status); 334 } 335 336 337 static void cl_idle_cb(uv_idle_t* handle) { 338 struct client_ctx* ctx = container_of(handle, struct client_ctx, idle_handle); 339 uv_close((uv_handle_t*) &ctx->client_handle, cl_close_cb); 340 uv_idle_stop(&ctx->idle_handle); 341 } 342 343 344 static void cl_close_cb(uv_handle_t* handle) { 345 struct client_ctx* ctx; 346 347 ctx = container_of(handle, struct client_ctx, client_handle); 348 349 if (--ctx->num_connects == 0) { 350 uv_close((uv_handle_t*) &ctx->idle_handle, NULL); 351 return; 352 } 353 354 ASSERT(0 == uv_tcp_init(handle->loop, (uv_tcp_t*) &ctx->client_handle)); 355 ASSERT(0 == uv_tcp_connect(&ctx->connect_req, 356 (uv_tcp_t*) &ctx->client_handle, 357 (const struct sockaddr*) &listen_addr, 358 cl_connect_cb)); 359 } 360 361 362 static int test_tcp(unsigned int num_servers, unsigned int num_clients) { 363 struct server_ctx* servers; 364 struct client_ctx* clients; 365 uv_loop_t* loop; 366 uv_tcp_t* handle; 367 unsigned int i; 368 double time; 369 370 ASSERT(0 == uv_ip4_addr("127.0.0.1", TEST_PORT, &listen_addr)); 371 loop = uv_default_loop(); 372 373 servers = calloc(num_servers, sizeof(servers[0])); 374 clients = calloc(num_clients, sizeof(clients[0])); 375 ASSERT(servers != NULL); 376 ASSERT(clients != NULL); 377 378 /* We're making the assumption here that from the perspective of the 379 * OS scheduler, threads are functionally equivalent to and interchangeable 380 * with full-blown processes. 381 */ 382 for (i = 0; i < num_servers; i++) { 383 struct server_ctx* ctx = servers + i; 384 ASSERT(0 == uv_sem_init(&ctx->semaphore, 0)); 385 ASSERT(0 == uv_thread_create(&ctx->thread_id, server_cb, ctx)); 386 } 387 388 send_listen_handles(UV_TCP, num_servers, servers); 389 390 for (i = 0; i < num_clients; i++) { 391 struct client_ctx* ctx = clients + i; 392 ctx->num_connects = NUM_CONNECTS / num_clients; 393 handle = (uv_tcp_t*) &ctx->client_handle; 394 handle->data = "client handle"; 395 ASSERT(0 == uv_tcp_init(loop, handle)); 396 ASSERT(0 == uv_tcp_connect(&ctx->connect_req, 397 handle, 398 (const struct sockaddr*) &listen_addr, 399 cl_connect_cb)); 400 ASSERT(0 == uv_idle_init(loop, &ctx->idle_handle)); 401 } 402 403 { 404 uint64_t t = uv_hrtime(); 405 ASSERT(0 == uv_run(loop, UV_RUN_DEFAULT)); 406 t = uv_hrtime() - t; 407 time = t / 1e9; 408 } 409 410 for (i = 0; i < num_servers; i++) { 411 struct server_ctx* ctx = servers + i; 412 uv_async_send(&ctx->async_handle); 413 ASSERT(0 == uv_thread_join(&ctx->thread_id)); 414 uv_sem_destroy(&ctx->semaphore); 415 } 416 417 printf("accept%u: %.0f accepts/sec (%u total)\n", 418 num_servers, 419 NUM_CONNECTS / time, 420 NUM_CONNECTS); 421 422 for (i = 0; i < num_servers; i++) { 423 struct server_ctx* ctx = servers + i; 424 printf(" thread #%u: %.0f accepts/sec (%u total, %.1f%%)\n", 425 i, 426 ctx->num_connects / time, 427 ctx->num_connects, 428 ctx->num_connects * 100.0 / NUM_CONNECTS); 429 } 430 431 free(clients); 432 free(servers); 433 434 MAKE_VALGRIND_HAPPY(); 435 return 0; 436 } 437 438 439 BENCHMARK_IMPL(tcp_multi_accept2) { 440 return test_tcp(2, 40); 441 } 442 443 444 BENCHMARK_IMPL(tcp_multi_accept4) { 445 return test_tcp(4, 40); 446 } 447 448 449 BENCHMARK_IMPL(tcp_multi_accept8) { 450 return test_tcp(8, 40); 451 } 452