1b8851fccSafresh1use strict; 2b8851fccSafresh1use warnings; 3b8851fccSafresh1 4b8851fccSafresh1use Config; 5b8851fccSafresh1 6b8851fccSafresh1BEGIN { 7b8851fccSafresh1 if (! $Config{'useithreads'}) { 8b8851fccSafresh1 print("1..0 # SKIP Perl not compiled with 'useithreads'\n"); 9b8851fccSafresh1 exit(0); 10b8851fccSafresh1 } 11b8851fccSafresh1 if (! $Config{'d_select'}) { 12b8851fccSafresh1 print("1..0 # SKIP 'select()' not available for testing\n"); 13b8851fccSafresh1 exit(0); 14b8851fccSafresh1 } 15b8851fccSafresh1} 16b8851fccSafresh1 17b8851fccSafresh1use threads; 18b8851fccSafresh1use Thread::Queue; 19b8851fccSafresh1 20b8851fccSafresh1use Test::More; 21b8851fccSafresh1 22*5759b3d2Safresh1plan tests => 13; 23b8851fccSafresh1 24b8851fccSafresh1my $q = Thread::Queue->new(); 25b8851fccSafresh1my $rpt = Thread::Queue->new(); 26b8851fccSafresh1 27b8851fccSafresh1my $th = threads->create( sub { 28b8851fccSafresh1 # (1) Set queue limit, and report it 29b8851fccSafresh1 $q->limit = 3; 30b8851fccSafresh1 $rpt->enqueue($q->limit); 31b8851fccSafresh1 32b8851fccSafresh1 # (3) Fetch an item from queue 33b8851fccSafresh1 my $item = $q->dequeue(); 34b8851fccSafresh1 is($item, 1, 'Dequeued item 1'); 35b8851fccSafresh1 # Report queue count 36b8851fccSafresh1 $rpt->enqueue($q->pending()); 37b8851fccSafresh1 38b8851fccSafresh1 # q = (2, 3, 4, 5); r = (4) 39b8851fccSafresh1 40b8851fccSafresh1 # (4) Enqueue more items - will block 41b8851fccSafresh1 $q->enqueue(6, 7); 42b8851fccSafresh1 # q = (5, 'foo', 6, 7); r = (4, 3, 4, 3) 43b8851fccSafresh1 44b8851fccSafresh1 # (6) Get reports from main 45b8851fccSafresh1 my @items = $rpt->dequeue(5); 46b8851fccSafresh1 is_deeply(\@items, [4, 3, 4, 3, 'go'], 'Queue reports'); 47b8851fccSafresh1}); 48b8851fccSafresh1 49b8851fccSafresh1# (2) Read queue limit from thread 50b8851fccSafresh1my $item = $rpt->dequeue(); 51b8851fccSafresh1is($item, $q->limit, 'Queue limit set'); 52b8851fccSafresh1# Send items 53b8851fccSafresh1$q->enqueue(1, 2, 3, 4, 5); 54b8851fccSafresh1 55b8851fccSafresh1# (5) Read queue count 56b8851fccSafresh1$item = $rpt->dequeue; 57b8851fccSafresh1# q = (2, 3, 4, 5); r = () 58b8851fccSafresh1is($item, $q->pending(), 'Queue count'); 59b8851fccSafresh1# Report back the queue count 60b8851fccSafresh1$rpt->enqueue($q->pending); 61b8851fccSafresh1# q = (2, 3, 4, 5); r = (4) 62b8851fccSafresh1 63b8851fccSafresh1# Read an item from queue 64b8851fccSafresh1$item = $q->dequeue(); 65b8851fccSafresh1is($item, 2, 'Dequeued item 2'); 66b8851fccSafresh1# q = (3, 4, 5); r = (4) 67b8851fccSafresh1# Report back the queue count 68b8851fccSafresh1$rpt->enqueue($q->pending); 69b8851fccSafresh1# q = (3, 4, 5); r = (4, 3) 70b8851fccSafresh1 71b8851fccSafresh1# 'insert' doesn't care about queue limit 72b8851fccSafresh1$q->insert(3, 'foo'); 73b8851fccSafresh1$rpt->enqueue($q->pending); 74b8851fccSafresh1# q = (3, 4, 5, 'foo'); r = (4, 3, 4) 75b8851fccSafresh1 76b8851fccSafresh1# Read an item from queue 77b8851fccSafresh1$item = $q->dequeue(); 78b8851fccSafresh1is($item, 3, 'Dequeued item 3'); 79b8851fccSafresh1# q = (4, 5, 'foo'); r = (4, 3, 4) 80b8851fccSafresh1# Report back the queue count 81b8851fccSafresh1$rpt->enqueue($q->pending); 82b8851fccSafresh1# q = (4, 5, 'foo'); r = (4, 3, 4, 3) 83b8851fccSafresh1 84b8851fccSafresh1# Read all items from queue 85*5759b3d2Safresh1my @items = $q->dequeue(3); 86*5759b3d2Safresh1is_deeply(\@items, [4, 5, 'foo'], 'Dequeued 3 items'); 87b8851fccSafresh1# Thread is now unblocked 88b8851fccSafresh1 89*5759b3d2Safresh1@items = $q->dequeue(2); 90*5759b3d2Safresh1is_deeply(\@items, [6, 7], 'Dequeued 2 items'); 91b8851fccSafresh1 92b8851fccSafresh1# Thread is now unblocked 93b8851fccSafresh1# Handshake with thread 94b8851fccSafresh1$rpt->enqueue('go'); 95b8851fccSafresh1 96b8851fccSafresh1# (7) - Done 97b8851fccSafresh1$th->join; 98b8851fccSafresh1 99*5759b3d2Safresh1# It's an error to call dequeue methods with COUNT > LIMIT 100*5759b3d2Safresh1eval { $q->dequeue(5); }; 101*5759b3d2Safresh1like($@, qr/exceeds queue size limit/, $@); 102*5759b3d2Safresh1 103*5759b3d2Safresh1# Bug #120157 104*5759b3d2Safresh1# Fix deadlock from combination of dequeue_nb, enqueue and queue size limit 105*5759b3d2Safresh1 106*5759b3d2Safresh1# (1) Fill queue 107*5759b3d2Safresh1$q->enqueue(1..3); 108*5759b3d2Safresh1is($q->pending, 3, 'Queue loaded'); 109*5759b3d2Safresh1 110*5759b3d2Safresh1# (2) Thread will block trying to add to full queue 111*5759b3d2Safresh1$th = threads->create( sub { 112*5759b3d2Safresh1 $q->enqueue(99); 113*5759b3d2Safresh1 return('OK'); 114*5759b3d2Safresh1}); 115*5759b3d2Safresh1threads->yield(); 116*5759b3d2Safresh1 117*5759b3d2Safresh1# (3) Dequeue an item so that thread can unblock 118*5759b3d2Safresh1is($q->dequeue_nb(), 1, 'Dequeued item'); 119*5759b3d2Safresh1 120*5759b3d2Safresh1# (4) Thread unblocks 121*5759b3d2Safresh1is($th->join(), 'OK', 'Thread exited'); 122*5759b3d2Safresh1 123*5759b3d2Safresh1# (5) Fetch queue to show thread's item was enqueued 124*5759b3d2Safresh1@items = (); 125*5759b3d2Safresh1while (my $item = $q->dequeue_nb()) { 126*5759b3d2Safresh1 push(@items, $item); 127*5759b3d2Safresh1} 128*5759b3d2Safresh1is_deeply(\@items, [2,3,99], 'Dequeued remaining'); 129*5759b3d2Safresh1 130b8851fccSafresh1exit(0); 131b8851fccSafresh1 132b8851fccSafresh1# EOF 133