1 /* $NetBSD: async.c,v 1.2 2025/01/26 16:25:36 christos Exp $ */ 2 3 /* 4 * Copyright (C) Internet Systems Consortium, Inc. ("ISC") 5 * 6 * SPDX-License-Identifier: MPL-2.0 7 * 8 * This Source Code Form is subject to the terms of the Mozilla Public 9 * License, v. 2.0. If a copy of the MPL was not distributed with this 10 * file, you can obtain one at https://mozilla.org/MPL/2.0/. 11 * 12 * See the COPYRIGHT file distributed with this work for additional 13 * information regarding copyright ownership. 14 */ 15 16 #include <stdlib.h> 17 #include <sys/types.h> 18 #include <unistd.h> 19 20 #include <isc/async.h> 21 #include <isc/atomic.h> 22 #include <isc/barrier.h> 23 #include <isc/condition.h> 24 #include <isc/job.h> 25 #include <isc/loop.h> 26 #include <isc/magic.h> 27 #include <isc/mem.h> 28 #include <isc/mutex.h> 29 #include <isc/refcount.h> 30 #include <isc/result.h> 31 #include <isc/signal.h> 32 #include <isc/strerr.h> 33 #include <isc/thread.h> 34 #include <isc/util.h> 35 #include <isc/uv.h> 36 #include <isc/work.h> 37 38 #include "async_p.h" 39 #include "job_p.h" 40 #include "loop_p.h" 41 42 void 43 isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) { 44 REQUIRE(VALID_LOOP(loop)); 45 REQUIRE(cb != NULL); 46 47 isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job)); 48 *job = (isc_job_t){ 49 .cb = cb, 50 .cbarg = cbarg, 51 }; 52 53 cds_wfcq_node_init(&job->wfcq_node); 54 55 /* 56 * cds_wfcq_enqueue() is non-blocking and enqueues the job to async 57 * queue. 58 * 59 * The function returns 'false' in case the queue was empty - in such 60 * case we need to trigger the async callback. 61 */ 62 if (!cds_wfcq_enqueue(&loop->async_jobs.head, &loop->async_jobs.tail, 63 &job->wfcq_node)) 64 { 65 int r = uv_async_send(&loop->async_trigger); 66 UV_RUNTIME_CHECK(uv_async_send, r); 67 } 68 } 69 70 void 71 isc__async_cb(uv_async_t *handle) { 72 isc_loop_t *loop = uv_handle_get_data(handle); 73 isc_jobqueue_t jobs; 74 75 REQUIRE(VALID_LOOP(loop)); 76 77 /* Initialize local wfcqueue */ 78 __cds_wfcq_init(&jobs.head, &jobs.tail); 79 80 /* 81 * Move all the elements from loop->async_jobs to a local jobs queue. 82 * 83 * __cds_wfcq_splice_blocking() assumes that synchronization is 84 * done externally - there's no internal locking, unlike 85 * cds_wfcq_splice_blocking(), and we do not need to check whether 86 * it needs to block, unlike __cds_wfcq_splice_nonblocking(). 87 * 88 * The reason we can use __cds_wfcq_splice_blocking() is that the 89 * only other function we use is cds_wfcq_enqueue() which doesn't 90 * require any synchronization (see the table in urcu/wfcqueue.h 91 * for more details). 92 */ 93 enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking( 94 &jobs.head, &jobs.tail, &loop->async_jobs.head, 95 &loop->async_jobs.tail); 96 INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK); 97 if (ret == CDS_WFCQ_RET_SRC_EMPTY) { 98 /* 99 * Nothing to do, the source queue was empty - most 100 * probably we were called from isc__async_close() below. 101 */ 102 return; 103 } 104 105 /* 106 * Walk through the local queue which has now all the members copied 107 * locally, and call the callbacks and free all the isc_job_t(s). 108 */ 109 struct cds_wfcq_node *node, *next; 110 __cds_wfcq_for_each_blocking_safe(&jobs.head, &jobs.tail, node, next) { 111 isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node); 112 113 job->cb(job->cbarg); 114 115 isc_mem_put(loop->mctx, job, sizeof(*job)); 116 } 117 } 118 119 void 120 isc__async_close(uv_handle_t *handle) { 121 isc_loop_t *loop = uv_handle_get_data(handle); 122 123 isc__async_cb(&loop->async_trigger); 124 } 125