1b39c5158Smillertpackage Thread::Queue; 2b39c5158Smillert 3b39c5158Smillertuse strict; 4b39c5158Smillertuse warnings; 5b39c5158Smillert 6*56d68f1eSafresh1our $VERSION = '3.14'; # remember to update version in POD! 7898184e3Ssthen$VERSION = eval $VERSION; 8b39c5158Smillert 9b39c5158Smillertuse threads::shared 1.21; 10b39c5158Smillertuse Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); 11b39c5158Smillert 12b39c5158Smillert# Carp errors from threads::shared calls should complain about caller 13b39c5158Smillertour @CARP_NOT = ("threads::shared"); 14b39c5158Smillert 15b39c5158Smillert# Create a new queue possibly pre-populated with items 16b39c5158Smillertsub new 17b39c5158Smillert{ 18b39c5158Smillert my $class = shift; 19b39c5158Smillert my @queue :shared = map { shared_clone($_) } @_; 2091f110e0Safresh1 my %self :shared = ( 'queue' => \@queue ); 2191f110e0Safresh1 return bless(\%self, $class); 22b39c5158Smillert} 23b39c5158Smillert 24b39c5158Smillert# Add items to the tail of a queue 25b39c5158Smillertsub enqueue 26b39c5158Smillert{ 2791f110e0Safresh1 my $self = shift; 2891f110e0Safresh1 lock(%$self); 29b8851fccSafresh1 3091f110e0Safresh1 if ($$self{'ENDED'}) { 3191f110e0Safresh1 require Carp; 3291f110e0Safresh1 Carp::croak("'enqueue' method called on queue that has been 'end'ed"); 3391f110e0Safresh1 } 34b8851fccSafresh1 35b8851fccSafresh1 # Block if queue size exceeds any specified limit 36b8851fccSafresh1 my $queue = $$self{'queue'}; 37b8851fccSafresh1 cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'})); 38b8851fccSafresh1 39b8851fccSafresh1 # Add items to queue, and then signal other threads 40b8851fccSafresh1 push(@$queue, map { shared_clone($_) } @_) 4191f110e0Safresh1 and cond_signal(%$self); 42b39c5158Smillert} 43b39c5158Smillert 44b8851fccSafresh1# Set or return the max. size for a queue 45b8851fccSafresh1sub limit : lvalue 46b8851fccSafresh1{ 47b8851fccSafresh1 my $self = shift; 48b8851fccSafresh1 lock(%$self); 49b8851fccSafresh1 $$self{'LIMIT'}; 50b8851fccSafresh1} 51b8851fccSafresh1 52b39c5158Smillert# Return a count of the number of items on a queue 53b39c5158Smillertsub pending 54b39c5158Smillert{ 5591f110e0Safresh1 my $self = shift; 5691f110e0Safresh1 lock(%$self); 5791f110e0Safresh1 return if ($$self{'ENDED'} && ! @{$$self{'queue'}}); 5891f110e0Safresh1 return scalar(@{$$self{'queue'}}); 5991f110e0Safresh1} 6091f110e0Safresh1 6191f110e0Safresh1# Indicate that no more data will enter the queue 6291f110e0Safresh1sub end 6391f110e0Safresh1{ 6491f110e0Safresh1 my $self = shift; 65b8851fccSafresh1 lock(%$self); 6691f110e0Safresh1 # No more data is coming 6791f110e0Safresh1 $$self{'ENDED'} = 1; 689f11ffb7Safresh1 699f11ffb7Safresh1 cond_signal(%$self); # Unblock possibly waiting threads 70b39c5158Smillert} 71b39c5158Smillert 72b39c5158Smillert# Return 1 or more items from the head of a queue, blocking if needed 73b39c5158Smillertsub dequeue 74b39c5158Smillert{ 7591f110e0Safresh1 my $self = shift; 7691f110e0Safresh1 lock(%$self); 7791f110e0Safresh1 my $queue = $$self{'queue'}; 78b39c5158Smillert 796fb12b70Safresh1 my $count = @_ ? $self->_validate_count(shift) : 1; 80b39c5158Smillert 81b39c5158Smillert # Wait for requisite number of items 8291f110e0Safresh1 cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'}); 8391f110e0Safresh1 8491f110e0Safresh1 # If no longer blocking, try getting whatever is left on the queue 8591f110e0Safresh1 return $self->dequeue_nb($count) if ($$self{'ENDED'}); 86b39c5158Smillert 87b39c5158Smillert # Return single item 889f11ffb7Safresh1 if ($count == 1) { 899f11ffb7Safresh1 my $item = shift(@$queue); 909f11ffb7Safresh1 cond_signal(%$self); # Unblock possibly waiting threads 919f11ffb7Safresh1 return $item; 929f11ffb7Safresh1 } 93b39c5158Smillert 94b39c5158Smillert # Return multiple items 95b39c5158Smillert my @items; 96b39c5158Smillert push(@items, shift(@$queue)) for (1..$count); 979f11ffb7Safresh1 cond_signal(%$self); # Unblock possibly waiting threads 98b39c5158Smillert return @items; 99b39c5158Smillert} 100b39c5158Smillert 101b39c5158Smillert# Return items from the head of a queue with no blocking 102b39c5158Smillertsub dequeue_nb 103b39c5158Smillert{ 10491f110e0Safresh1 my $self = shift; 10591f110e0Safresh1 lock(%$self); 10691f110e0Safresh1 my $queue = $$self{'queue'}; 107b39c5158Smillert 1086fb12b70Safresh1 my $count = @_ ? $self->_validate_count(shift) : 1; 109b39c5158Smillert 110b39c5158Smillert # Return single item 1119f11ffb7Safresh1 if ($count == 1) { 1129f11ffb7Safresh1 my $item = shift(@$queue); 1139f11ffb7Safresh1 cond_signal(%$self); # Unblock possibly waiting threads 1149f11ffb7Safresh1 return $item; 1159f11ffb7Safresh1 } 116b39c5158Smillert 117b39c5158Smillert # Return multiple items 118b39c5158Smillert my @items; 119b39c5158Smillert for (1..$count) { 120b39c5158Smillert last if (! @$queue); 121b39c5158Smillert push(@items, shift(@$queue)); 122b39c5158Smillert } 1239f11ffb7Safresh1 cond_signal(%$self); # Unblock possibly waiting threads 124b39c5158Smillert return @items; 125b39c5158Smillert} 126b39c5158Smillert 12791f110e0Safresh1# Return items from the head of a queue, blocking if needed up to a timeout 12891f110e0Safresh1sub dequeue_timed 12991f110e0Safresh1{ 13091f110e0Safresh1 my $self = shift; 13191f110e0Safresh1 lock(%$self); 13291f110e0Safresh1 my $queue = $$self{'queue'}; 13391f110e0Safresh1 13491f110e0Safresh1 # Timeout may be relative or absolute 1356fb12b70Safresh1 my $timeout = @_ ? $self->_validate_timeout(shift) : -1; 13691f110e0Safresh1 # Convert to an absolute time for use with cond_timedwait() 13791f110e0Safresh1 if ($timeout < 32000000) { # More than one year 13891f110e0Safresh1 $timeout += time(); 13991f110e0Safresh1 } 14091f110e0Safresh1 1416fb12b70Safresh1 my $count = @_ ? $self->_validate_count(shift) : 1; 14291f110e0Safresh1 14391f110e0Safresh1 # Wait for requisite number of items, or until timeout 14491f110e0Safresh1 while ((@$queue < $count) && ! $$self{'ENDED'}) { 14591f110e0Safresh1 last if (! cond_timedwait(%$self, $timeout)); 14691f110e0Safresh1 } 14791f110e0Safresh1 14891f110e0Safresh1 # Get whatever we need off the queue if available 14991f110e0Safresh1 return $self->dequeue_nb($count); 15091f110e0Safresh1} 15191f110e0Safresh1 152b39c5158Smillert# Return an item without removing it from a queue 153b39c5158Smillertsub peek 154b39c5158Smillert{ 15591f110e0Safresh1 my $self = shift; 15691f110e0Safresh1 lock(%$self); 1576fb12b70Safresh1 my $index = @_ ? $self->_validate_index(shift) : 0; 15891f110e0Safresh1 return $$self{'queue'}[$index]; 159b39c5158Smillert} 160b39c5158Smillert 161b39c5158Smillert# Insert items anywhere into a queue 162b39c5158Smillertsub insert 163b39c5158Smillert{ 16491f110e0Safresh1 my $self = shift; 16591f110e0Safresh1 lock(%$self); 16691f110e0Safresh1 16791f110e0Safresh1 if ($$self{'ENDED'}) { 16891f110e0Safresh1 require Carp; 16991f110e0Safresh1 Carp::croak("'insert' method called on queue that has been 'end'ed"); 17091f110e0Safresh1 } 17191f110e0Safresh1 17291f110e0Safresh1 my $queue = $$self{'queue'}; 173b39c5158Smillert 1746fb12b70Safresh1 my $index = $self->_validate_index(shift); 175b39c5158Smillert 176b39c5158Smillert return if (! @_); # Nothing to insert 177b39c5158Smillert 178b39c5158Smillert # Support negative indices 179b39c5158Smillert if ($index < 0) { 180b39c5158Smillert $index += @$queue; 181b39c5158Smillert if ($index < 0) { 182b39c5158Smillert $index = 0; 183b39c5158Smillert } 184b39c5158Smillert } 185b39c5158Smillert 186b39c5158Smillert # Dequeue items from $index onward 187b39c5158Smillert my @tmp; 188b39c5158Smillert while (@$queue > $index) { 189b39c5158Smillert unshift(@tmp, pop(@$queue)) 190b39c5158Smillert } 191b39c5158Smillert 192b39c5158Smillert # Add new items to the queue 193b39c5158Smillert push(@$queue, map { shared_clone($_) } @_); 194b39c5158Smillert 195b39c5158Smillert # Add previous items back onto the queue 196b39c5158Smillert push(@$queue, @tmp); 197b39c5158Smillert 1989f11ffb7Safresh1 cond_signal(%$self); # Unblock possibly waiting threads 199b39c5158Smillert} 200b39c5158Smillert 201b39c5158Smillert# Remove items from anywhere in a queue 202b39c5158Smillertsub extract 203b39c5158Smillert{ 20491f110e0Safresh1 my $self = shift; 20591f110e0Safresh1 lock(%$self); 20691f110e0Safresh1 my $queue = $$self{'queue'}; 207b39c5158Smillert 2086fb12b70Safresh1 my $index = @_ ? $self->_validate_index(shift) : 0; 2096fb12b70Safresh1 my $count = @_ ? $self->_validate_count(shift) : 1; 210b39c5158Smillert 211b39c5158Smillert # Support negative indices 212b39c5158Smillert if ($index < 0) { 213b39c5158Smillert $index += @$queue; 214b39c5158Smillert if ($index < 0) { 215b39c5158Smillert $count += $index; 216b39c5158Smillert return if ($count <= 0); # Beyond the head of the queue 21791f110e0Safresh1 return $self->dequeue_nb($count); # Extract from the head 218b39c5158Smillert } 219b39c5158Smillert } 220b39c5158Smillert 221b39c5158Smillert # Dequeue items from $index+$count onward 222b39c5158Smillert my @tmp; 223b39c5158Smillert while (@$queue > ($index+$count)) { 224b39c5158Smillert unshift(@tmp, pop(@$queue)) 225b39c5158Smillert } 226b39c5158Smillert 227b39c5158Smillert # Extract desired items 228b39c5158Smillert my @items; 229b39c5158Smillert unshift(@items, pop(@$queue)) while (@$queue > $index); 230b39c5158Smillert 231b39c5158Smillert # Add back any removed items 232b39c5158Smillert push(@$queue, @tmp); 233b39c5158Smillert 2349f11ffb7Safresh1 cond_signal(%$self); # Unblock possibly waiting threads 2359f11ffb7Safresh1 236b39c5158Smillert # Return single item 237b39c5158Smillert return $items[0] if ($count == 1); 238b39c5158Smillert 239b39c5158Smillert # Return multiple items 240b39c5158Smillert return @items; 241b39c5158Smillert} 242b39c5158Smillert 2436fb12b70Safresh1### Internal Methods ### 244b39c5158Smillert 245b39c5158Smillert# Check value of the requested index 2466fb12b70Safresh1sub _validate_index 2476fb12b70Safresh1{ 2486fb12b70Safresh1 my $self = shift; 249b39c5158Smillert my $index = shift; 250b39c5158Smillert 251b39c5158Smillert if (! defined($index) || 252b39c5158Smillert ! looks_like_number($index) || 253b39c5158Smillert (int($index) != $index)) 254b39c5158Smillert { 255b39c5158Smillert require Carp; 256b39c5158Smillert my ($method) = (caller(1))[3]; 2576fb12b70Safresh1 my $class_name = ref($self); 2586fb12b70Safresh1 $method =~ s/$class_name\:://; 259b39c5158Smillert $index = 'undef' if (! defined($index)); 260b39c5158Smillert Carp::croak("Invalid 'index' argument ($index) to '$method' method"); 261b39c5158Smillert } 262b39c5158Smillert 263b39c5158Smillert return $index; 264b39c5158Smillert}; 265b39c5158Smillert 266b39c5158Smillert# Check value of the requested count 2676fb12b70Safresh1sub _validate_count 2686fb12b70Safresh1{ 2696fb12b70Safresh1 my $self = shift; 270b39c5158Smillert my $count = shift; 271b39c5158Smillert 272b39c5158Smillert if (! defined($count) || 273b39c5158Smillert ! looks_like_number($count) || 274b39c5158Smillert (int($count) != $count) || 2759f11ffb7Safresh1 ($count < 1) || 2769f11ffb7Safresh1 ($$self{'LIMIT'} && $count > $$self{'LIMIT'})) 277b39c5158Smillert { 278b39c5158Smillert require Carp; 279b39c5158Smillert my ($method) = (caller(1))[3]; 2806fb12b70Safresh1 my $class_name = ref($self); 2816fb12b70Safresh1 $method =~ s/$class_name\:://; 282b39c5158Smillert $count = 'undef' if (! defined($count)); 2839f11ffb7Safresh1 if ($$self{'LIMIT'} && $count > $$self{'LIMIT'}) { 2849f11ffb7Safresh1 Carp::croak("'count' argument ($count) to '$method' method exceeds queue size limit ($$self{'LIMIT'})"); 2859f11ffb7Safresh1 } else { 286b39c5158Smillert Carp::croak("Invalid 'count' argument ($count) to '$method' method"); 287b39c5158Smillert } 2889f11ffb7Safresh1 } 289b39c5158Smillert 290b39c5158Smillert return $count; 291b39c5158Smillert}; 292b39c5158Smillert 29391f110e0Safresh1# Check value of the requested timeout 2946fb12b70Safresh1sub _validate_timeout 2956fb12b70Safresh1{ 2966fb12b70Safresh1 my $self = shift; 29791f110e0Safresh1 my $timeout = shift; 29891f110e0Safresh1 29991f110e0Safresh1 if (! defined($timeout) || 30091f110e0Safresh1 ! looks_like_number($timeout)) 30191f110e0Safresh1 { 30291f110e0Safresh1 require Carp; 30391f110e0Safresh1 my ($method) = (caller(1))[3]; 3046fb12b70Safresh1 my $class_name = ref($self); 3056fb12b70Safresh1 $method =~ s/$class_name\:://; 30691f110e0Safresh1 $timeout = 'undef' if (! defined($timeout)); 30791f110e0Safresh1 Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method"); 30891f110e0Safresh1 } 30991f110e0Safresh1 31091f110e0Safresh1 return $timeout; 31191f110e0Safresh1}; 31291f110e0Safresh1 313b39c5158Smillert1; 314b39c5158Smillert 315b39c5158Smillert=head1 NAME 316b39c5158Smillert 317b39c5158SmillertThread::Queue - Thread-safe queues 318b39c5158Smillert 319b39c5158Smillert=head1 VERSION 320b39c5158Smillert 321*56d68f1eSafresh1This document describes Thread::Queue version 3.14 322b39c5158Smillert 323b39c5158Smillert=head1 SYNOPSIS 324b39c5158Smillert 325b39c5158Smillert use strict; 326b39c5158Smillert use warnings; 327b39c5158Smillert 328b39c5158Smillert use threads; 329b39c5158Smillert use Thread::Queue; 330b39c5158Smillert 331b39c5158Smillert my $q = Thread::Queue->new(); # A new empty queue 332b39c5158Smillert 333b39c5158Smillert # Worker thread 33491f110e0Safresh1 my $thr = threads->create( 33591f110e0Safresh1 sub { 33691f110e0Safresh1 # Thread will loop until no more work 33791f110e0Safresh1 while (defined(my $item = $q->dequeue())) { 338b39c5158Smillert # Do work on $item 33991f110e0Safresh1 ... 340b39c5158Smillert } 34191f110e0Safresh1 } 34291f110e0Safresh1 ); 343b39c5158Smillert 344b39c5158Smillert # Send work to the thread 345b39c5158Smillert $q->enqueue($item1, ...); 34691f110e0Safresh1 # Signal that there is no more work to be sent 34791f110e0Safresh1 $q->end(); 34891f110e0Safresh1 # Join up with the thread when it finishes 34991f110e0Safresh1 $thr->join(); 350b39c5158Smillert 35191f110e0Safresh1 ... 352b39c5158Smillert 353b39c5158Smillert # Count of items in the queue 354b39c5158Smillert my $left = $q->pending(); 355b39c5158Smillert 356b39c5158Smillert # Non-blocking dequeue 357b39c5158Smillert if (defined(my $item = $q->dequeue_nb())) { 358b39c5158Smillert # Work on $item 359b39c5158Smillert } 360b39c5158Smillert 36191f110e0Safresh1 # Blocking dequeue with 5-second timeout 36291f110e0Safresh1 if (defined(my $item = $q->dequeue_timed(5))) { 36391f110e0Safresh1 # Work on $item 36491f110e0Safresh1 } 36591f110e0Safresh1 366b8851fccSafresh1 # Set a size for a queue 367b8851fccSafresh1 $q->limit = 5; 368b8851fccSafresh1 369b39c5158Smillert # Get the second item in the queue without dequeuing anything 370b39c5158Smillert my $item = $q->peek(1); 371b39c5158Smillert 372b39c5158Smillert # Insert two items into the queue just behind the head 373b39c5158Smillert $q->insert(1, $item1, $item2); 374b39c5158Smillert 375b39c5158Smillert # Extract the last two items on the queue 376b39c5158Smillert my ($item1, $item2) = $q->extract(-2, 2); 377b39c5158Smillert 378b39c5158Smillert=head1 DESCRIPTION 379b39c5158Smillert 380b39c5158SmillertThis module provides thread-safe FIFO queues that can be accessed safely by 381b39c5158Smillertany number of threads. 382b39c5158Smillert 383b39c5158SmillertAny data types supported by L<threads::shared> can be passed via queues: 384b39c5158Smillert 385b39c5158Smillert=over 386b39c5158Smillert 387b39c5158Smillert=item Ordinary scalars 388b39c5158Smillert 389b39c5158Smillert=item Array refs 390b39c5158Smillert 391b39c5158Smillert=item Hash refs 392b39c5158Smillert 393b39c5158Smillert=item Scalar refs 394b39c5158Smillert 395b39c5158Smillert=item Objects based on the above 396b39c5158Smillert 397b39c5158Smillert=back 398b39c5158Smillert 399b39c5158SmillertOrdinary scalars are added to queues as they are. 400b39c5158Smillert 401b39c5158SmillertIf not already thread-shared, the other complex data types will be cloned 402b39c5158Smillert(recursively, if needed, and including any C<bless>ings and read-only 403b39c5158Smillertsettings) into thread-shared structures before being placed onto a queue. 404b39c5158Smillert 405b39c5158SmillertFor example, the following would cause L<Thread::Queue> to create a empty, 406b39c5158Smillertshared array reference via C<&shared([])>, copy the elements 'foo', 'bar' 407b39c5158Smillertand 'baz' from C<@ary> into it, and then place that shared reference onto 408b39c5158Smillertthe queue: 409b39c5158Smillert 410b39c5158Smillert my @ary = qw/foo bar baz/; 411b39c5158Smillert $q->enqueue(\@ary); 412b39c5158Smillert 413b39c5158SmillertHowever, for the following, the items are already shared, so their references 414b39c5158Smillertare added directly to the queue, and no cloning takes place: 415b39c5158Smillert 416b39c5158Smillert my @ary :shared = qw/foo bar baz/; 417b39c5158Smillert $q->enqueue(\@ary); 418b39c5158Smillert 419b39c5158Smillert my $obj = &shared({}); 420b39c5158Smillert $$obj{'foo'} = 'bar'; 421b39c5158Smillert $$obj{'qux'} = 99; 422b39c5158Smillert bless($obj, 'My::Class'); 423b39c5158Smillert $q->enqueue($obj); 424b39c5158Smillert 425b39c5158SmillertSee L</"LIMITATIONS"> for caveats related to passing objects via queues. 426b39c5158Smillert 427b39c5158Smillert=head1 QUEUE CREATION 428b39c5158Smillert 429b39c5158Smillert=over 430b39c5158Smillert 431b39c5158Smillert=item ->new() 432b39c5158Smillert 433b39c5158SmillertCreates a new empty queue. 434b39c5158Smillert 435b39c5158Smillert=item ->new(LIST) 436b39c5158Smillert 437b39c5158SmillertCreates a new queue pre-populated with the provided list of items. 438b39c5158Smillert 439b39c5158Smillert=back 440b39c5158Smillert 441b39c5158Smillert=head1 BASIC METHODS 442b39c5158Smillert 443b39c5158SmillertThe following methods deal with queues on a FIFO basis. 444b39c5158Smillert 445b39c5158Smillert=over 446b39c5158Smillert 447b39c5158Smillert=item ->enqueue(LIST) 448b39c5158Smillert 449b39c5158SmillertAdds a list of items onto the end of the queue. 450b39c5158Smillert 451b39c5158Smillert=item ->dequeue() 452b39c5158Smillert 453b39c5158Smillert=item ->dequeue(COUNT) 454b39c5158Smillert 455b39c5158SmillertRemoves the requested number of items (default is 1) from the head of the 456b39c5158Smillertqueue, and returns them. If the queue contains fewer than the requested 457b39c5158Smillertnumber of items, then the thread will be blocked until the requisite number 458b8851fccSafresh1of items are available (i.e., until other threads C<enqueue> more items). 459b39c5158Smillert 460b39c5158Smillert=item ->dequeue_nb() 461b39c5158Smillert 462b39c5158Smillert=item ->dequeue_nb(COUNT) 463b39c5158Smillert 464b39c5158SmillertRemoves the requested number of items (default is 1) from the head of the 465b39c5158Smillertqueue, and returns them. If the queue contains fewer than the requested 466b39c5158Smillertnumber of items, then it immediately (i.e., non-blocking) returns whatever 467b39c5158Smillertitems there are on the queue. If the queue is empty, then C<undef> is 468b39c5158Smillertreturned. 469b39c5158Smillert 47091f110e0Safresh1=item ->dequeue_timed(TIMEOUT) 47191f110e0Safresh1 47291f110e0Safresh1=item ->dequeue_timed(TIMEOUT, COUNT) 47391f110e0Safresh1 47491f110e0Safresh1Removes the requested number of items (default is 1) from the head of the 47591f110e0Safresh1queue, and returns them. If the queue contains fewer than the requested 47691f110e0Safresh1number of items, then the thread will be blocked until the requisite number of 47791f110e0Safresh1items are available, or until the timeout is reached. If the timeout is 47891f110e0Safresh1reached, it returns whatever items there are on the queue, or C<undef> if the 47991f110e0Safresh1queue is empty. 48091f110e0Safresh1 48191f110e0Safresh1The timeout may be a number of seconds relative to the current time (e.g., 5 48291f110e0Safresh1seconds from when the call is made), or may be an absolute timeout in I<epoch> 48391f110e0Safresh1seconds the same as would be used with 48491f110e0Safresh1L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">. 48591f110e0Safresh1Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of 48691f110e0Safresh1the underlying implementation). 48791f110e0Safresh1 4886fb12b70Safresh1If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call 48991f110e0Safresh1behaves the same as C<dequeue_nb>. 49091f110e0Safresh1 491b39c5158Smillert=item ->pending() 492b39c5158Smillert 49391f110e0Safresh1Returns the number of items still in the queue. Returns C<undef> if the queue 49491f110e0Safresh1has been ended (see below), and there are no more items in the queue. 49591f110e0Safresh1 496b8851fccSafresh1=item ->limit 497b8851fccSafresh1 498b8851fccSafresh1Sets the size of the queue. If set, calls to C<enqueue()> will block until 499b8851fccSafresh1the number of pending items in the queue drops below the C<limit>. The 500b8851fccSafresh1C<limit> does not prevent enqueuing items beyond that count: 501b8851fccSafresh1 502b8851fccSafresh1 my $q = Thread::Queue->new(1, 2); 503b8851fccSafresh1 $q->limit = 4; 504b8851fccSafresh1 $q->enqueue(3, 4, 5); # Does not block 505b8851fccSafresh1 $q->enqueue(6); # Blocks until at least 2 items are 506b8851fccSafresh1 # dequeued 507b8851fccSafresh1 my $size = $q->limit; # Returns the current limit (may return 508b8851fccSafresh1 # 'undef') 509b8851fccSafresh1 $q->limit = 0; # Queue size is now unlimited 510b8851fccSafresh1 5119f11ffb7Safresh1Calling any of the dequeue methods with C<COUNT> greater than a queue's 5129f11ffb7Safresh1C<limit> will generate an error. 5139f11ffb7Safresh1 51491f110e0Safresh1=item ->end() 51591f110e0Safresh1 51691f110e0Safresh1Declares that no more items will be added to the queue. 51791f110e0Safresh1 51891f110e0Safresh1All threads blocking on C<dequeue()> calls will be unblocked with any 51991f110e0Safresh1remaining items in the queue and/or C<undef> being returned. Any subsequent 52091f110e0Safresh1calls to C<dequeue()> will behave like C<dequeue_nb()>. 52191f110e0Safresh1 52291f110e0Safresh1Once ended, no more items may be placed in the queue. 523b39c5158Smillert 524b39c5158Smillert=back 525b39c5158Smillert 526b39c5158Smillert=head1 ADVANCED METHODS 527b39c5158Smillert 528b39c5158SmillertThe following methods can be used to manipulate items anywhere in a queue. 529b39c5158Smillert 530b39c5158SmillertTo prevent the contents of a queue from being modified by another thread 531b39c5158Smillertwhile it is being examined and/or changed, L<lock|threads::shared/"lock 532b39c5158SmillertVARIABLE"> the queue inside a local block: 533b39c5158Smillert 534b39c5158Smillert { 535b39c5158Smillert lock($q); # Keep other threads from changing the queue's contents 536b39c5158Smillert my $item = $q->peek(); 537b39c5158Smillert if ($item ...) { 538b39c5158Smillert ... 539b39c5158Smillert } 540b39c5158Smillert } 541b39c5158Smillert # Queue is now unlocked 542b39c5158Smillert 543b39c5158Smillert=over 544b39c5158Smillert 545b39c5158Smillert=item ->peek() 546b39c5158Smillert 547b39c5158Smillert=item ->peek(INDEX) 548b39c5158Smillert 549b39c5158SmillertReturns an item from the queue without dequeuing anything. Defaults to the 550*56d68f1eSafresh1head of queue (at index position 0) if no index is specified. Negative 551b39c5158Smillertindex values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1 552b39c5158Smillertis the end of the queue, -2 is next to last, and so on). 553b39c5158Smillert 554b39c5158SmillertIf no items exists at the specified index (i.e., the queue is empty, or the 555b39c5158Smillertindex is beyond the number of items on the queue), then C<undef> is returned. 556b39c5158Smillert 557b39c5158SmillertRemember, the returned item is not removed from the queue, so manipulating a 558b39c5158SmillertC<peek>ed at reference affects the item on the queue. 559b39c5158Smillert 560b39c5158Smillert=item ->insert(INDEX, LIST) 561b39c5158Smillert 562b39c5158SmillertAdds the list of items to the queue at the specified index position (0 563b39c5158Smillertis the head of the list). Any existing items at and beyond that position are 564b39c5158Smillertpushed back past the newly added items: 565b39c5158Smillert 566b39c5158Smillert $q->enqueue(1, 2, 3, 4); 567b39c5158Smillert $q->insert(1, qw/foo bar/); 568b39c5158Smillert # Queue now contains: 1, foo, bar, 2, 3, 4 569b39c5158Smillert 570b39c5158SmillertSpecifying an index position greater than the number of items in the queue 571b39c5158Smillertjust adds the list to the end. 572b39c5158Smillert 573b39c5158SmillertNegative index positions are supported: 574b39c5158Smillert 575b39c5158Smillert $q->enqueue(1, 2, 3, 4); 576b39c5158Smillert $q->insert(-2, qw/foo bar/); 577b39c5158Smillert # Queue now contains: 1, 2, foo, bar, 3, 4 578b39c5158Smillert 579b39c5158SmillertSpecifying a negative index position greater than the number of items in the 580b39c5158Smillertqueue adds the list to the head of the queue. 581b39c5158Smillert 582b39c5158Smillert=item ->extract() 583b39c5158Smillert 584b39c5158Smillert=item ->extract(INDEX) 585b39c5158Smillert 586b39c5158Smillert=item ->extract(INDEX, COUNT) 587b39c5158Smillert 588b39c5158SmillertRemoves and returns the specified number of items (defaults to 1) from the 589b39c5158Smillertspecified index position in the queue (0 is the head of the queue). When 590b39c5158Smillertcalled with no arguments, C<extract> operates the same as C<dequeue_nb>. 591b39c5158Smillert 592b39c5158SmillertThis method is non-blocking, and will return only as many items as are 593b39c5158Smillertavailable to fulfill the request: 594b39c5158Smillert 595b39c5158Smillert $q->enqueue(1, 2, 3, 4); 596b39c5158Smillert my $item = $q->extract(2) # Returns 3 597b39c5158Smillert # Queue now contains: 1, 2, 4 598b39c5158Smillert my @items = $q->extract(1, 3) # Returns (2, 4) 599b39c5158Smillert # Queue now contains: 1 600b39c5158Smillert 601b39c5158SmillertSpecifying an index position greater than the number of items in the 602b39c5158Smillertqueue results in C<undef> or an empty list being returned. 603b39c5158Smillert 604b39c5158Smillert $q->enqueue('foo'); 605b39c5158Smillert my $nada = $q->extract(3) # Returns undef 606b39c5158Smillert my @nada = $q->extract(1, 3) # Returns () 607b39c5158Smillert 608b39c5158SmillertNegative index positions are supported. Specifying a negative index position 609b39c5158Smillertgreater than the number of items in the queue may return items from the head 610b39c5158Smillertof the queue (similar to C<dequeue_nb>) if the count overlaps the head of the 611b39c5158Smillertqueue from the specified position (i.e. if queue size + index + count is 612b39c5158Smillertgreater than zero): 613b39c5158Smillert 614b39c5158Smillert $q->enqueue(qw/foo bar baz/); 615b39c5158Smillert my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0 616b39c5158Smillert my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0 617b39c5158Smillert # Queue now contains: bar, baz 618b8851fccSafresh1 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - 619b8851fccSafresh1 # (2+(-3)+4) > 0 620b39c5158Smillert 621b39c5158Smillert=back 622b39c5158Smillert 623b39c5158Smillert=head1 NOTES 624b39c5158Smillert 625b39c5158SmillertQueues created by L<Thread::Queue> can be used in both threaded and 626b39c5158Smillertnon-threaded applications. 627b39c5158Smillert 628b39c5158Smillert=head1 LIMITATIONS 629b39c5158Smillert 630b39c5158SmillertPassing objects on queues may not work if the objects' classes do not support 631b39c5158Smillertsharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more. 632b39c5158Smillert 633b39c5158SmillertPassing array/hash refs that contain objects may not work for Perl prior to 634b39c5158Smillert5.10.0. 635b39c5158Smillert 636b39c5158Smillert=head1 SEE ALSO 637b39c5158Smillert 6389f11ffb7Safresh1Thread::Queue on MetaCPAN: 6399f11ffb7Safresh1L<https://metacpan.org/release/Thread-Queue> 6409f11ffb7Safresh1 6419f11ffb7Safresh1Code repository for CPAN distribution: 6429f11ffb7Safresh1L<https://github.com/Dual-Life/Thread-Queue> 643b39c5158Smillert 644b39c5158SmillertL<threads>, L<threads::shared> 645b39c5158Smillert 64691f110e0Safresh1Sample code in the I<examples> directory of this distribution on CPAN. 64791f110e0Safresh1 648b39c5158Smillert=head1 MAINTAINER 649b39c5158Smillert 650b39c5158SmillertJerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>> 651b39c5158Smillert 652b39c5158Smillert=head1 LICENSE 653b39c5158Smillert 654b39c5158SmillertThis program is free software; you can redistribute it and/or modify it under 655b39c5158Smillertthe same terms as Perl itself. 656b39c5158Smillert 657b39c5158Smillert=cut 658