# Tests for Thread::Queue->end(): worker threads that are blocked in (or
# looping on) dequeue() must drain whatever is left in an ended queue and
# then terminate.

use strict;
use warnings;

use Config;

BEGIN {
    # These tests need interpreter threads, and select() for sub-second sleeps.
    if (! $Config{'useithreads'}) {
        print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
        exit(0);
    }
    if (! $Config{'d_select'}) {
        print("1..0 # SKIP 'select()' not available for testing\n");
        exit(0);
    }
}

use threads;
use Thread::Queue;

use Test::More;

my $num_threads = 3;    # Number of worker threads per scenario
my $cycles = 2;         # Full sets each worker is expected to process on average
my $count = 2;          # Items requested per dequeue() call

# Plan derivation, with N = $num_threads*$cycles*$count:
#   - one pass per item read:  N + (N+1) + (N+1)        = 3*N + 2
#   - one "exiting" and one "joined" pass per thread,
#     in each of the three scenarios                    = 6*$num_threads
#   - pending() checks: two in each of scenarios 1 and 2 = 4
plan tests => 3*$num_threads*$cycles*$count + 6*$num_threads + 6;

# Start $num_threads workers that read sets of $count items from $q until
# the queue is ended and drained.  After each set a worker naps for a random
# fraction of a second and, when a report queue $r is supplied, enqueues its
# own number there so the main thread can count processed sets.
# Returns the list of thread objects.
sub start_workers {
    my ($q, $r) = @_;

    my @workers;
    for my $ii (1..$num_threads) {
        push @workers, threads->create( sub {
            # Thread will loop until no more work is coming
            LOOP:
            while (my @set = $q->dequeue($count)) {
                foreach my $item (@set) {
                    last LOOP if (! defined($item));
                    pass("'$item' read from queue in thread $ii");
                }
                select(undef, undef, undef, rand(1));
                $r->enqueue($ii) if defined($r);
            }
            pass("Thread $ii exiting");
        });
    }
    return @workers;
}

# Block until $num_threads*$cycles sets have been reported on $r, then give
# the workers a moment to get back into their (blocking) dequeue() call.
sub wait_for_idle {
    my ($r) = @_;

    for my $ii (1..($num_threads*$cycles)) {
        $r->dequeue();
    }
    sleep(1);
    threads->yield();
    return;
}

# Join every given thread, recording one passing test per joined thread.
sub join_workers {
    my @workers = @_;

    for my $thread (@workers) {
        $thread->join;
        pass($thread->tid." joined");
    }
    return;
}

# Test for end() while threads are blocked and no more items in queue
{
    my @items = 1..($num_threads*$cycles*$count);
    my $q = Thread::Queue->new(@items);
    my $r = Thread::Queue->new();

    my @threads = start_workers($q, $r);

    # Make sure there's nothing in the queue and threads are blocking
    wait_for_idle($r);

    is($q->pending(), 0, 'Queue is empty');

    # Signal no more work is coming
    $q->end();

    is($q->pending(), undef, 'Queue is ended');

    join_workers(@threads);
}

# Test for end() while threads are blocked and items still remain in queue
{
    my @items = 1..($num_threads*$cycles*$count + 1);
    my $q = Thread::Queue->new(@items);
    my $r = Thread::Queue->new();

    my @threads = start_workers($q, $r);

    # Wait until every complete set has been consumed.  A single item is
    # left over, which is not enough to satisfy a dequeue($count), so the
    # workers are blocking.
    wait_for_idle($r);

    is($q->pending(), 1, 'Queue has one left');

    # Signal no more work is coming
    $q->end();

    join_workers(@threads);

    # Checked only after the join: by then the leftover item has been read,
    # so the ended queue is also empty.
    is($q->pending(), undef, 'Queue is ended');
}

# Test of end() sent while items are still in the queue
{
    my @items = 1..($num_threads*$cycles*$count + 1);
    my $q = Thread::Queue->new(@items);

    my @threads = start_workers($q);

    # Signal no more work is coming right away, without waiting for the
    # workers.  They must still read every remaining item before exiting.
    $q->end();

    join_workers(@threads);
}

exit(0);

# EOF