1 /*- 2 * BSD LICENSE 3 * 4 * Copyright (c) Intel Corporation. 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * * Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * * Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * * Neither the name of Intel Corporation nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 24 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 25 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 26 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 27 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 28 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 29 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 30 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 31 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 32 */ 33 34 #include "spdk_cunit.h" 35 #include "spdk/thread.h" 36 #include "spdk_internal/mock.h" 37 38 static uint32_t g_ut_num_threads; 39 static uint64_t g_current_time_in_us = 0; 40 41 int allocate_threads(int num_threads); 42 void free_threads(void); 43 void poll_threads(void); 44 int poll_thread(uintptr_t thread_id); 45 void increment_time(uint64_t time_in_us); 46 void reset_time(void); 47 48 struct ut_msg { 49 spdk_thread_fn fn; 50 void *ctx; 51 TAILQ_ENTRY(ut_msg) link; 52 }; 53 54 struct ut_thread { 55 struct spdk_thread *thread; 56 struct spdk_io_channel *ch; 57 TAILQ_HEAD(, ut_msg) msgs; 58 TAILQ_HEAD(, ut_poller) pollers; 59 }; 60 61 struct ut_thread *g_ut_threads; 62 63 struct ut_poller { 64 spdk_poller_fn fn; 65 void *arg; 66 TAILQ_ENTRY(ut_poller) tailq; 67 uint64_t period_us; 68 uint64_t next_expiration_in_us; 69 }; 70 71 static void 72 __send_msg(spdk_thread_fn fn, void *ctx, void *thread_ctx) 73 { 74 struct ut_thread *thread = thread_ctx; 75 struct ut_msg *msg; 76 77 msg = calloc(1, sizeof(*msg)); 78 SPDK_CU_ASSERT_FATAL(msg != NULL); 79 80 msg->fn = fn; 81 msg->ctx = ctx; 82 TAILQ_INSERT_TAIL(&thread->msgs, msg, link); 83 } 84 85 static struct spdk_poller * 86 __start_poller(void *thread_ctx, spdk_poller_fn fn, void *arg, uint64_t period_microseconds) 87 { 88 struct ut_thread *thread = thread_ctx; 89 struct ut_poller *poller = calloc(1, sizeof(struct ut_poller)); 90 91 SPDK_CU_ASSERT_FATAL(poller != NULL); 92 93 poller->fn = fn; 94 poller->arg = arg; 95 poller->period_us = period_microseconds; 96 poller->next_expiration_in_us = g_current_time_in_us + poller->period_us; 97 98 TAILQ_INSERT_TAIL(&thread->pollers, poller, tailq); 99 100 return (struct spdk_poller *)poller; 101 } 102 103 static void 104 __stop_poller(struct spdk_poller *poller, void *thread_ctx) 105 { 106 struct ut_thread *thread = thread_ctx; 107 108 TAILQ_REMOVE(&thread->pollers, (struct ut_poller *)poller, tailq); 109 110 free(poller); 111 } 112 113 #define INVALID_THREAD 0x1000 114 115 static uintptr_t g_thread_id = INVALID_THREAD; 116 117 static void 118 set_thread(uintptr_t thread_id) 119 { 120 g_thread_id = thread_id; 121 if (thread_id == INVALID_THREAD) { 122 MOCK_CLEAR(pthread_self); 123 } else { 124 MOCK_SET(pthread_self, (pthread_t)thread_id); 125 } 126 } 127 128 int 129 allocate_threads(int num_threads) 130 { 131 struct spdk_thread *thread; 132 uint32_t i; 133 134 g_ut_num_threads = num_threads; 135 136 g_ut_threads = calloc(num_threads, sizeof(*g_ut_threads)); 137 SPDK_CU_ASSERT_FATAL(g_ut_threads != NULL); 138 139 for (i = 0; i < g_ut_num_threads; i++) { 140 set_thread(i); 141 spdk_allocate_thread(__send_msg, __start_poller, __stop_poller, 142 &g_ut_threads[i], NULL); 143 thread = spdk_get_thread(); 144 SPDK_CU_ASSERT_FATAL(thread != NULL); 145 g_ut_threads[i].thread = thread; 146 TAILQ_INIT(&g_ut_threads[i].msgs); 147 TAILQ_INIT(&g_ut_threads[i].pollers); 148 } 149 150 set_thread(INVALID_THREAD); 151 return 0; 152 } 153 154 void 155 free_threads(void) 156 { 157 uint32_t i; 158 159 for (i = 0; i < g_ut_num_threads; i++) { 160 set_thread(i); 161 spdk_free_thread(); 162 } 163 164 g_ut_num_threads = 0; 165 free(g_ut_threads); 166 g_ut_threads = NULL; 167 } 168 169 void 170 increment_time(uint64_t time_in_us) 171 { 172 g_current_time_in_us += time_in_us; 173 spdk_delay_us(time_in_us); 174 } 175 176 static void 177 reset_pollers(void) 178 { 179 uint32_t i = 0; 180 struct ut_thread *thread = NULL; 181 struct ut_poller *poller = NULL; 182 uintptr_t original_thread_id = g_thread_id; 183 184 CU_ASSERT(g_current_time_in_us == 0); 185 186 for (i = 0; i < g_ut_num_threads; i++) { 187 set_thread(i); 188 thread = &g_ut_threads[i]; 189 190 TAILQ_FOREACH(poller, &thread->pollers, tailq) { 191 poller->next_expiration_in_us = g_current_time_in_us + poller->period_us; 192 } 193 } 194 195 set_thread(original_thread_id); 196 } 197 198 void 199 reset_time(void) 200 { 201 g_current_time_in_us = 0; 202 reset_pollers(); 203 } 204 205 int 206 poll_thread(uintptr_t thread_id) 207 { 208 int count = 0; 209 struct ut_thread *thread = &g_ut_threads[thread_id]; 210 struct ut_msg *msg; 211 struct ut_poller *poller; 212 uintptr_t original_thread_id; 213 TAILQ_HEAD(, ut_poller) tmp_pollers; 214 215 CU_ASSERT(thread_id != (uintptr_t)INVALID_THREAD); 216 CU_ASSERT(thread_id < g_ut_num_threads); 217 218 original_thread_id = g_thread_id; 219 set_thread(thread_id); 220 221 while (!TAILQ_EMPTY(&thread->msgs)) { 222 msg = TAILQ_FIRST(&thread->msgs); 223 TAILQ_REMOVE(&thread->msgs, msg, link); 224 225 msg->fn(msg->ctx); 226 count++; 227 free(msg); 228 } 229 230 TAILQ_INIT(&tmp_pollers); 231 232 while (!TAILQ_EMPTY(&thread->pollers)) { 233 poller = TAILQ_FIRST(&thread->pollers); 234 TAILQ_REMOVE(&thread->pollers, poller, tailq); 235 236 if (g_current_time_in_us >= poller->next_expiration_in_us) { 237 if (poller->fn) { 238 poller->fn(poller->arg); 239 } 240 241 if (poller->period_us == 0) { 242 break; 243 } else { 244 poller->next_expiration_in_us += poller->period_us; 245 } 246 } 247 248 TAILQ_INSERT_TAIL(&tmp_pollers, poller, tailq); 249 } 250 251 TAILQ_SWAP(&tmp_pollers, &thread->pollers, ut_poller, tailq); 252 253 set_thread(original_thread_id); 254 255 return count; 256 } 257 258 void 259 poll_threads(void) 260 { 261 bool msg_processed; 262 uint32_t i, count; 263 264 while (true) { 265 msg_processed = false; 266 267 for (i = 0; i < g_ut_num_threads; i++) { 268 count = poll_thread(i); 269 if (count > 0) { 270 msg_processed = true; 271 } 272 } 273 274 if (!msg_processed) { 275 break; 276 } 277 } 278 } 279