xref: /netbsd-src/external/mpl/bind/dist/lib/isc/async.c (revision bcda20f65a8566e103791ec395f7f499ef322704)
1 /*	$NetBSD: async.c,v 1.2 2025/01/26 16:25:36 christos Exp $	*/
2 
3 /*
4  * Copyright (C) Internet Systems Consortium, Inc. ("ISC")
5  *
6  * SPDX-License-Identifier: MPL-2.0
7  *
8  * This Source Code Form is subject to the terms of the Mozilla Public
9  * License, v. 2.0. If a copy of the MPL was not distributed with this
10  * file, you can obtain one at https://mozilla.org/MPL/2.0/.
11  *
12  * See the COPYRIGHT file distributed with this work for additional
13  * information regarding copyright ownership.
14  */
15 
16 #include <stdlib.h>
17 #include <sys/types.h>
18 #include <unistd.h>
19 
20 #include <isc/async.h>
21 #include <isc/atomic.h>
22 #include <isc/barrier.h>
23 #include <isc/condition.h>
24 #include <isc/job.h>
25 #include <isc/loop.h>
26 #include <isc/magic.h>
27 #include <isc/mem.h>
28 #include <isc/mutex.h>
29 #include <isc/refcount.h>
30 #include <isc/result.h>
31 #include <isc/signal.h>
32 #include <isc/strerr.h>
33 #include <isc/thread.h>
34 #include <isc/util.h>
35 #include <isc/uv.h>
36 #include <isc/work.h>
37 
38 #include "async_p.h"
39 #include "job_p.h"
40 #include "loop_p.h"
41 
42 void
43 isc_async_run(isc_loop_t *loop, isc_job_cb cb, void *cbarg) {
44 	REQUIRE(VALID_LOOP(loop));
45 	REQUIRE(cb != NULL);
46 
47 	isc_job_t *job = isc_mem_get(loop->mctx, sizeof(*job));
48 	*job = (isc_job_t){
49 		.cb = cb,
50 		.cbarg = cbarg,
51 	};
52 
53 	cds_wfcq_node_init(&job->wfcq_node);
54 
55 	/*
56 	 * cds_wfcq_enqueue() is non-blocking and enqueues the job to async
57 	 * queue.
58 	 *
59 	 * The function returns 'false' in case the queue was empty - in such
60 	 * case we need to trigger the async callback.
61 	 */
62 	if (!cds_wfcq_enqueue(&loop->async_jobs.head, &loop->async_jobs.tail,
63 			      &job->wfcq_node))
64 	{
65 		int r = uv_async_send(&loop->async_trigger);
66 		UV_RUNTIME_CHECK(uv_async_send, r);
67 	}
68 }
69 
70 void
71 isc__async_cb(uv_async_t *handle) {
72 	isc_loop_t *loop = uv_handle_get_data(handle);
73 	isc_jobqueue_t jobs;
74 
75 	REQUIRE(VALID_LOOP(loop));
76 
77 	/* Initialize local wfcqueue */
78 	__cds_wfcq_init(&jobs.head, &jobs.tail);
79 
80 	/*
81 	 * Move all the elements from loop->async_jobs to a local jobs queue.
82 	 *
83 	 * __cds_wfcq_splice_blocking() assumes that synchronization is
84 	 * done externally - there's no internal locking, unlike
85 	 * cds_wfcq_splice_blocking(), and we do not need to check whether
86 	 * it needs to block, unlike __cds_wfcq_splice_nonblocking().
87 	 *
88 	 * The reason we can use __cds_wfcq_splice_blocking() is that the
89 	 * only other function we use is cds_wfcq_enqueue() which doesn't
90 	 * require any synchronization (see the table in urcu/wfcqueue.h
91 	 * for more details).
92 	 */
93 	enum cds_wfcq_ret ret = __cds_wfcq_splice_blocking(
94 		&jobs.head, &jobs.tail, &loop->async_jobs.head,
95 		&loop->async_jobs.tail);
96 	INSIST(ret != CDS_WFCQ_RET_WOULDBLOCK);
97 	if (ret == CDS_WFCQ_RET_SRC_EMPTY) {
98 		/*
99 		 * Nothing to do, the source queue was empty - most
100 		 * probably we were called from isc__async_close() below.
101 		 */
102 		return;
103 	}
104 
105 	/*
106 	 * Walk through the local queue which has now all the members copied
107 	 * locally, and call the callbacks and free all the isc_job_t(s).
108 	 */
109 	struct cds_wfcq_node *node, *next;
110 	__cds_wfcq_for_each_blocking_safe(&jobs.head, &jobs.tail, node, next) {
111 		isc_job_t *job = caa_container_of(node, isc_job_t, wfcq_node);
112 
113 		job->cb(job->cbarg);
114 
115 		isc_mem_put(loop->mctx, job, sizeof(*job));
116 	}
117 }
118 
119 void
120 isc__async_close(uv_handle_t *handle) {
121 	isc_loop_t *loop = uv_handle_get_data(handle);
122 
123 	isc__async_cb(&loop->async_trigger);
124 }
125