xref: /netbsd-src/external/mit/libuv/dist/src/unix/async.c (revision 5f2f42719cd62ff11fd913b40b7ce19f07c4fd25)
10e552da7Schristos /* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
20e552da7Schristos  * Permission is hereby granted, free of charge, to any person obtaining a copy
30e552da7Schristos  * of this software and associated documentation files (the "Software"), to
40e552da7Schristos  * deal in the Software without restriction, including without limitation the
50e552da7Schristos  * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
60e552da7Schristos  * sell copies of the Software, and to permit persons to whom the Software is
70e552da7Schristos  * furnished to do so, subject to the following conditions:
80e552da7Schristos  *
90e552da7Schristos  * The above copyright notice and this permission notice shall be included in
100e552da7Schristos  * all copies or substantial portions of the Software.
110e552da7Schristos  *
120e552da7Schristos  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
130e552da7Schristos  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
140e552da7Schristos  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
150e552da7Schristos  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
160e552da7Schristos  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
170e552da7Schristos  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
180e552da7Schristos  * IN THE SOFTWARE.
190e552da7Schristos  */
200e552da7Schristos 
210e552da7Schristos /* This file contains both the uv__async internal infrastructure and the
220e552da7Schristos  * user-facing uv_async_t functions.
230e552da7Schristos  */
240e552da7Schristos 
250e552da7Schristos #include "uv.h"
260e552da7Schristos #include "internal.h"
270e552da7Schristos #include "atomic-ops.h"
280e552da7Schristos 
290e552da7Schristos #include <errno.h>
300e552da7Schristos #include <stdio.h>  /* snprintf() */
310e552da7Schristos #include <assert.h>
320e552da7Schristos #include <stdlib.h>
330e552da7Schristos #include <string.h>
340e552da7Schristos #include <unistd.h>
350e552da7Schristos #include <sched.h>  /* sched_yield() */
360e552da7Schristos 
370e552da7Schristos #ifdef __linux__
380e552da7Schristos #include <sys/eventfd.h>
390e552da7Schristos #endif
400e552da7Schristos 
410e552da7Schristos static void uv__async_send(uv_loop_t* loop);
420e552da7Schristos static int uv__async_start(uv_loop_t* loop);
430e552da7Schristos 
440e552da7Schristos 
/* Initialize `handle` as an async handle attached to `loop`.
 *
 * The loop's wakeup channel is brought up first via uv__async_start(); if
 * that fails its error code is returned and `handle` is left untouched.
 * Otherwise the handle is initialized as UV_ASYNC, `async_cb` is recorded,
 * the pending flag is cleared, and the handle is appended to
 * loop->async_handles and started.
 *
 * Returns 0 on success, or the error reported by uv__async_start().
 */
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  int status;

  status = uv__async_start(loop);

  if (status == 0) {
    uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
    handle->async_cb = async_cb;
    handle->pending = 0;  /* 0 == no wakeup requested yet. */

    QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
    uv__handle_start(handle);
  }

  return status;
}
610e552da7Schristos 
620e552da7Schristos 
/* Request that `handle`'s callback be run by its event loop.
 *
 * handle->pending moves through three states:
 *   0 -- idle, 1 -- a sender is inside this function, 2 -- send completed.
 * If the handle is already non-zero a wakeup is in flight and nothing more
 * needs to be done. Always returns 0; aborts if the 1 -> 2 transition fails.
 */
int uv_async_send(uv_async_t* handle) {
  /* Inexpensive plain read before attempting the atomic operation. */
  if (ACCESS_ONCE(int, handle->pending) == 0) {
    /* Claim the handle (0 -> 1); only the thread that wins proceeds. */
    if (cmpxchgi(&handle->pending, 0, 1) == 0) {
      /* Poke the event loop that owns the handle. */
      uv__async_send(handle->loop);

      /* Publish completion (1 -> 2). Nobody else may have changed the
       * state while we held it, so anything other than 1 is fatal.
       */
      if (cmpxchgi(&handle->pending, 1, 2) != 1)
        abort();
    }
  }

  return 0;
}
810e552da7Schristos 
820e552da7Schristos 
830e552da7Schristos /* Only call this from the event loop thread. */
uv__async_spin(uv_async_t * handle)840e552da7Schristos static int uv__async_spin(uv_async_t* handle) {
850e552da7Schristos   int i;
860e552da7Schristos   int rc;
870e552da7Schristos 
880e552da7Schristos   for (;;) {
890e552da7Schristos     /* 997 is not completely chosen at random. It's a prime number, acyclical
900e552da7Schristos      * by nature, and should therefore hopefully dampen sympathetic resonance.
910e552da7Schristos      */
920e552da7Schristos     for (i = 0; i < 997; i++) {
930e552da7Schristos       /* rc=0 -- handle is not pending.
940e552da7Schristos        * rc=1 -- handle is pending, other thread is still working with it.
950e552da7Schristos        * rc=2 -- handle is pending, other thread is done.
960e552da7Schristos        */
970e552da7Schristos       rc = cmpxchgi(&handle->pending, 2, 0);
980e552da7Schristos 
990e552da7Schristos       if (rc != 1)
1000e552da7Schristos         return rc;
1010e552da7Schristos 
1020e552da7Schristos       /* Other thread is busy with this handle, spin until it's done. */
1030e552da7Schristos       cpu_relax();
1040e552da7Schristos     }
1050e552da7Schristos 
1060e552da7Schristos     /* Yield the CPU. We may have preempted the other thread while it's
1070e552da7Schristos      * inside the critical section and if it's running on the same CPU
1080e552da7Schristos      * as us, we'll just burn CPU cycles until the end of our time slice.
1090e552da7Schristos      */
1100e552da7Schristos     sched_yield();
1110e552da7Schristos   }
1120e552da7Schristos }
1130e552da7Schristos 
1140e552da7Schristos 
/* Tear down an async handle: wait until no sender is mid-update and clear
 * any pending state (uv__async_spin), unlink the handle from the queue it
 * is on, and mark it stopped.
 */
void uv__async_close(uv_async_t* handle) {
  /* The previous pending state is irrelevant here; we only need the wait. */
  (void) uv__async_spin(handle);

  QUEUE_REMOVE(&handle->queue);
  uv__handle_stop(handle);
}
1200e552da7Schristos 
1210e552da7Schristos 
/* I/O callback for loop->async_io_watcher.
 *
 * First drains the wakeup descriptor, then walks every registered async
 * handle and invokes the callback of each one that was pending.
 * `events` is not used.
 */
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  char scratch[1024];
  ssize_t nread;
  QUEUE snapshot;
  QUEUE* node;
  uv_async_t* async;
  int drained;

  assert(w == &loop->async_io_watcher);

  /* Keep reading while full buffers come back or the call is interrupted;
   * stop on a short read or when the descriptor would block. Any other
   * error is fatal.
   */
  drained = 0;
  do {
    nread = read(w->fd, scratch, sizeof(scratch));

    if (nread == sizeof(scratch)) {
      /* Buffer filled completely: there may be more to consume. */
    } else if (nread != -1) {
      drained = 1;
    } else if (errno == EAGAIN || errno == EWOULDBLOCK) {
      drained = 1;
    } else if (errno != EINTR) {
      abort();
    }
  } while (!drained);

  /* Work from a private copy of the list. Each handle is put back on
   * loop->async_handles before its callback is invoked, so the loop's list
   * is already consistent for that handle when user code runs.
   */
  QUEUE_MOVE(&loop->async_handles, &snapshot);
  while (!QUEUE_EMPTY(&snapshot)) {
    node = QUEUE_HEAD(&snapshot);
    async = QUEUE_DATA(node, uv_async_t, queue);

    QUEUE_REMOVE(node);
    QUEUE_INSERT_TAIL(&loop->async_handles, node);

    /* uv__async_spin() returns 0 when the handle was not pending; handles
     * without a callback are skipped after their state has been reset.
     */
    if (uv__async_spin(async) != 0 && async->async_cb != NULL)
      async->async_cb(async);
  }
}
1660e552da7Schristos 
1670e552da7Schristos 
/* Wake up `loop` by writing to its wakeup descriptor.
 *
 * By default a single byte is written to loop->async_wfd. On Linux, when
 * async_wfd is -1 the watcher's own descriptor is an eventfd (see
 * uv__async_start) and an 8-byte value of 1 is written to it instead.
 *
 * A write that would block is treated as success (presumably because a
 * wakeup is then already queued). Any other failure or a short write aborts.
 */
static void uv__async_send(uv_loop_t* loop) {
  const void* payload = "";
  ssize_t nbytes = 1;
  int target = loop->async_wfd;
  int written;

#if defined(__linux__)
  if (target == -1) {
    static const uint64_t one = 1;
    payload = &one;
    nbytes = sizeof(one);
    target = loop->async_io_watcher.fd;  /* eventfd */
  }
#endif

  /* Retry for as long as the write is interrupted by a signal. */
  for (;;) {
    written = write(target, payload, nbytes);
    if (written != -1 || errno != EINTR)
      break;
  }

  if (written == nbytes)
    return;

  if (written == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
    return;

  abort();
}
2000e552da7Schristos 
2010e552da7Schristos 
/* Create the loop's wakeup channel if it does not exist yet.
 *
 * Does nothing when loop->async_io_watcher already has a descriptor.
 * On Linux a non-blocking, close-on-exec eventfd serves as both ends, so
 * loop->async_wfd is set to -1. Elsewhere a non-blocking pipe is created;
 * the read end is watched and the write end is stored in loop->async_wfd.
 *
 * Returns 0 on success or a negative error code.
 */
static int uv__async_start(uv_loop_t* loop) {
  int fds[2];  /* [0] = end watched for POLLIN, [1] = write end or -1. */
  int rc;

  /* Already set up. */
  if (loop->async_io_watcher.fd != -1)
    return 0;

#ifdef __linux__
  rc = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (rc < 0)
    return UV__ERR(errno);

  fds[0] = rc;
  fds[1] = -1;  /* Signals "use the eventfd" to uv__async_send(). */
#else
  rc = uv__make_pipe(fds, UV_NONBLOCK_PIPE);
  if (rc < 0)
    return rc;
#endif

  uv__io_init(&loop->async_io_watcher, uv__async_io, fds[0]);
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = fds[1];

  return 0;
}
2280e552da7Schristos 
2290e552da7Schristos 
/* Rebuild the loop's wakeup channel: the existing descriptors are closed
 * and fresh ones are created. NOTE(review): judging by the name this is
 * meant to run in the child after fork() -- confirm against the caller.
 *
 * Returns 0 if the channel was never started, otherwise the result of
 * uv__async_start().
 */
int uv__async_fork(uv_loop_t* loop) {
  if (loop->async_io_watcher.fd != -1) {
    uv__async_stop(loop);
    return uv__async_start(loop);
  }

  /* Never started, so there is nothing to recreate. */
  return 0;
}
2380e552da7Schristos 
2390e552da7Schristos 
/* Shut down the loop's wakeup channel. No-op if it was never started.
 *
 * Closes the write end when it is a distinct descriptor, stops watching the
 * read end, closes it, and resets both stored descriptors to -1.
 */
void uv__async_stop(uv_loop_t* loop) {
  int rfd;

  rfd = loop->async_io_watcher.fd;
  if (rfd == -1)
    return;

  /* Only close the write end if it is not the same descriptor as the read
   * end; otherwise it would be closed twice below.
   */
  if (loop->async_wfd != -1 && loop->async_wfd != rfd)
    uv__close(loop->async_wfd);
  loop->async_wfd = -1;

  uv__io_stop(loop, &loop->async_io_watcher, POLLIN);
  uv__close(loop->async_io_watcher.fd);
  loop->async_io_watcher.fd = -1;
}
254