/* Copyright Joyent, Inc. and other Node contributors. All rights reserved.
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to
 * deal in the Software without restriction, including without limitation the
 * rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
 * sell copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
 * IN THE SOFTWARE.
 */
200e552da7Schristos
/* This file contains both the uv__async internal infrastructure and the
 * user-facing uv_async_t functions.
 */
240e552da7Schristos
250e552da7Schristos #include "uv.h"
260e552da7Schristos #include "internal.h"
270e552da7Schristos #include "atomic-ops.h"
280e552da7Schristos
290e552da7Schristos #include <errno.h>
300e552da7Schristos #include <stdio.h> /* snprintf() */
310e552da7Schristos #include <assert.h>
320e552da7Schristos #include <stdlib.h>
330e552da7Schristos #include <string.h>
340e552da7Schristos #include <unistd.h>
350e552da7Schristos #include <sched.h> /* sched_yield() */
360e552da7Schristos
370e552da7Schristos #ifdef __linux__
380e552da7Schristos #include <sys/eventfd.h>
390e552da7Schristos #endif
400e552da7Schristos
410e552da7Schristos static void uv__async_send(uv_loop_t* loop);
420e552da7Schristos static int uv__async_start(uv_loop_t* loop);
430e552da7Schristos
440e552da7Schristos
/* Initializes an async handle and registers it with `loop`.
 *
 * loop     - loop that will own the handle.
 * handle   - caller-provided storage for the handle.
 * async_cb - callback stored on the handle; uv__async_io() skips the call
 *            when it is NULL.
 *
 * Returns 0 on success. If uv__async_start() fails, its error code is
 * returned and `handle` is left untouched.
 */
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {
  int status;

  /* Make sure the loop's wakeup descriptor exists before touching the
   * handle, so a failure leaves the handle uninitialized.
   */
  status = uv__async_start(loop);
  if (status != 0)
    return status;

  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);
  handle->pending = 0;  /* State 0: no wakeup pending. */
  handle->async_cb = async_cb;

  /* Link the handle into the list that uv__async_io() walks, then start it. */
  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);
  uv__handle_start(handle);

  return 0;
}
610e552da7Schristos
620e552da7Schristos
/* Marks `handle` as pending and wakes up the event loop that owns it.
 *
 * handle->pending acts as a small state machine:
 *   0 -- not pending
 *   1 -- pending, the sender is still working with the handle
 *   2 -- pending, the sender is done
 *
 * If the handle is already pending, no additional wakeup is written.
 * Always returns 0; aborts the process if the 1 -> 2 transition does not
 * observe state 1.
 *
 * NOTE(review): cmpxchgi() is assumed to return the value that was stored
 * before the exchange attempt -- confirm against atomic-ops.h.
 */
int uv_async_send(uv_async_t* handle) {
  /* Cheap plain read first; only fall through to the atomic op when the
   * handle does not look pending yet.
   */
  if (ACCESS_ONCE(int, handle->pending) == 0) {
    /* 0 -> 1: tell the other thread we're busy with the handle. If somebody
     * else got there first there is nothing left to do.
     */
    if (cmpxchgi(&handle->pending, 0, 1) == 0) {
      /* Wake up the other thread's event loop. */
      uv__async_send(handle->loop);

      /* 1 -> 2: tell the other thread we're done. */
      if (cmpxchgi(&handle->pending, 1, 2) != 1)
        abort();
    }
  }

  return 0;
}
810e552da7Schristos
820e552da7Schristos
830e552da7Schristos /* Only call this from the event loop thread. */
uv__async_spin(uv_async_t * handle)840e552da7Schristos static int uv__async_spin(uv_async_t* handle) {
850e552da7Schristos int i;
860e552da7Schristos int rc;
870e552da7Schristos
880e552da7Schristos for (;;) {
890e552da7Schristos /* 997 is not completely chosen at random. It's a prime number, acyclical
900e552da7Schristos * by nature, and should therefore hopefully dampen sympathetic resonance.
910e552da7Schristos */
920e552da7Schristos for (i = 0; i < 997; i++) {
930e552da7Schristos /* rc=0 -- handle is not pending.
940e552da7Schristos * rc=1 -- handle is pending, other thread is still working with it.
950e552da7Schristos * rc=2 -- handle is pending, other thread is done.
960e552da7Schristos */
970e552da7Schristos rc = cmpxchgi(&handle->pending, 2, 0);
980e552da7Schristos
990e552da7Schristos if (rc != 1)
1000e552da7Schristos return rc;
1010e552da7Schristos
1020e552da7Schristos /* Other thread is busy with this handle, spin until it's done. */
1030e552da7Schristos cpu_relax();
1040e552da7Schristos }
1050e552da7Schristos
1060e552da7Schristos /* Yield the CPU. We may have preempted the other thread while it's
1070e552da7Schristos * inside the critical section and if it's running on the same CPU
1080e552da7Schristos * as us, we'll just burn CPU cycles until the end of our time slice.
1090e552da7Schristos */
1100e552da7Schristos sched_yield();
1110e552da7Schristos }
1120e552da7Schristos }
1130e552da7Schristos
1140e552da7Schristos
/* Tears down an async handle: waits for any in-progress sender, unlinks the
 * handle from the queue it was inserted into, and stops it.
 */
void uv__async_close(uv_async_t* handle) {
  /* Block until no sender is mid-update and clear a completed pending
   * state; the observed state itself is not needed here.
   */
  (void) uv__async_spin(handle);

  QUEUE_REMOVE(&handle->queue);
  uv__handle_stop(handle);
}
1200e552da7Schristos
1210e552da7Schristos
/* I/O callback for the loop's async wakeup watcher.
 *
 * Drains the wakeup descriptor, then visits every handle on
 * loop->async_handles and invokes the callback of each one that was pending.
 *
 * loop   - loop that owns the watcher.
 * w      - must be &loop->async_io_watcher (asserted).
 * events - not used by this function.
 *
 * Aborts the process on a read() error other than EAGAIN, EWOULDBLOCK or
 * EINTR.
 */
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  char scratch[1024];
  ssize_t nread;
  QUEUE batch;
  QUEUE* node;
  uv_async_t* handle;

  assert(w == &loop->async_io_watcher);

  /* Drain the descriptor; the data itself carries no information. */
  for (;;) {
    nread = read(w->fd, scratch, sizeof(scratch));

    if (nread == -1) {
      if (errno == EAGAIN || errno == EWOULDBLOCK)
        break;  /* Nothing left to read. */

      if (errno == EINTR)
        continue;  /* Interrupted, try again. */

      abort();
    }

    /* A completely filled buffer means there may be more to read; anything
     * shorter means the descriptor has been emptied.
     */
    if ((size_t) nread != sizeof(scratch))
      break;
  }

  /* Move all handles to a private list and re-append each one to the loop's
   * list before it is examined, so the walk operates on a snapshot rather
   * than on the list that is being refilled.
   */
  QUEUE_MOVE(&loop->async_handles, &batch);
  while (!QUEUE_EMPTY(&batch)) {
    node = QUEUE_HEAD(&batch);
    handle = QUEUE_DATA(node, uv_async_t, queue);

    QUEUE_REMOVE(node);
    QUEUE_INSERT_TAIL(&loop->async_handles, node);

    /* Invoke the callback only for handles that were pending (non-zero
     * state) and that actually have a callback installed.
     */
    if (uv__async_spin(handle) != 0 && handle->async_cb != NULL)
      handle->async_cb(handle);
  }
}
1660e552da7Schristos
1670e552da7Schristos
/* Writes a wakeup token to the loop's async descriptor.
 *
 * By default a single byte (the NUL of the empty string literal) is written
 * to loop->async_wfd. On Linux, when async_wfd is -1, an 8-byte counter
 * increment of 1 is written to the watcher's descriptor instead, which
 * uv__async_start() creates with eventfd().
 *
 * The write is retried on EINTR. EAGAIN/EWOULDBLOCK is silently accepted;
 * any other failure, or a short write, aborts the process.
 */
static void uv__async_send(uv_loop_t* loop) {
  const void* buf;
  ssize_t len;
  ssize_t r;  /* write() returns ssize_t; keep the full width. */
  int fd;

  buf = "";
  len = 1;
  fd = loop->async_wfd;

#if defined(__linux__)
  if (fd == -1) {
    static const uint64_t val = 1;
    buf = &val;
    len = sizeof(val);
    fd = loop->async_io_watcher.fd;  /* eventfd */
  }
#endif

  do
    r = write(fd, buf, len);
  while (r == -1 && errno == EINTR);

  if (r == len)
    return;

  if (r == -1) {
    /* NOTE(review): the descriptor is created non-blocking, so this
     * presumably means it is already full, i.e. a wakeup is still queued
     * for the loop -- confirm.
     */
    if (errno == EAGAIN || errno == EWOULDBLOCK)
      return;
  }

  abort();
}
2000e552da7Schristos
2010e552da7Schristos
/* Lazily creates the loop's async wakeup descriptor(s) and starts watching
 * the read side for POLLIN with uv__async_io() as the callback.
 *
 * On Linux a single eventfd is used and loop->async_wfd is set to -1;
 * elsewhere a pipe is created with uv__make_pipe() and loop->async_wfd is
 * set to its second descriptor.
 *
 * Returns 0 on success or if the watcher already has a descriptor,
 * otherwise the error from eventfd() (via UV__ERR(errno)) or from
 * uv__make_pipe().
 */
static int uv__async_start(uv_loop_t* loop) {
  int fds[2];
  int rc;

  /* Already set up: nothing to do. */
  if (loop->async_io_watcher.fd != -1)
    return 0;

#ifdef __linux__
  rc = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
  if (rc < 0)
    return UV__ERR(errno);

  /* One descriptor serves both directions; -1 tells uv__async_send() to
   * write to the watcher's fd.
   */
  fds[0] = rc;
  fds[1] = -1;
#else
  rc = uv__make_pipe(fds, UV_NONBLOCK_PIPE);
  if (rc < 0)
    return rc;
#endif

  uv__io_init(&loop->async_io_watcher, uv__async_io, fds[0]);
  uv__io_start(loop, &loop->async_io_watcher, POLLIN);
  loop->async_wfd = fds[1];

  return 0;
}
2280e552da7Schristos
2290e552da7Schristos
/* Re-creates the loop's async wakeup descriptor(s) by stopping and then
 * restarting the async machinery.
 *
 * Returns 0 if the machinery was never started, otherwise the result of
 * uv__async_start().
 */
int uv__async_fork(uv_loop_t* loop) {
  if (loop->async_io_watcher.fd != -1) {
    /* Drop the existing descriptors and open fresh ones. */
    uv__async_stop(loop);
    return uv__async_start(loop);
  }

  /* Never started: nothing to re-create. */
  return 0;
}
2380e552da7Schristos
2390e552da7Schristos
/* Shuts down the loop's async wakeup machinery: closes the write descriptor
 * (when it is distinct from the watched one), stops the watcher, closes the
 * watched descriptor, and resets both to -1. Does nothing if the watcher
 * has no descriptor.
 */
void uv__async_stop(uv_loop_t* loop) {
  int watched_fd;
  int write_fd;

  watched_fd = loop->async_io_watcher.fd;
  if (watched_fd == -1)
    return;

  write_fd = loop->async_wfd;
  if (write_fd != -1) {
    /* Avoid closing the same descriptor twice if both sides share it. */
    if (write_fd != watched_fd)
      uv__close(write_fd);
    loop->async_wfd = -1;
  }

  uv__io_stop(loop, &loop->async_io_watcher, POLLIN);
  uv__close(loop->async_io_watcher.fd);
  loop->async_io_watcher.fd = -1;
}
254