1use strict; 2use warnings; 3 4BEGIN { 5 use Config; 6 if (! $Config{'useithreads'}) { 7 print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); 8 exit(0); 9 } 10} 11 12use threads; 13use Thread::Queue; 14 15BEGIN { # perl RT 133382 16if ($] == 5.008) { 17 require 't/test.pl'; # Test::More work-alike for Perl 5.8.0 18} else { 19 require Test::More; 20} 21Test::More->import(); 22} # end BEGIN 23plan('tests' => 81); 24 25### Basic usage with multiple threads ### 26 27my $nthreads = 5; 28 29my $q = Thread::Queue->new(1..$nthreads); 30ok($q, 'New queue'); 31is($q->pending(), $nthreads, 'Pre-populated queue count'); 32 33sub reader { 34 my $id = threads->tid(); 35 while ((my $el = $q->dequeue()) != -1) { 36 ok($el >= 1, "Thread $id got $el"); 37 select(undef, undef, undef, rand(1)); 38 } 39 ok(1, "Thread $id done"); 40} 41 42my @threads; 43push(@threads, threads->create('reader')) for (1..$nthreads); 44 45for (1..20) { 46 select(undef, undef, undef, rand(1)); 47 $q->enqueue($_); 48} 49 50$q->enqueue((-1) x $nthreads); # One end marker for each thread 51 52$_->join() foreach @threads; 53undef(@threads); 54 55is($q->pending(), 0, 'Empty queue'); 56 57 58### ->dequeue_nb() test ### 59 60$q = Thread::Queue->new(); 61ok($q, 'New queue'); 62is($q->pending(), 0, 'Empty queue'); 63 64my @items = qw/foo bar baz/; 65$q->enqueue(@items); 66 67threads->create(sub { 68 is($q->pending(), scalar(@items), 'Queue count in thread'); 69 while (my $el = $q->dequeue_nb()) { 70 is($el, shift(@items), "Thread got $el"); 71 } 72 is($q->pending(), 0, 'Empty queue'); 73 $q->enqueue('done'); 74})->join(); 75 76is($q->pending(), 1, 'Queue count after thread'); 77is($q->dequeue(), 'done', 'Thread reported done'); 78is($q->pending(), 0, 'Empty queue'); 79 80 81### ->dequeue(COUNT) test ### 82 83my $count = 3; 84 85sub reader2 { 86 my $id = threads->tid(); 87 while (1) { 88 my @el = $q->dequeue($count); 89 is(scalar(@el), $count, "Thread $id got @el"); 90 select(undef, undef, undef, rand(1)); 91 return if ($el[0] == 0); 92 } 93} 94 95push(@threads, threads->create('reader2')) for (1..$nthreads); 96 97$q->enqueue(1..4*$count*$nthreads); 98$q->enqueue((0) x ($count*$nthreads)); 99 100$_->join() foreach @threads; 101undef(@threads); 102 103is($q->pending(), 0, 'Empty queue'); 104 105 106### ->dequeue_nb(COUNT) test ### 107 108@items = qw/foo bar baz qux exit/; 109$q->enqueue(@items); 110is($q->pending(), scalar(@items), 'Queue count'); 111 112threads->create(sub { 113 is($q->pending(), scalar(@items), 'Queue count in thread'); 114 while (my @el = $q->dequeue_nb(2)) { 115 is($el[0], shift(@items), "Thread got $el[0]"); 116 if ($el[0] eq 'exit') { 117 is(scalar(@el), 1, 'Thread to exit'); 118 } else { 119 is($el[1], shift(@items), "Thread got $el[1]"); 120 } 121 } 122 is($q->pending(), 0, 'Empty queue'); 123 $q->enqueue('done'); 124})->join(); 125 126is($q->pending(), 1, 'Queue count after thread'); 127is($q->dequeue(), 'done', 'Thread reported done'); 128is($q->pending(), 0, 'Empty queue'); 129 130exit(0); 131 132# EOF 133