1*7bdf38e5Schristos #include "test/jemalloc_test.h" 2*7bdf38e5Schristos 3*7bdf38e5Schristos #include "jemalloc/internal/mpsc_queue.h" 4*7bdf38e5Schristos 5*7bdf38e5Schristos typedef struct elem_s elem_t; 6*7bdf38e5Schristos typedef ql_head(elem_t) elem_list_t; 7*7bdf38e5Schristos typedef mpsc_queue(elem_t) elem_mpsc_queue_t; 8*7bdf38e5Schristos struct elem_s { 9*7bdf38e5Schristos int thread; 10*7bdf38e5Schristos int idx; 11*7bdf38e5Schristos ql_elm(elem_t) link; 12*7bdf38e5Schristos }; 13*7bdf38e5Schristos 14*7bdf38e5Schristos /* Include both proto and gen to make sure they match up. */ 15*7bdf38e5Schristos mpsc_queue_proto(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t, 16*7bdf38e5Schristos elem_list_t); 17*7bdf38e5Schristos mpsc_queue_gen(static, elem_mpsc_queue_, elem_mpsc_queue_t, elem_t, 18*7bdf38e5Schristos elem_list_t, link); 19*7bdf38e5Schristos 20*7bdf38e5Schristos static void 21*7bdf38e5Schristos init_elems_simple(elem_t *elems, int nelems, int thread) { 22*7bdf38e5Schristos for (int i = 0; i < nelems; i++) { 23*7bdf38e5Schristos elems[i].thread = thread; 24*7bdf38e5Schristos elems[i].idx = i; 25*7bdf38e5Schristos ql_elm_new(&elems[i], link); 26*7bdf38e5Schristos } 27*7bdf38e5Schristos } 28*7bdf38e5Schristos 29*7bdf38e5Schristos static void 30*7bdf38e5Schristos check_elems_simple(elem_list_t *list, int nelems, int thread) { 31*7bdf38e5Schristos elem_t *elem; 32*7bdf38e5Schristos int next_idx = 0; 33*7bdf38e5Schristos ql_foreach(elem, list, link) { 34*7bdf38e5Schristos expect_d_lt(next_idx, nelems, "Too many list items"); 35*7bdf38e5Schristos expect_d_eq(thread, elem->thread, ""); 36*7bdf38e5Schristos expect_d_eq(next_idx, elem->idx, "List out of order"); 37*7bdf38e5Schristos next_idx++; 38*7bdf38e5Schristos } 39*7bdf38e5Schristos } 40*7bdf38e5Schristos 41*7bdf38e5Schristos TEST_BEGIN(test_simple) { 42*7bdf38e5Schristos enum {NELEMS = 10}; 43*7bdf38e5Schristos elem_t elems[NELEMS]; 44*7bdf38e5Schristos elem_list_t list; 45*7bdf38e5Schristos elem_mpsc_queue_t queue; 46*7bdf38e5Schristos 47*7bdf38e5Schristos /* Pop empty queue onto empty list -> empty list */ 48*7bdf38e5Schristos ql_new(&list); 49*7bdf38e5Schristos elem_mpsc_queue_new(&queue); 50*7bdf38e5Schristos elem_mpsc_queue_pop_batch(&queue, &list); 51*7bdf38e5Schristos expect_true(ql_empty(&list), ""); 52*7bdf38e5Schristos 53*7bdf38e5Schristos /* Pop empty queue onto nonempty list -> list unchanged */ 54*7bdf38e5Schristos ql_new(&list); 55*7bdf38e5Schristos elem_mpsc_queue_new(&queue); 56*7bdf38e5Schristos init_elems_simple(elems, NELEMS, 0); 57*7bdf38e5Schristos for (int i = 0; i < NELEMS; i++) { 58*7bdf38e5Schristos ql_tail_insert(&list, &elems[i], link); 59*7bdf38e5Schristos } 60*7bdf38e5Schristos elem_mpsc_queue_pop_batch(&queue, &list); 61*7bdf38e5Schristos check_elems_simple(&list, NELEMS, 0); 62*7bdf38e5Schristos 63*7bdf38e5Schristos /* Pop nonempty queue onto empty list -> list takes queue contents */ 64*7bdf38e5Schristos ql_new(&list); 65*7bdf38e5Schristos elem_mpsc_queue_new(&queue); 66*7bdf38e5Schristos init_elems_simple(elems, NELEMS, 0); 67*7bdf38e5Schristos for (int i = 0; i < NELEMS; i++) { 68*7bdf38e5Schristos elem_mpsc_queue_push(&queue, &elems[i]); 69*7bdf38e5Schristos } 70*7bdf38e5Schristos elem_mpsc_queue_pop_batch(&queue, &list); 71*7bdf38e5Schristos check_elems_simple(&list, NELEMS, 0); 72*7bdf38e5Schristos 73*7bdf38e5Schristos /* Pop nonempty queue onto nonempty list -> list gains queue contents */ 74*7bdf38e5Schristos ql_new(&list); 75*7bdf38e5Schristos elem_mpsc_queue_new(&queue); 76*7bdf38e5Schristos init_elems_simple(elems, NELEMS, 0); 77*7bdf38e5Schristos for (int i = 0; i < NELEMS / 2; i++) { 78*7bdf38e5Schristos ql_tail_insert(&list, &elems[i], link); 79*7bdf38e5Schristos } 80*7bdf38e5Schristos for (int i = NELEMS / 2; i < NELEMS; i++) { 81*7bdf38e5Schristos elem_mpsc_queue_push(&queue, &elems[i]); 82*7bdf38e5Schristos } 83*7bdf38e5Schristos elem_mpsc_queue_pop_batch(&queue, &list); 84*7bdf38e5Schristos check_elems_simple(&list, NELEMS, 0); 85*7bdf38e5Schristos 86*7bdf38e5Schristos } 87*7bdf38e5Schristos TEST_END 88*7bdf38e5Schristos 89*7bdf38e5Schristos TEST_BEGIN(test_push_single_or_batch) { 90*7bdf38e5Schristos enum { 91*7bdf38e5Schristos BATCH_MAX = 10, 92*7bdf38e5Schristos /* 93*7bdf38e5Schristos * We'll push i items one-at-a-time, then i items as a batch, 94*7bdf38e5Schristos * then i items as a batch again, as i ranges from 1 to 95*7bdf38e5Schristos * BATCH_MAX. So we need 3 times the sum of the numbers from 1 96*7bdf38e5Schristos * to BATCH_MAX elements total. 97*7bdf38e5Schristos */ 98*7bdf38e5Schristos NELEMS = 3 * BATCH_MAX * (BATCH_MAX - 1) / 2 99*7bdf38e5Schristos }; 100*7bdf38e5Schristos elem_t elems[NELEMS]; 101*7bdf38e5Schristos init_elems_simple(elems, NELEMS, 0); 102*7bdf38e5Schristos elem_list_t list; 103*7bdf38e5Schristos ql_new(&list); 104*7bdf38e5Schristos elem_mpsc_queue_t queue; 105*7bdf38e5Schristos elem_mpsc_queue_new(&queue); 106*7bdf38e5Schristos int next_idx = 0; 107*7bdf38e5Schristos for (int i = 1; i < 10; i++) { 108*7bdf38e5Schristos /* Push i items 1 at a time. */ 109*7bdf38e5Schristos for (int j = 0; j < i; j++) { 110*7bdf38e5Schristos elem_mpsc_queue_push(&queue, &elems[next_idx]); 111*7bdf38e5Schristos next_idx++; 112*7bdf38e5Schristos } 113*7bdf38e5Schristos /* Push i items in batch. */ 114*7bdf38e5Schristos for (int j = 0; j < i; j++) { 115*7bdf38e5Schristos ql_tail_insert(&list, &elems[next_idx], link); 116*7bdf38e5Schristos next_idx++; 117*7bdf38e5Schristos } 118*7bdf38e5Schristos elem_mpsc_queue_push_batch(&queue, &list); 119*7bdf38e5Schristos expect_true(ql_empty(&list), "Batch push should empty source"); 120*7bdf38e5Schristos /* 121*7bdf38e5Schristos * Push i items in batch, again. This tests two batches 122*7bdf38e5Schristos * proceeding one after the other. 123*7bdf38e5Schristos */ 124*7bdf38e5Schristos for (int j = 0; j < i; j++) { 125*7bdf38e5Schristos ql_tail_insert(&list, &elems[next_idx], link); 126*7bdf38e5Schristos next_idx++; 127*7bdf38e5Schristos } 128*7bdf38e5Schristos elem_mpsc_queue_push_batch(&queue, &list); 129*7bdf38e5Schristos expect_true(ql_empty(&list), "Batch push should empty source"); 130*7bdf38e5Schristos } 131*7bdf38e5Schristos expect_d_eq(NELEMS, next_idx, "Miscomputed number of elems to push."); 132*7bdf38e5Schristos 133*7bdf38e5Schristos expect_true(ql_empty(&list), ""); 134*7bdf38e5Schristos elem_mpsc_queue_pop_batch(&queue, &list); 135*7bdf38e5Schristos check_elems_simple(&list, NELEMS, 0); 136*7bdf38e5Schristos } 137*7bdf38e5Schristos TEST_END 138*7bdf38e5Schristos 139*7bdf38e5Schristos TEST_BEGIN(test_multi_op) { 140*7bdf38e5Schristos enum {NELEMS = 20}; 141*7bdf38e5Schristos elem_t elems[NELEMS]; 142*7bdf38e5Schristos init_elems_simple(elems, NELEMS, 0); 143*7bdf38e5Schristos elem_list_t push_list; 144*7bdf38e5Schristos ql_new(&push_list); 145*7bdf38e5Schristos elem_list_t result_list; 146*7bdf38e5Schristos ql_new(&result_list); 147*7bdf38e5Schristos elem_mpsc_queue_t queue; 148*7bdf38e5Schristos elem_mpsc_queue_new(&queue); 149*7bdf38e5Schristos 150*7bdf38e5Schristos int next_idx = 0; 151*7bdf38e5Schristos /* Push first quarter 1-at-a-time. */ 152*7bdf38e5Schristos for (int i = 0; i < NELEMS / 4; i++) { 153*7bdf38e5Schristos elem_mpsc_queue_push(&queue, &elems[next_idx]); 154*7bdf38e5Schristos next_idx++; 155*7bdf38e5Schristos } 156*7bdf38e5Schristos /* Push second quarter in batch. */ 157*7bdf38e5Schristos for (int i = NELEMS / 4; i < NELEMS / 2; i++) { 158*7bdf38e5Schristos ql_tail_insert(&push_list, &elems[next_idx], link); 159*7bdf38e5Schristos next_idx++; 160*7bdf38e5Schristos } 161*7bdf38e5Schristos elem_mpsc_queue_push_batch(&queue, &push_list); 162*7bdf38e5Schristos /* Batch pop all pushed elements. */ 163*7bdf38e5Schristos elem_mpsc_queue_pop_batch(&queue, &result_list); 164*7bdf38e5Schristos /* Push third quarter in batch. */ 165*7bdf38e5Schristos for (int i = NELEMS / 2; i < 3 * NELEMS / 4; i++) { 166*7bdf38e5Schristos ql_tail_insert(&push_list, &elems[next_idx], link); 167*7bdf38e5Schristos next_idx++; 168*7bdf38e5Schristos } 169*7bdf38e5Schristos elem_mpsc_queue_push_batch(&queue, &push_list); 170*7bdf38e5Schristos /* Push last quarter one-at-a-time. */ 171*7bdf38e5Schristos for (int i = 3 * NELEMS / 4; i < NELEMS; i++) { 172*7bdf38e5Schristos elem_mpsc_queue_push(&queue, &elems[next_idx]); 173*7bdf38e5Schristos next_idx++; 174*7bdf38e5Schristos } 175*7bdf38e5Schristos /* Pop them again. Order of existing list should be preserved. */ 176*7bdf38e5Schristos elem_mpsc_queue_pop_batch(&queue, &result_list); 177*7bdf38e5Schristos 178*7bdf38e5Schristos check_elems_simple(&result_list, NELEMS, 0); 179*7bdf38e5Schristos 180*7bdf38e5Schristos } 181*7bdf38e5Schristos TEST_END 182*7bdf38e5Schristos 183*7bdf38e5Schristos typedef struct pusher_arg_s pusher_arg_t; 184*7bdf38e5Schristos struct pusher_arg_s { 185*7bdf38e5Schristos elem_mpsc_queue_t *queue; 186*7bdf38e5Schristos int thread; 187*7bdf38e5Schristos elem_t *elems; 188*7bdf38e5Schristos int nelems; 189*7bdf38e5Schristos }; 190*7bdf38e5Schristos 191*7bdf38e5Schristos typedef struct popper_arg_s popper_arg_t; 192*7bdf38e5Schristos struct popper_arg_s { 193*7bdf38e5Schristos elem_mpsc_queue_t *queue; 194*7bdf38e5Schristos int npushers; 195*7bdf38e5Schristos int nelems_per_pusher; 196*7bdf38e5Schristos int *pusher_counts; 197*7bdf38e5Schristos }; 198*7bdf38e5Schristos 199*7bdf38e5Schristos static void * 200*7bdf38e5Schristos thd_pusher(void *void_arg) { 201*7bdf38e5Schristos pusher_arg_t *arg = (pusher_arg_t *)void_arg; 202*7bdf38e5Schristos int next_idx = 0; 203*7bdf38e5Schristos while (next_idx < arg->nelems) { 204*7bdf38e5Schristos /* Push 10 items in batch. */ 205*7bdf38e5Schristos elem_list_t list; 206*7bdf38e5Schristos ql_new(&list); 207*7bdf38e5Schristos int limit = next_idx + 10; 208*7bdf38e5Schristos while (next_idx < arg->nelems && next_idx < limit) { 209*7bdf38e5Schristos ql_tail_insert(&list, &arg->elems[next_idx], link); 210*7bdf38e5Schristos next_idx++; 211*7bdf38e5Schristos } 212*7bdf38e5Schristos elem_mpsc_queue_push_batch(arg->queue, &list); 213*7bdf38e5Schristos /* Push 10 items one-at-a-time. */ 214*7bdf38e5Schristos limit = next_idx + 10; 215*7bdf38e5Schristos while (next_idx < arg->nelems && next_idx < limit) { 216*7bdf38e5Schristos elem_mpsc_queue_push(arg->queue, &arg->elems[next_idx]); 217*7bdf38e5Schristos next_idx++; 218*7bdf38e5Schristos } 219*7bdf38e5Schristos 220*7bdf38e5Schristos } 221*7bdf38e5Schristos return NULL; 222*7bdf38e5Schristos } 223*7bdf38e5Schristos 224*7bdf38e5Schristos static void * 225*7bdf38e5Schristos thd_popper(void *void_arg) { 226*7bdf38e5Schristos popper_arg_t *arg = (popper_arg_t *)void_arg; 227*7bdf38e5Schristos int done_pushers = 0; 228*7bdf38e5Schristos while (done_pushers < arg->npushers) { 229*7bdf38e5Schristos elem_list_t list; 230*7bdf38e5Schristos ql_new(&list); 231*7bdf38e5Schristos elem_mpsc_queue_pop_batch(arg->queue, &list); 232*7bdf38e5Schristos elem_t *elem; 233*7bdf38e5Schristos ql_foreach(elem, &list, link) { 234*7bdf38e5Schristos int thread = elem->thread; 235*7bdf38e5Schristos int idx = elem->idx; 236*7bdf38e5Schristos expect_d_eq(arg->pusher_counts[thread], idx, 237*7bdf38e5Schristos "Thread's pushes reordered"); 238*7bdf38e5Schristos arg->pusher_counts[thread]++; 239*7bdf38e5Schristos if (arg->pusher_counts[thread] 240*7bdf38e5Schristos == arg->nelems_per_pusher) { 241*7bdf38e5Schristos done_pushers++; 242*7bdf38e5Schristos } 243*7bdf38e5Schristos } 244*7bdf38e5Schristos } 245*7bdf38e5Schristos return NULL; 246*7bdf38e5Schristos } 247*7bdf38e5Schristos 248*7bdf38e5Schristos TEST_BEGIN(test_multiple_threads) { 249*7bdf38e5Schristos enum { 250*7bdf38e5Schristos NPUSHERS = 4, 251*7bdf38e5Schristos NELEMS_PER_PUSHER = 1000*1000, 252*7bdf38e5Schristos }; 253*7bdf38e5Schristos thd_t pushers[NPUSHERS]; 254*7bdf38e5Schristos pusher_arg_t pusher_arg[NPUSHERS]; 255*7bdf38e5Schristos 256*7bdf38e5Schristos thd_t popper; 257*7bdf38e5Schristos popper_arg_t popper_arg; 258*7bdf38e5Schristos 259*7bdf38e5Schristos elem_mpsc_queue_t queue; 260*7bdf38e5Schristos elem_mpsc_queue_new(&queue); 261*7bdf38e5Schristos 262*7bdf38e5Schristos elem_t *elems = calloc(NPUSHERS * NELEMS_PER_PUSHER, sizeof(elem_t)); 263*7bdf38e5Schristos elem_t *elem_iter = elems; 264*7bdf38e5Schristos for (int i = 0; i < NPUSHERS; i++) { 265*7bdf38e5Schristos pusher_arg[i].queue = &queue; 266*7bdf38e5Schristos pusher_arg[i].thread = i; 267*7bdf38e5Schristos pusher_arg[i].elems = elem_iter; 268*7bdf38e5Schristos pusher_arg[i].nelems = NELEMS_PER_PUSHER; 269*7bdf38e5Schristos 270*7bdf38e5Schristos init_elems_simple(elem_iter, NELEMS_PER_PUSHER, i); 271*7bdf38e5Schristos elem_iter += NELEMS_PER_PUSHER; 272*7bdf38e5Schristos } 273*7bdf38e5Schristos popper_arg.queue = &queue; 274*7bdf38e5Schristos popper_arg.npushers = NPUSHERS; 275*7bdf38e5Schristos popper_arg.nelems_per_pusher = NELEMS_PER_PUSHER; 276*7bdf38e5Schristos int pusher_counts[NPUSHERS] = {0}; 277*7bdf38e5Schristos popper_arg.pusher_counts = pusher_counts; 278*7bdf38e5Schristos 279*7bdf38e5Schristos thd_create(&popper, thd_popper, (void *)&popper_arg); 280*7bdf38e5Schristos for (int i = 0; i < NPUSHERS; i++) { 281*7bdf38e5Schristos thd_create(&pushers[i], thd_pusher, &pusher_arg[i]); 282*7bdf38e5Schristos } 283*7bdf38e5Schristos 284*7bdf38e5Schristos thd_join(popper, NULL); 285*7bdf38e5Schristos for (int i = 0; i < NPUSHERS; i++) { 286*7bdf38e5Schristos thd_join(pushers[i], NULL); 287*7bdf38e5Schristos } 288*7bdf38e5Schristos 289*7bdf38e5Schristos for (int i = 0; i < NPUSHERS; i++) { 290*7bdf38e5Schristos expect_d_eq(NELEMS_PER_PUSHER, pusher_counts[i], ""); 291*7bdf38e5Schristos } 292*7bdf38e5Schristos 293*7bdf38e5Schristos free(elems); 294*7bdf38e5Schristos } 295*7bdf38e5Schristos TEST_END 296*7bdf38e5Schristos 297*7bdf38e5Schristos int 298*7bdf38e5Schristos main(void) { 299*7bdf38e5Schristos return test_no_reentrancy( 300*7bdf38e5Schristos test_simple, 301*7bdf38e5Schristos test_push_single_or_batch, 302*7bdf38e5Schristos test_multi_op, 303*7bdf38e5Schristos test_multiple_threads); 304*7bdf38e5Schristos } 305