1 /* Copyright Joyent, Inc. and other Node contributors. All rights reserved. 2 * 3 * Permission is hereby granted, free of charge, to any person obtaining a copy 4 * of this software and associated documentation files (the "Software"), to 5 * deal in the Software without restriction, including without limitation the 6 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or 7 * sell copies of the Software, and to permit persons to whom the Software is 8 * furnished to do so, subject to the following conditions: 9 * 10 * The above copyright notice and this permission notice shall be included in 11 * all copies or substantial portions of the Software. 12 * 13 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING 18 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS 19 * IN THE SOFTWARE. 20 */ 21 22 #include "uv-common.h" 23 24 #if !defined(_WIN32) 25 # include "unix/internal.h" 26 #endif 27 28 #include <stdlib.h> 29 30 #define MAX_THREADPOOL_SIZE 1024 31 32 static uv_once_t once = UV_ONCE_INIT; 33 static uv_cond_t cond; 34 static uv_mutex_t mutex; 35 static unsigned int idle_threads; 36 static unsigned int slow_io_work_running; 37 static unsigned int nthreads; 38 static uv_thread_t* threads; 39 static uv_thread_t default_threads[4]; 40 static QUEUE exit_message; 41 static QUEUE wq; 42 static QUEUE run_slow_work_message; 43 static QUEUE slow_io_pending_wq; 44 45 static unsigned int slow_work_thread_threshold(void) { 46 return (nthreads + 1) / 2; 47 } 48 49 static void uv__cancelled(struct uv__work* w) { 50 abort(); 51 } 52 53 54 /* To avoid deadlock with uv_cancel() it's crucial that the worker 55 * never holds the global mutex and the loop-local mutex at the same time. 56 */ 57 static void worker(void* arg) { 58 struct uv__work* w; 59 QUEUE* q; 60 int is_slow_work; 61 62 uv_sem_post((uv_sem_t*) arg); 63 arg = NULL; 64 65 uv_mutex_lock(&mutex); 66 for (;;) { 67 /* `mutex` should always be locked at this point. */ 68 69 /* Keep waiting while either no work is present or only slow I/O 70 and we're at the threshold for that. */ 71 while (QUEUE_EMPTY(&wq) || 72 (QUEUE_HEAD(&wq) == &run_slow_work_message && 73 QUEUE_NEXT(&run_slow_work_message) == &wq && 74 slow_io_work_running >= slow_work_thread_threshold())) { 75 idle_threads += 1; 76 uv_cond_wait(&cond, &mutex); 77 idle_threads -= 1; 78 } 79 80 q = QUEUE_HEAD(&wq); 81 if (q == &exit_message) { 82 uv_cond_signal(&cond); 83 uv_mutex_unlock(&mutex); 84 break; 85 } 86 87 QUEUE_REMOVE(q); 88 QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */ 89 90 is_slow_work = 0; 91 if (q == &run_slow_work_message) { 92 /* If we're at the slow I/O threshold, re-schedule until after all 93 other work in the queue is done. */ 94 if (slow_io_work_running >= slow_work_thread_threshold()) { 95 QUEUE_INSERT_TAIL(&wq, q); 96 continue; 97 } 98 99 /* If we encountered a request to run slow I/O work but there is none 100 to run, that means it's cancelled => Start over. */ 101 if (QUEUE_EMPTY(&slow_io_pending_wq)) 102 continue; 103 104 is_slow_work = 1; 105 slow_io_work_running++; 106 107 q = QUEUE_HEAD(&slow_io_pending_wq); 108 QUEUE_REMOVE(q); 109 QUEUE_INIT(q); 110 111 /* If there is more slow I/O work, schedule it to be run as well. */ 112 if (!QUEUE_EMPTY(&slow_io_pending_wq)) { 113 QUEUE_INSERT_TAIL(&wq, &run_slow_work_message); 114 if (idle_threads > 0) 115 uv_cond_signal(&cond); 116 } 117 } 118 119 uv_mutex_unlock(&mutex); 120 121 w = QUEUE_DATA(q, struct uv__work, wq); 122 w->work(w); 123 124 uv_mutex_lock(&w->loop->wq_mutex); 125 w->work = NULL; /* Signal uv_cancel() that the work req is done 126 executing. */ 127 QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); 128 uv_async_send(&w->loop->wq_async); 129 uv_mutex_unlock(&w->loop->wq_mutex); 130 131 /* Lock `mutex` since that is expected at the start of the next 132 * iteration. */ 133 uv_mutex_lock(&mutex); 134 if (is_slow_work) { 135 /* `slow_io_work_running` is protected by `mutex`. */ 136 slow_io_work_running--; 137 } 138 } 139 } 140 141 142 static void post(QUEUE* q, enum uv__work_kind kind) { 143 uv_mutex_lock(&mutex); 144 if (kind == UV__WORK_SLOW_IO) { 145 /* Insert into a separate queue. */ 146 QUEUE_INSERT_TAIL(&slow_io_pending_wq, q); 147 if (!QUEUE_EMPTY(&run_slow_work_message)) { 148 /* Running slow I/O tasks is already scheduled => Nothing to do here. 149 The worker that runs said other task will schedule this one as well. */ 150 uv_mutex_unlock(&mutex); 151 return; 152 } 153 q = &run_slow_work_message; 154 } 155 156 QUEUE_INSERT_TAIL(&wq, q); 157 if (idle_threads > 0) 158 uv_cond_signal(&cond); 159 uv_mutex_unlock(&mutex); 160 } 161 162 163 #ifdef __MVS__ 164 /* TODO(itodorov) - zos: revisit when Woz compiler is available. */ 165 __attribute__((destructor)) 166 #endif 167 void uv__threadpool_cleanup(void) { 168 unsigned int i; 169 170 if (nthreads == 0) 171 return; 172 173 #ifndef __MVS__ 174 /* TODO(gabylb) - zos: revisit when Woz compiler is available. */ 175 post(&exit_message, UV__WORK_CPU); 176 #endif 177 178 for (i = 0; i < nthreads; i++) 179 if (uv_thread_join(threads + i)) 180 abort(); 181 182 if (threads != default_threads) 183 uv__free(threads); 184 185 uv_mutex_destroy(&mutex); 186 uv_cond_destroy(&cond); 187 188 threads = NULL; 189 nthreads = 0; 190 } 191 192 193 static void init_threads(void) { 194 unsigned int i; 195 const char* val; 196 uv_sem_t sem; 197 198 nthreads = ARRAY_SIZE(default_threads); 199 val = getenv("UV_THREADPOOL_SIZE"); 200 if (val != NULL) 201 nthreads = atoi(val); 202 if (nthreads == 0) 203 nthreads = 1; 204 if (nthreads > MAX_THREADPOOL_SIZE) 205 nthreads = MAX_THREADPOOL_SIZE; 206 207 threads = default_threads; 208 if (nthreads > ARRAY_SIZE(default_threads)) { 209 threads = uv__malloc(nthreads * sizeof(threads[0])); 210 if (threads == NULL) { 211 nthreads = ARRAY_SIZE(default_threads); 212 threads = default_threads; 213 } 214 } 215 216 if (uv_cond_init(&cond)) 217 abort(); 218 219 if (uv_mutex_init(&mutex)) 220 abort(); 221 222 QUEUE_INIT(&wq); 223 QUEUE_INIT(&slow_io_pending_wq); 224 QUEUE_INIT(&run_slow_work_message); 225 226 if (uv_sem_init(&sem, 0)) 227 abort(); 228 229 for (i = 0; i < nthreads; i++) 230 if (uv_thread_create(threads + i, worker, &sem)) 231 abort(); 232 233 for (i = 0; i < nthreads; i++) 234 uv_sem_wait(&sem); 235 236 uv_sem_destroy(&sem); 237 } 238 239 240 #ifndef _WIN32 241 static void reset_once(void) { 242 uv_once_t child_once = UV_ONCE_INIT; 243 memcpy(&once, &child_once, sizeof(child_once)); 244 } 245 #endif 246 247 248 static void init_once(void) { 249 #ifndef _WIN32 250 /* Re-initialize the threadpool after fork. 251 * Note that this discards the global mutex and condition as well 252 * as the work queue. 253 */ 254 if (pthread_atfork(NULL, NULL, &reset_once)) 255 abort(); 256 #endif 257 init_threads(); 258 } 259 260 261 void uv__work_submit(uv_loop_t* loop, 262 struct uv__work* w, 263 enum uv__work_kind kind, 264 void (*work)(struct uv__work* w), 265 void (*done)(struct uv__work* w, int status)) { 266 uv_once(&once, init_once); 267 w->loop = loop; 268 w->work = work; 269 w->done = done; 270 post(&w->wq, kind); 271 } 272 273 274 static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { 275 int cancelled; 276 277 uv_mutex_lock(&mutex); 278 uv_mutex_lock(&w->loop->wq_mutex); 279 280 cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL; 281 if (cancelled) 282 QUEUE_REMOVE(&w->wq); 283 284 uv_mutex_unlock(&w->loop->wq_mutex); 285 uv_mutex_unlock(&mutex); 286 287 if (!cancelled) 288 return UV_EBUSY; 289 290 w->work = uv__cancelled; 291 uv_mutex_lock(&loop->wq_mutex); 292 QUEUE_INSERT_TAIL(&loop->wq, &w->wq); 293 uv_async_send(&loop->wq_async); 294 uv_mutex_unlock(&loop->wq_mutex); 295 296 return 0; 297 } 298 299 300 void uv__work_done(uv_async_t* handle) { 301 struct uv__work* w; 302 uv_loop_t* loop; 303 QUEUE* q; 304 QUEUE wq; 305 int err; 306 307 loop = container_of(handle, uv_loop_t, wq_async); 308 uv_mutex_lock(&loop->wq_mutex); 309 QUEUE_MOVE(&loop->wq, &wq); 310 uv_mutex_unlock(&loop->wq_mutex); 311 312 while (!QUEUE_EMPTY(&wq)) { 313 q = QUEUE_HEAD(&wq); 314 QUEUE_REMOVE(q); 315 316 w = container_of(q, struct uv__work, wq); 317 err = (w->work == uv__cancelled) ? UV_ECANCELED : 0; 318 w->done(w, err); 319 } 320 } 321 322 323 static void uv__queue_work(struct uv__work* w) { 324 uv_work_t* req = container_of(w, uv_work_t, work_req); 325 326 req->work_cb(req); 327 } 328 329 330 static void uv__queue_done(struct uv__work* w, int err) { 331 uv_work_t* req; 332 333 req = container_of(w, uv_work_t, work_req); 334 uv__req_unregister(req->loop, req); 335 336 if (req->after_work_cb == NULL) 337 return; 338 339 req->after_work_cb(req, err); 340 } 341 342 343 int uv_queue_work(uv_loop_t* loop, 344 uv_work_t* req, 345 uv_work_cb work_cb, 346 uv_after_work_cb after_work_cb) { 347 if (work_cb == NULL) 348 return UV_EINVAL; 349 350 uv__req_init(loop, req, UV_WORK); 351 req->loop = loop; 352 req->work_cb = work_cb; 353 req->after_work_cb = after_work_cb; 354 uv__work_submit(loop, 355 &req->work_req, 356 UV__WORK_CPU, 357 uv__queue_work, 358 uv__queue_done); 359 return 0; 360 } 361 362 363 int uv_cancel(uv_req_t* req) { 364 struct uv__work* wreq; 365 uv_loop_t* loop; 366 367 switch (req->type) { 368 case UV_FS: 369 loop = ((uv_fs_t*) req)->loop; 370 wreq = &((uv_fs_t*) req)->work_req; 371 break; 372 case UV_GETADDRINFO: 373 loop = ((uv_getaddrinfo_t*) req)->loop; 374 wreq = &((uv_getaddrinfo_t*) req)->work_req; 375 break; 376 case UV_GETNAMEINFO: 377 loop = ((uv_getnameinfo_t*) req)->loop; 378 wreq = &((uv_getnameinfo_t*) req)->work_req; 379 break; 380 case UV_RANDOM: 381 loop = ((uv_random_t*) req)->loop; 382 wreq = &((uv_random_t*) req)->work_req; 383 break; 384 case UV_WORK: 385 loop = ((uv_work_t*) req)->loop; 386 wreq = &((uv_work_t*) req)->work_req; 387 break; 388 default: 389 return UV_EINVAL; 390 } 391 392 return uv__work_cancel(loop, req, wreq); 393 } 394