xref: /spdk/test/common/lib/ut_multithread.c (revision c10f8e160e42a2a642e8a593b60c2f84561d5eba)
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright (c) Intel Corporation.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 #include "spdk_cunit.h"
35 #include "spdk/thread.h"
36 #include "spdk_internal/mock.h"
37 
/* Number of entries in g_ut_threads; set by allocate_threads(), zeroed by free_threads(). */
static uint32_t g_ut_num_threads;

/* Simulated clock in microseconds; advanced by increment_time(), zeroed by reset_time(). */
static uint64_t g_current_time_in_us = 0;

/*
 * Externally visible entry points of this harness, declared ahead of their
 * definitions further down in this file.
 */
int allocate_threads(int num_threads);
void free_threads(void);
void poll_threads(void);
int poll_thread(uintptr_t thread_id);
void increment_time(uint64_t time_in_us);
void reset_time(void);
47 
48 struct ut_msg {
49 	spdk_thread_fn		fn;
50 	void			*ctx;
51 	TAILQ_ENTRY(ut_msg)	link;
52 };
53 
/* Per-thread state of the harness; one element of g_ut_threads per simulated thread. */
struct ut_thread {
	/* Handle obtained from spdk_get_thread() in allocate_threads(). */
	struct spdk_thread	*thread;
	/* Not referenced in this file; presumably reserved for the including test - TODO confirm. */
	struct spdk_io_channel	*ch;
	/* FIFO of pending messages, filled by __send_msg() and drained by poll_thread(). */
	TAILQ_HEAD(, ut_msg)	msgs;
	/* Pollers registered through __start_poller(). */
	TAILQ_HEAD(, ut_poller)	pollers;
};
60 
/*
 * Array of g_ut_num_threads simulated threads. Allocated by allocate_threads(),
 * released and reset to NULL by free_threads().
 */
struct ut_thread *g_ut_threads;
62 
63 struct ut_poller {
64 	spdk_poller_fn		fn;
65 	void			*arg;
66 	TAILQ_ENTRY(ut_poller)	tailq;
67 	uint64_t		period_us;
68 	uint64_t		next_expiration_in_us;
69 };
70 
71 static void
72 __send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx)
73 {
74 	struct ut_thread *thread = thread_ctx;
75 	struct ut_msg *msg;
76 
77 	msg = calloc(1, sizeof(*msg));
78 	SPDK_CU_ASSERT_FATAL(msg != NULL);
79 
80 	msg->fn = fn;
81 	msg->ctx = ctx;
82 	TAILQ_INSERT_TAIL(&thread->msgs, msg, link);
83 }
84 
85 static struct spdk_poller *
86 __start_poller(void *thread_ctx, spdk_poller_fn fn, void *arg, uint64_t period_microseconds)
87 {
88 	struct ut_thread *thread = thread_ctx;
89 	struct ut_poller *poller = calloc(1, sizeof(struct ut_poller));
90 
91 	SPDK_CU_ASSERT_FATAL(poller != NULL);
92 
93 	poller->fn = fn;
94 	poller->arg = arg;
95 	poller->period_us = period_microseconds;
96 	poller->next_expiration_in_us = g_current_time_in_us + poller->period_us;
97 
98 	TAILQ_INSERT_TAIL(&thread->pollers, poller, tailq);
99 
100 	return (struct spdk_poller *)poller;
101 }
102 
103 static void
104 __stop_poller(struct spdk_poller *poller, void *thread_ctx)
105 {
106 	struct ut_thread *thread = thread_ctx;
107 
108 	TAILQ_REMOVE(&thread->pollers, (struct ut_poller *)poller, tailq);
109 
110 	free(poller);
111 }
112 
/* Sentinel thread id meaning "no simulated thread is selected". */
#define INVALID_THREAD 0x1000

/* Id of the currently selected simulated thread; written only by set_thread(). */
static uintptr_t g_thread_id = INVALID_THREAD;
116 
117 static void
118 set_thread(uintptr_t thread_id)
119 {
120 	g_thread_id = thread_id;
121 	if (thread_id == INVALID_THREAD) {
122 		MOCK_CLEAR(pthread_self);
123 	} else {
124 		MOCK_SET(pthread_self, (pthread_t)thread_id);
125 	}
126 }
127 
128 int
129 allocate_threads(int num_threads)
130 {
131 	struct spdk_thread *thread;
132 	uint32_t i;
133 
134 	g_ut_num_threads = num_threads;
135 
136 	g_ut_threads = calloc(num_threads, sizeof(*g_ut_threads));
137 	SPDK_CU_ASSERT_FATAL(g_ut_threads != NULL);
138 
139 	for (i = 0; i < g_ut_num_threads; i++) {
140 		set_thread(i);
141 		spdk_allocate_thread(__send_msg, __start_poller, __stop_poller,
142 				     &g_ut_threads[i], NULL);
143 		thread = spdk_get_thread();
144 		SPDK_CU_ASSERT_FATAL(thread != NULL);
145 		g_ut_threads[i].thread = thread;
146 		TAILQ_INIT(&g_ut_threads[i].msgs);
147 		TAILQ_INIT(&g_ut_threads[i].pollers);
148 	}
149 
150 	set_thread(INVALID_THREAD);
151 	return 0;
152 }
153 
154 void
155 free_threads(void)
156 {
157 	uint32_t i;
158 
159 	for (i = 0; i < g_ut_num_threads; i++) {
160 		set_thread(i);
161 		spdk_free_thread();
162 	}
163 
164 	g_ut_num_threads = 0;
165 	free(g_ut_threads);
166 	g_ut_threads = NULL;
167 }
168 
169 void
170 increment_time(uint64_t time_in_us)
171 {
172 	g_current_time_in_us += time_in_us;
173 	spdk_delay_us(time_in_us);
174 }
175 
176 static void
177 reset_pollers(void)
178 {
179 	uint32_t		i = 0;
180 	struct ut_thread	*thread = NULL;
181 	struct ut_poller	*poller = NULL;
182 	uintptr_t		original_thread_id = g_thread_id;
183 
184 	CU_ASSERT(g_current_time_in_us == 0);
185 
186 	for (i = 0; i < g_ut_num_threads; i++) {
187 		set_thread(i);
188 		thread = &g_ut_threads[i];
189 
190 		TAILQ_FOREACH(poller, &thread->pollers, tailq) {
191 			poller->next_expiration_in_us = g_current_time_in_us + poller->period_us;
192 		}
193 	}
194 
195 	set_thread(original_thread_id);
196 }
197 
198 void
199 reset_time(void)
200 {
201 	g_current_time_in_us = 0;
202 	reset_pollers();
203 }
204 
205 int
206 poll_thread(uintptr_t thread_id)
207 {
208 	int count = 0;
209 	struct ut_thread *thread = &g_ut_threads[thread_id];
210 	struct ut_msg *msg;
211 	struct ut_poller *poller;
212 	uintptr_t original_thread_id;
213 	TAILQ_HEAD(, ut_poller)	tmp_pollers;
214 
215 	CU_ASSERT(thread_id != (uintptr_t)INVALID_THREAD);
216 	CU_ASSERT(thread_id < g_ut_num_threads);
217 
218 	original_thread_id = g_thread_id;
219 	set_thread(thread_id);
220 
221 	while (!TAILQ_EMPTY(&thread->msgs)) {
222 		msg = TAILQ_FIRST(&thread->msgs);
223 		TAILQ_REMOVE(&thread->msgs, msg, link);
224 
225 		msg->fn(msg->ctx);
226 		count++;
227 		free(msg);
228 	}
229 
230 	TAILQ_INIT(&tmp_pollers);
231 
232 	while (!TAILQ_EMPTY(&thread->pollers)) {
233 		poller = TAILQ_FIRST(&thread->pollers);
234 		TAILQ_REMOVE(&thread->pollers, poller, tailq);
235 
236 		if (g_current_time_in_us >= poller->next_expiration_in_us) {
237 			if (poller->fn) {
238 				poller->fn(poller->arg);
239 			}
240 
241 			if (poller->period_us == 0) {
242 				break;
243 			} else {
244 				poller->next_expiration_in_us += poller->period_us;
245 			}
246 		}
247 
248 		TAILQ_INSERT_TAIL(&tmp_pollers, poller, tailq);
249 	}
250 
251 	TAILQ_SWAP(&tmp_pollers, &thread->pollers, ut_poller, tailq);
252 
253 	set_thread(original_thread_id);
254 
255 	return count;
256 }
257 
258 void
259 poll_threads(void)
260 {
261 	bool msg_processed;
262 	uint32_t i, count;
263 
264 	while (true) {
265 		msg_processed = false;
266 
267 		for (i = 0; i < g_ut_num_threads; i++) {
268 			count = poll_thread(i);
269 			if (count > 0) {
270 				msg_processed = true;
271 			}
272 		}
273 
274 		if (!msg_processed) {
275 			break;
276 		}
277 	}
278 }
279