use strict;
use warnings;

# Skip the whole script (TAP "1..0 # SKIP") when this perl lacks ithreads.
BEGIN {
    use Config;
    if (! $Config{'useithreads'}) {
        print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
        exit(0);
    }
}

use threads;
use Thread::Queue;

# Load the test functions at compile time so that ok()/is()/plan() are
# already known when the rest of the script is parsed.
BEGIN { # perl RT 133382
    if ($] == 5.008) {
        require 't/test.pl';   # Test::More work-alike for Perl 5.8.0
    } else {
        require Test::More;
    }
    Test::More->import();
} # end BEGIN

# Fixed plan for the whole script.
plan('tests' => 81);

### Basic usage with multiple threads ###

my $nthreads = 5;

# Start with one item per reader thread already queued.
my $q = Thread::Queue->new(1..$nthreads);
ok($q, 'New queue');
is($q->pending(), $nthreads, 'Pre-populated queue count');

# Thread body: dequeue items from $q until the -1 end marker arrives.
# Emits one test per item received, then a final "done" test.
sub reader {
    my $id = threads->tid();
    while ((my $el = $q->dequeue()) != -1) {
        ok($el >= 1, "Thread $id got $el");
        # Pause for a random fraction of a second between items.
        select(undef, undef, undef, rand(1));
    }
    ok(1, "Thread $id done");
}

my @threads;
push(@threads, threads->create('reader')) for (1..$nthreads);

# Feed 20 more items, pausing a random fraction of a second before each.
for (1..20) {
    select(undef, undef, undef, rand(1));
    $q->enqueue($_);
}

$q->enqueue((-1) x $nthreads);   # One end marker for each thread

$_->join() foreach @threads;
undef(@threads);

# Every reader consumed its end marker, so nothing should be left queued.
is($q->pending(), 0, 'Empty queue');


### ->dequeue_nb() test ###

$q = Thread::Queue->new();
ok($q, 'New queue');
is($q->pending(), 0, 'Empty queue');

my @items = qw/foo bar baz/;
$q->enqueue(@items);

# Drain the queue from a thread without blocking, checking that the items
# come out in the order they were queued, then report back through the
# same queue.
threads->create(sub {
    is($q->pending(), scalar(@items), 'Queue count in thread');
    # Test definedness rather than truth so that a false item such as 0
    # or '' would not end the loop before the queue is empty.
    while (defined(my $el = $q->dequeue_nb())) {
        is($el, shift(@items), "Thread got $el");
    }
    is($q->pending(), 0, 'Empty queue');
    $q->enqueue('done');
})->join();

is($q->pending(), 1, 'Queue count after thread');
is($q->dequeue(), 'done', 'Thread reported done');
is($q->pending(), 0, 'Empty queue');


### ->dequeue(COUNT) test ###

my $count = 3;

# Thread body: repeatedly dequeue $count items at once from $q, emitting
# one test per batch.  Returns after handling a batch whose first item is 0.
sub reader2 {
    my $id = threads->tid();
    while (1) {
        my @el = $q->dequeue($count);
        is(scalar(@el), $count, "Thread $id got @el");
        # Pause for a random fraction of a second between batches.
        select(undef, undef, undef, rand(1));
        return if ($el[0] == 0);
    }
}

push(@threads, threads->create('reader2')) for (1..$nthreads);

# The payload is a multiple of $count, so batches never mix payload items
# with terminators; then queue one all-zero batch per thread.
$q->enqueue(1..4*$count*$nthreads);
$q->enqueue((0) x ($count*$nthreads));

$_->join() foreach @threads;
undef(@threads);

is($q->pending(), 0, 'Empty queue');


### ->dequeue_nb(COUNT) test ###

# Five items: an odd count, so the last batch of two holds only 'exit'.
@items = qw/foo bar baz qux exit/;
$q->enqueue(@items);
is($q->pending(), scalar(@items), 'Queue count');

# Drain the queue from a thread two items at a time without blocking,
# checking the order of the items, then report back through the same queue.
threads->create(sub {
    is($q->pending(), scalar(@items), 'Queue count in thread');
    # The list assignment is false once no items come back.
    while (my @el = $q->dequeue_nb(2)) {
        is($el[0], shift(@items), "Thread got $el[0]");
        if ($el[0] eq 'exit') {
            # Final, short batch: only the single leftover item.
            is(scalar(@el), 1, 'Thread to exit');
        } else {
            is($el[1], shift(@items), "Thread got $el[1]");
        }
    }
    is($q->pending(), 0, 'Empty queue');
    $q->enqueue('done');
})->join();

is($q->pending(), 1, 'Queue count after thread');
is($q->dequeue(), 'done', 'Thread reported done');
is($q->pending(), 0, 'Empty queue');

exit(0);

# EOF