1 /* $NetBSD: threadpool.c,v 1.5 2019/01/04 05:35:24 thorpej Exp $ */ 2 3 /*- 4 * Copyright (c) 2018 The NetBSD Foundation, Inc. 5 * All rights reserved. 6 * 7 * This code is derived from software contributed to The NetBSD Foundation 8 * by Jason R. Thorpe. 9 * 10 * Redistribution and use in source and binary forms, with or without 11 * modification, are permitted provided that the following conditions 12 * are met: 13 * 1. Redistributions of source code must retain the above copyright 14 * notice, this list of conditions and the following disclaimer. 15 * 2. Redistributions in binary form must reproduce the above copyright 16 * notice, this list of conditions and the following disclaimer in the 17 * documentation and/or other materials provided with the distribution. 18 * 19 * THIS SOFTWARE IS PROVIDED BY THE NETBSD FOUNDATION, INC. AND 20 * CONTRIBUTORS ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, 21 * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 22 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. 23 * IN NO EVENT SHALL THE FOUNDATION OR CONTRIBUTORS BE LIABLE FOR ANY 24 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 25 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE 26 * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 27 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER 28 * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 29 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN 30 * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 31 */ 32 33 #include <sys/cdefs.h> 34 #if !defined(lint) 35 __RCSID("$NetBSD: threadpool.c,v 1.5 2019/01/04 05:35:24 thorpej Exp $"); 36 #endif /* !lint */ 37 38 #include <sys/param.h> 39 #include <sys/condvar.h> 40 #include <sys/kernel.h> 41 #include <sys/kmem.h> 42 #include <sys/mutex.h> 43 #include <sys/threadpool.h> 44 45 #include "kernspace.h" 46 47 void 48 rumptest_threadpool_unbound_lifecycle(void) 49 { 50 struct threadpool *pool0, *pool1, *pool2; 51 int error; 52 53 error = threadpool_get(&pool0, PRI_NONE); 54 KASSERT(error == 0); 55 56 error = threadpool_get(&pool1, PRI_NONE); 57 KASSERT(error == 0); 58 59 KASSERT(pool0 == pool1); 60 61 error = threadpool_get(&pool2, PRI_KERNEL_RT); 62 KASSERT(error == 0); 63 64 KASSERT(pool0 != pool2); 65 66 threadpool_put(pool0, PRI_NONE); 67 threadpool_put(pool1, PRI_NONE); 68 threadpool_put(pool2, PRI_KERNEL_RT); 69 } 70 71 void 72 rumptest_threadpool_percpu_lifecycle(void) 73 { 74 struct threadpool_percpu *pcpu0, *pcpu1, *pcpu2; 75 int error; 76 77 error = threadpool_percpu_get(&pcpu0, PRI_NONE); 78 KASSERT(error == 0); 79 80 error = threadpool_percpu_get(&pcpu1, PRI_NONE); 81 KASSERT(error == 0); 82 83 KASSERT(pcpu0 == pcpu1); 84 85 error = threadpool_percpu_get(&pcpu2, PRI_KERNEL_RT); 86 KASSERT(error == 0); 87 88 KASSERT(pcpu0 != pcpu2); 89 90 threadpool_percpu_put(pcpu0, PRI_NONE); 91 threadpool_percpu_put(pcpu1, PRI_NONE); 92 threadpool_percpu_put(pcpu2, PRI_KERNEL_RT); 93 } 94 95 struct test_job_data { 96 kmutex_t mutex; 97 kcondvar_t cond; 98 unsigned int count; 99 struct threadpool_job job; 100 }; 101 102 #define FINAL_COUNT 12345 103 104 static void 105 test_job_func_schedule(struct threadpool_job *job) 106 { 107 struct test_job_data *data = 108 container_of(job, struct test_job_data, job); 109 110 mutex_enter(&data->mutex); 111 KASSERT(data->count != FINAL_COUNT); 112 data->count++; 113 cv_broadcast(&data->cond); 114 threadpool_job_done(job); 115 mutex_exit(&data->mutex); 116 } 117 118 static void 119 test_job_func_cancel(struct threadpool_job *job) 120 { 121 struct test_job_data *data = 122 container_of(job, struct test_job_data, job); 123 124 mutex_enter(&data->mutex); 125 if (data->count == 0) { 126 data->count = 1; 127 cv_broadcast(&data->cond); 128 } 129 while (data->count != FINAL_COUNT - 1) 130 cv_wait(&data->cond, &data->mutex); 131 data->count = FINAL_COUNT; 132 cv_broadcast(&data->cond); 133 threadpool_job_done(job); 134 mutex_exit(&data->mutex); 135 } 136 137 static void 138 init_test_job_data(struct test_job_data *data, threadpool_job_fn_t fn) 139 { 140 mutex_init(&data->mutex, MUTEX_DEFAULT, IPL_NONE); 141 cv_init(&data->cond, "testjob"); 142 threadpool_job_init(&data->job, fn, &data->mutex, "testjob"); 143 data->count = 0; 144 } 145 146 static void 147 fini_test_job_data(struct test_job_data *data) 148 { 149 threadpool_job_destroy(&data->job); 150 cv_destroy(&data->cond); 151 mutex_destroy(&data->mutex); 152 } 153 154 void 155 rumptest_threadpool_unbound_schedule(void) 156 { 157 struct test_job_data data; 158 struct threadpool *pool; 159 int error; 160 161 error = threadpool_get(&pool, PRI_NONE); 162 KASSERT(error == 0); 163 164 init_test_job_data(&data, test_job_func_schedule); 165 166 mutex_enter(&data.mutex); 167 while (data.count != FINAL_COUNT) { 168 threadpool_schedule_job(pool, &data.job); 169 error = cv_timedwait(&data.cond, &data.mutex, hz * 2); 170 KASSERT(error != EWOULDBLOCK); 171 } 172 mutex_exit(&data.mutex); 173 174 fini_test_job_data(&data); 175 176 threadpool_put(pool, PRI_NONE); 177 } 178 179 void 180 rumptest_threadpool_percpu_schedule(void) 181 { 182 struct test_job_data data; 183 struct threadpool_percpu *pcpu; 184 struct threadpool *pool; 185 int error; 186 187 error = threadpool_percpu_get(&pcpu, PRI_NONE); 188 KASSERT(error == 0); 189 190 pool = threadpool_percpu_ref(pcpu); 191 192 init_test_job_data(&data, test_job_func_schedule); 193 194 mutex_enter(&data.mutex); 195 while (data.count != FINAL_COUNT) { 196 threadpool_schedule_job(pool, &data.job); 197 error = cv_timedwait(&data.cond, &data.mutex, hz * 2); 198 KASSERT(error != EWOULDBLOCK); 199 } 200 mutex_exit(&data.mutex); 201 202 fini_test_job_data(&data); 203 204 threadpool_percpu_put(pcpu, PRI_NONE); 205 } 206 207 void 208 rumptest_threadpool_job_cancel(void) 209 { 210 struct test_job_data data; 211 struct threadpool *pool; 212 int error; 213 bool rv; 214 215 error = threadpool_get(&pool, PRI_NONE); 216 KASSERT(error == 0); 217 218 init_test_job_data(&data, test_job_func_cancel); 219 220 mutex_enter(&data.mutex); 221 threadpool_schedule_job(pool, &data.job); 222 while (data.count == 0) 223 cv_wait(&data.cond, &data.mutex); 224 KASSERT(data.count == 1); 225 226 /* Job is already running (and is not finished); this shold fail. */ 227 rv = threadpool_cancel_job_async(pool, &data.job); 228 KASSERT(rv == false); 229 230 data.count = FINAL_COUNT - 1; 231 cv_broadcast(&data.cond); 232 233 /* Now wait for the job to finish. */ 234 threadpool_cancel_job(pool, &data.job); 235 KASSERT(data.count == FINAL_COUNT); 236 mutex_exit(&data.mutex); 237 238 fini_test_job_data(&data); 239 240 threadpool_put(pool, PRI_NONE); 241 } 242 243 void 244 rumptest_threadpool_job_cancelthrash(void) 245 { 246 struct test_job_data data; 247 struct threadpool *pool; 248 int i, error; 249 250 error = threadpool_get(&pool, PRI_NONE); 251 KASSERT(error == 0); 252 253 init_test_job_data(&data, test_job_func_cancel); 254 255 mutex_enter(&data.mutex); 256 for (i = 0; i < 10000; i++) { 257 threadpool_schedule_job(pool, &data.job); 258 if ((i % 3) == 0) { 259 mutex_exit(&data.mutex); 260 mutex_enter(&data.mutex); 261 } 262 /* 263 * If the job managed to start, ensure that its exit 264 * condition is met so that we don't wait forever 265 * for the job to finish. 266 */ 267 data.count = FINAL_COUNT - 1; 268 cv_broadcast(&data.cond); 269 270 threadpool_cancel_job(pool, &data.job); 271 272 /* 273 * After cancellation, either the job didn't start 274 * (data.count == FINAL_COUNT - 1, per above) or 275 * it finished (data.count == FINAL_COUNT). 276 */ 277 KASSERT(data.count == (FINAL_COUNT - 1) || 278 data.count == FINAL_COUNT); 279 280 /* Reset for the loop. */ 281 data.count = 0; 282 } 283 mutex_exit(&data.mutex); 284 285 fini_test_job_data(&data); 286 287 threadpool_put(pool, PRI_NONE); 288 } 289