1 #include "test/jemalloc_test.h" 2 3 #include "jemalloc/internal/mpsc_queue.h" 4 5 typedef struct elem_s elem_t; 6 typedef ql_head(elem_t) elem_list_t; 7 typedef mpsc_queue(elem_t) elem_mpsc_queue_t; 8 struct elem_s { 9 int thread; 10 int idx; 11 ql_elm(elem_t) link; 12 }; 13 14 /* Include both proto and gen to make sure they match up. */ 15 mpsc_queue_proto(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t, 16 elem_list_t); 17 mpsc_queue_gen(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t, 18 elem_list_t, link); 19 20 static void 21 init_elems_simple(elem_t *elems, int nelems, int thread) { 22 for (int i = 0; i < nelems; i++) { 23 elems[i].thread = thread; 24 elems[i].idx = i; 25 ql_elm_new(&elems[i], link); 26 } 27 } 28 29 static void 30 check_elems_simple(elem_list_t *list, int nelems, int thread) { 31 elem_t *elem; 32 int next_idx = 0; 33 ql_foreach(elem, list, link) { 34 expect_d_lt(next_idx, nelems, "Too many list items"); 35 expect_d_eq(thread, elem->thread, ""); 36 expect_d_eq(next_idx, elem->idx, "List out of order"); 37 next_idx++; 38 } 39 } 40 41 TEST_BEGIN(test_simple) { 42 enum {NELEMS = 10}; 43 elem_t elems[NELEMS]; 44 elem_list_t list; 45 elem_mpsc_queue_t queue; 46 47 /* Pop empty queue onto empty list -> empty list */ 48 ql_new(&list); 49 elem_mpsc_queue_new(&queue); 50 elem_mpsc_queue_pop_batch(&queue, &list); 51 expect_true(ql_empty(&list), ""); 52 53 /* Pop empty queue onto nonempty list -> list unchanged */ 54 ql_new(&list); 55 elem_mpsc_queue_new(&queue); 56 init_elems_simple(elems, NELEMS, 0); 57 for (int i = 0; i < NELEMS; i++) { 58 ql_tail_insert(&list, &elems[i], link); 59 } 60 elem_mpsc_queue_pop_batch(&queue, &list); 61 check_elems_simple(&list, NELEMS, 0); 62 63 /* Pop nonempty queue onto empty list -> list takes queue contents */ 64 ql_new(&list); 65 elem_mpsc_queue_new(&queue); 66 init_elems_simple(elems, NELEMS, 0); 67 for (int i = 0; i < NELEMS; i++) { 68 elem_mpsc_queue_push(&queue, &elems[i]); 69 } 70 elem_mpsc_queue_pop_batch(&queue, &list); 71 check_elems_simple(&list, NELEMS, 0); 72 73 /* Pop nonempty queue onto nonempty list -> list gains queue contents */ 74 ql_new(&list); 75 elem_mpsc_queue_new(&queue); 76 init_elems_simple(elems, NELEMS, 0); 77 for (int i = 0; i < NELEMS / 2; i++) { 78 ql_tail_insert(&list, &elems[i], link); 79 } 80 for (int i = NELEMS / 2; i < NELEMS; i++) { 81 elem_mpsc_queue_push(&queue, &elems[i]); 82 } 83 elem_mpsc_queue_pop_batch(&queue, &list); 84 check_elems_simple(&list, NELEMS, 0); 85 86 } 87 TEST_END 88 89 TEST_BEGIN(test_push_single_or_batch) { 90 enum { 91 BATCH_MAX = 10, 92 /* 93 * We'll push i items one-at-a-time, then i items as a batch, 94 * then i items as a batch again, as i ranges from 1 to 95 * BATCH_MAX. So we need 3 times the sum of the numbers from 1 96 * to BATCH_MAX elements total. 97 */ 98 NELEMS = 3 * BATCH_MAX * (BATCH_MAX - 1) / 2 99 }; 100 elem_t elems[NELEMS]; 101 init_elems_simple(elems, NELEMS, 0); 102 elem_list_t list; 103 ql_new(&list); 104 elem_mpsc_queue_t queue; 105 elem_mpsc_queue_new(&queue); 106 int next_idx = 0; 107 for (int i = 1; i < 10; i++) { 108 /* Push i items 1 at a time. */ 109 for (int j = 0; j < i; j++) { 110 elem_mpsc_queue_push(&queue, &elems[next_idx]); 111 next_idx++; 112 } 113 /* Push i items in batch. */ 114 for (int j = 0; j < i; j++) { 115 ql_tail_insert(&list, &elems[next_idx], link); 116 next_idx++; 117 } 118 elem_mpsc_queue_push_batch(&queue, &list); 119 expect_true(ql_empty(&list), "Batch push should empty source"); 120 /* 121 * Push i items in batch, again. This tests two batches 122 * proceeding one after the other. 123 */ 124 for (int j = 0; j < i; j++) { 125 ql_tail_insert(&list, &elems[next_idx], link); 126 next_idx++; 127 } 128 elem_mpsc_queue_push_batch(&queue, &list); 129 expect_true(ql_empty(&list), "Batch push should empty source"); 130 } 131 expect_d_eq(NELEMS, next_idx, "Miscomputed number of elems to push."); 132 133 expect_true(ql_empty(&list), ""); 134 elem_mpsc_queue_pop_batch(&queue, &list); 135 check_elems_simple(&list, NELEMS, 0); 136 } 137 TEST_END 138 139 TEST_BEGIN(test_multi_op) { 140 enum {NELEMS = 20}; 141 elem_t elems[NELEMS]; 142 init_elems_simple(elems, NELEMS, 0); 143 elem_list_t push_list; 144 ql_new(&push_list); 145 elem_list_t result_list; 146 ql_new(&result_list); 147 elem_mpsc_queue_t queue; 148 elem_mpsc_queue_new(&queue); 149 150 int next_idx = 0; 151 /* Push first quarter 1-at-a-time. */ 152 for (int i = 0; i < NELEMS / 4; i++) { 153 elem_mpsc_queue_push(&queue, &elems[next_idx]); 154 next_idx++; 155 } 156 /* Push second quarter in batch. */ 157 for (int i = NELEMS / 4; i < NELEMS / 2; i++) { 158 ql_tail_insert(&push_list, &elems[next_idx], link); 159 next_idx++; 160 } 161 elem_mpsc_queue_push_batch(&queue, &push_list); 162 /* Batch pop all pushed elements. */ 163 elem_mpsc_queue_pop_batch(&queue, &result_list); 164 /* Push third quarter in batch. */ 165 for (int i = NELEMS / 2; i < 3 * NELEMS / 4; i++) { 166 ql_tail_insert(&push_list, &elems[next_idx], link); 167 next_idx++; 168 } 169 elem_mpsc_queue_push_batch(&queue, &push_list); 170 /* Push last quarter one-at-a-time. */ 171 for (int i = 3 * NELEMS / 4; i < NELEMS; i++) { 172 elem_mpsc_queue_push(&queue, &elems[next_idx]); 173 next_idx++; 174 } 175 /* Pop them again. Order of existing list should be preserved. */ 176 elem_mpsc_queue_pop_batch(&queue, &result_list); 177 178 check_elems_simple(&result_list, NELEMS, 0); 179 180 } 181 TEST_END 182 183 typedef struct pusher_arg_s pusher_arg_t; 184 struct pusher_arg_s { 185 elem_mpsc_queue_t *queue; 186 int thread; 187 elem_t *elems; 188 int nelems; 189 }; 190 191 typedef struct popper_arg_s popper_arg_t; 192 struct popper_arg_s { 193 elem_mpsc_queue_t *queue; 194 int npushers; 195 int nelems_per_pusher; 196 int *pusher_counts; 197 }; 198 199 static void * 200 thd_pusher(void *void_arg) { 201 pusher_arg_t *arg = (pusher_arg_t *)void_arg; 202 int next_idx = 0; 203 while (next_idx < arg->nelems) { 204 /* Push 10 items in batch. */ 205 elem_list_t list; 206 ql_new(&list); 207 int limit = next_idx + 10; 208 while (next_idx < arg->nelems && next_idx < limit) { 209 ql_tail_insert(&list, &arg->elems[next_idx], link); 210 next_idx++; 211 } 212 elem_mpsc_queue_push_batch(arg->queue, &list); 213 /* Push 10 items one-at-a-time. */ 214 limit = next_idx + 10; 215 while (next_idx < arg->nelems && next_idx < limit) { 216 elem_mpsc_queue_push(arg->queue, &arg->elems[next_idx]); 217 next_idx++; 218 } 219 220 } 221 return NULL; 222 } 223 224 static void * 225 thd_popper(void *void_arg) { 226 popper_arg_t *arg = (popper_arg_t *)void_arg; 227 int done_pushers = 0; 228 while (done_pushers < arg->npushers) { 229 elem_list_t list; 230 ql_new(&list); 231 elem_mpsc_queue_pop_batch(arg->queue, &list); 232 elem_t *elem; 233 ql_foreach(elem, &list, link) { 234 int thread = elem->thread; 235 int idx = elem->idx; 236 expect_d_eq(arg->pusher_counts[thread], idx, 237 "Thread's pushes reordered"); 238 arg->pusher_counts[thread]++; 239 if (arg->pusher_counts[thread] 240 == arg->nelems_per_pusher) { 241 done_pushers++; 242 } 243 } 244 } 245 return NULL; 246 } 247 248 TEST_BEGIN(test_multiple_threads) { 249 enum { 250 NPUSHERS = 4, 251 NELEMS_PER_PUSHER = 1000*1000, 252 }; 253 thd_t pushers[NPUSHERS]; 254 pusher_arg_t pusher_arg[NPUSHERS]; 255 256 thd_t popper; 257 popper_arg_t popper_arg; 258 259 elem_mpsc_queue_t queue; 260 elem_mpsc_queue_new(&queue); 261 262 elem_t *elems = calloc(NPUSHERS * NELEMS_PER_PUSHER, sizeof(elem_t)); 263 elem_t *elem_iter = elems; 264 for (int i = 0; i < NPUSHERS; i++) { 265 pusher_arg[i].queue = &queue; 266 pusher_arg[i].thread = i; 267 pusher_arg[i].elems = elem_iter; 268 pusher_arg[i].nelems = NELEMS_PER_PUSHER; 269 270 init_elems_simple(elem_iter, NELEMS_PER_PUSHER, i); 271 elem_iter += NELEMS_PER_PUSHER; 272 } 273 popper_arg.queue = &queue; 274 popper_arg.npushers = NPUSHERS; 275 popper_arg.nelems_per_pusher = NELEMS_PER_PUSHER; 276 int pusher_counts[NPUSHERS] = {0}; 277 popper_arg.pusher_counts = pusher_counts; 278 279 thd_create(&popper, thd_popper, (void *)&popper_arg); 280 for (int i = 0; i < NPUSHERS; i++) { 281 thd_create(&pushers[i], thd_pusher, &pusher_arg[i]); 282 } 283 284 thd_join(popper, NULL); 285 for (int i = 0; i < NPUSHERS; i++) { 286 thd_join(pushers[i], NULL); 287 } 288 289 for (int i = 0; i < NPUSHERS; i++) { 290 expect_d_eq(NELEMS_PER_PUSHER, pusher_counts[i], ""); 291 } 292 293 free(elems); 294 } 295 TEST_END 296 297 int 298 main(void) { 299 return test_no_reentrancy( 300 test_simple, 301 test_push_single_or_batch, 302 test_multi_op, 303 test_multiple_threads); 304 } 305