use strict;
use warnings;

# Skip the whole file (TAP "1..0 # SKIP") when this perl has no ithreads
# support, since everything below needs the 'threads' module.
BEGIN {
    use Config;
    if (! $Config{'useithreads'}) {
        print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
        exit(0);
    }
}

use threads;
use Thread::Queue;

if ($] == 5.008) {
    require 't/test.pl';   # Test::More work-alike for Perl 5.8.0
} else {
    require Test::More;
}
Test::More->import();

# Fixed plan, so that a thread dying early (and silently dropping its
# assertions) is reported as a failure:
#   33  basic usage            (2 + 25 items + 5 "done" + 1)
#   10  ->dequeue_nb()
#   26  ->dequeue(COUNT)       (25 batches + 1)
#   12  ->dequeue_nb(COUNT)
plan('tests' => 81);

### Basic usage with multiple threads ###

my $nthreads = 5;

my $q = Thread::Queue->new(1..$nthreads);
ok($q, 'New queue');
is($q->pending(), $nthreads, 'Pre-populated queue count');

# Thread body: consume items from the shared queue until the -1 end marker
# is seen, emitting one assertion per item and a final "done" assertion.
sub reader {
    my $id = threads->tid();
    while ((my $el = $q->dequeue()) != -1) {
        ok($el >= 1, "Thread $id got $el");
        # Pause for a random fraction of a second between items
        # (presumably to vary how the reader threads interleave).
        select(undef, undef, undef, rand(1));
    }
    ok(1, "Thread $id done");
}

my @threads;
push(@threads, threads->create('reader')) for (1..$nthreads);

# Feed the readers slowly from the main thread while they are running.
for (1..20) {
    select(undef, undef, undef, rand(1));
    $q->enqueue($_);
}

$q->enqueue((-1) x $nthreads);   # One end marker for each thread

$_->join() foreach @threads;
undef(@threads);

is($q->pending(), 0, 'Empty queue');


### ->dequeue_nb() test ###

$q = Thread::Queue->new();
ok($q, 'New queue');
is($q->pending(), 0, 'Empty queue');

my @items = qw/foo bar baz/;
$q->enqueue(@items);

threads->create(sub {
    is($q->pending(), scalar(@items), 'Queue count in thread');
    # ->dequeue_nb() returns undef once the queue is empty, so test for
    # definedness rather than truth: a false item (0 or '') must not end
    # the drain loop early.
    while (defined(my $el = $q->dequeue_nb())) {
        # @items here is this thread's own copy of the parent's array.
        is($el, shift(@items), "Thread got $el");
    }
    is($q->pending(), 0, 'Empty queue');
    $q->enqueue('done');
})->join();

is($q->pending(), 1, 'Queue count after thread');
is($q->dequeue(), 'done', 'Thread reported done');
is($q->pending(), 0, 'Empty queue');


### ->dequeue(COUNT) test ###

my $count = 3;

# Thread body: repeatedly take $count items at a time and check that a full
# batch was delivered; stop after a batch whose first item is the 0 marker.
sub reader2 {
    my $id = threads->tid();
    while (1) {
        my @el = $q->dequeue($count);
        is(scalar(@el), $count, "Thread $id got @el");
        select(undef, undef, undef, rand(1));
        return if ($el[0] == 0);
    }
}

push(@threads, threads->create('reader2')) for (1..$nthreads);

# Both enqueues are multiples of $count: 4*$count*$nthreads data items,
# followed by one batch of $count zero markers for each thread.
$q->enqueue(1..4*$count*$nthreads);
$q->enqueue((0) x ($count*$nthreads));

$_->join() foreach @threads;
undef(@threads);

is($q->pending(), 0, 'Empty queue');


### ->dequeue_nb(COUNT) test ###

@items = qw/foo bar baz qux exit/;
$q->enqueue(@items);
is($q->pending(), scalar(@items), 'Queue count');

threads->create(sub {
    is($q->pending(), scalar(@items), 'Queue count in thread');
    # Five items taken two at a time: the last batch holds only 'exit'.
    # The list assignment counts the items returned, so the loop ends when
    # nothing is left on the queue.
    while (my @el = $q->dequeue_nb(2)) {
        is($el[0], shift(@items), "Thread got $el[0]");
        if ($el[0] eq 'exit') {
            is(scalar(@el), 1, 'Thread to exit');
        } else {
            is($el[1], shift(@items), "Thread got $el[1]");
        }
    }
    is($q->pending(), 0, 'Empty queue');
    $q->enqueue('done');
})->join();

is($q->pending(), 1, 'Queue count after thread');
is($q->dequeue(), 'done', 'Thread reported done');
is($q->pending(), 0, 'Empty queue');

exit(0);

# EOF