package Thread::Queue;

use strict;
use warnings;

our $VERSION = '3.05';
$VERSION = eval $VERSION;

use threads::shared 1.21;
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);

# Carp errors from threads::shared calls should complain about caller
our @CARP_NOT = ("threads::shared");

# Create a new queue, possibly pre-populated with items.
#   Args:    LIST of initial items (each is passed through shared_clone()).
#   Returns: a blessed reference to a shared hash holding the shared
#            item array under the 'queue' key.
sub new
{
    my $class = shift;
    my @queue :shared = map { shared_clone($_) } @_;
    my %self :shared = ( 'queue' => \@queue );
    return bless(\%self, $class);
}

# Add items to the tail of a queue.
#   Args:   LIST of items (each is passed through shared_clone()).
#   Croaks: if the queue has already been end()ed.
sub enqueue
{
    my $self = shift;
    lock(%$self);
    if ($$self{'ENDED'}) {
        require Carp;
        Carp::croak("'enqueue' method called on queue that has been 'end'ed");
    }
    # push() returns the new queue length, so the signal is only sent
    # when the queue is non-empty afterwards
    push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
        and cond_signal(%$self);
}

# Return a count of the number of items on a queue.
#   Returns: the item count, or undef/empty list when the queue has been
#            end()ed and holds no more items.
sub pending
{
    my $self = shift;
    lock(%$self);
    return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
    return scalar(@{$$self{'queue'}});
}

# Indicate that no more data will enter the queue.
sub end
{
    my $self = shift;
    # Lock the hash the same way every other method does
    lock(%$self);
    # No more data is coming
    $$self{'ENDED'} = 1;
    # Try to release at least one blocked thread; each released dequeuer
    # signals again when it sees 'ENDED', which passes the wake-up along
    cond_signal(%$self);
}

# Return 1 or more items from the head of a queue, blocking if needed.
#   Args:    optional COUNT (integer >= 1, default 1).
#   Returns: one item when COUNT is 1, otherwise a list of COUNT items.
#            Once the queue has been end()ed this behaves like dequeue_nb().
#   Croaks:  on an invalid COUNT.
sub dequeue
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items
    cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
    # Pass the wake-up along if items will remain or the queue has ended
    cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});

    # If no longer blocking, try getting whatever is left on the queue
    return $self->dequeue_nb($count) if ($$self{'ENDED'});

    # Return single item
    return shift(@$queue) if ($count == 1);

    # Return multiple items
    my @items;
    push(@items, shift(@$queue)) for (1..$count);
    return @items;
}

# Return items from the head of a queue with no blocking.
#   Args:    optional COUNT (integer >= 1, default 1).
#   Returns: one item (undef if the queue is empty) when COUNT is 1,
#            otherwise a list of up to COUNT items.
#   Croaks:  on an invalid COUNT.
sub dequeue_nb
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Return single item
    return shift(@$queue) if ($count == 1);

    # Return multiple items
    my @items;
    for (1..$count) {
        last if (! @$queue);
        push(@items, shift(@$queue));
    }
    return @items;
}

# Return items from the head of a queue, blocking if needed up to a timeout.
#   Args:    optional TIMEOUT (relative seconds or absolute epoch seconds),
#            optional COUNT (integer >= 1, default 1).
#            A missing or undef TIMEOUT means "do not wait", as documented
#            in the POD.
#   Returns: whatever dequeue_nb(COUNT) returns once the wait is over.
#   Croaks:  on a defined but non-numeric TIMEOUT, or an invalid COUNT.
sub dequeue_timed
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    # Timeout may be relative or absolute.  A missing or undef timeout is
    # mapped to -1 so that the wait below expires immediately.
    my $timeout = shift;
    $timeout = defined($timeout) ? $self->_validate_timeout($timeout) : -1;

    # Convert to an absolute time for use with cond_timedwait().
    # Values below roughly one year's worth of seconds are taken to be
    # relative to now; larger values are used as absolute epoch times.
    if ($timeout < 32000000) {
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items, or until timeout
    while ((@$queue < $count) && ! $$self{'ENDED'}) {
        last if (! cond_timedwait(%$self, $timeout));
    }
    # Pass the wake-up along if items will remain or the queue has ended
    cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});

    # Get whatever we need off the queue if available
    return $self->dequeue_nb($count);
}

# Return an item without removing it from a queue.
#   Args:    optional INDEX (integer, default 0; negative counts from the end).
#   Returns: the item at INDEX, or undef if there is none.
#   Croaks:  on an invalid INDEX.
sub peek
{
    my $self = shift;
    lock(%$self);
    my $index = @_ ? $self->_validate_index(shift) : 0;
    return $$self{'queue'}[$index];
}

# Insert items anywhere into a queue.
#   Args:   INDEX (integer; negative counts from the end), LIST of items.
#   Croaks: if the queue has been end()ed, or on an invalid INDEX.
sub insert
{
    my $self = shift;
    lock(%$self);

    if ($$self{'ENDED'}) {
        require Carp;
        Carp::croak("'insert' method called on queue that has been 'end'ed");
    }

    my $queue = $$self{'queue'};

    my $index = $self->_validate_index(shift);

    return if (! @_);   # Nothing to insert

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            $index = 0;
        }
    }

    # Dequeue items from $index onward
    my @tmp;
    while (@$queue > $index) {
        unshift(@tmp, pop(@$queue))
    }

    # Add new items to the queue
    push(@$queue, map { shared_clone($_) } @_);

    # Add previous items back onto the queue
    push(@$queue, @tmp);

    # Soup's up
    cond_signal(%$self);
}

# Remove items from anywhere in a queue (never blocks).
#   Args:    optional INDEX (integer, default 0; negative counts from the
#            end), optional COUNT (integer >= 1, default 1).
#   Returns: one item when COUNT is 1, otherwise a list of the items found.
#   Croaks:  on an invalid INDEX or COUNT.
sub extract
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $index = @_ ? $self->_validate_index(shift) : 0;
    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            # Only the part of the range overlapping the queue is extracted
            $count += $index;
            return if ($count <= 0);            # Beyond the head of the queue
            return $self->dequeue_nb($count);   # Extract from the head
        }
    }

    # Dequeue items from $index+$count onward
    my @tmp;
    while (@$queue > ($index+$count)) {
        unshift(@tmp, pop(@$queue))
    }

    # Extract desired items
    my @items;
    unshift(@items, pop(@$queue)) while (@$queue > $index);

    # Add back any removed items
    push(@$queue, @tmp);

    # Return single item
    return $items[0] if ($count == 1);

    # Return multiple items
    return @items;
}

### Internal Methods ###

# Croak about an invalid argument on behalf of a _validate_* method.
#   Args: KIND ('index', 'count' or 'timeout'), VALUE (the rejected value).
# The reported method is the one that called the _validate_* method, i.e.
# two frames above this helper.
sub _croak_invalid
{
    my ($self, $kind, $value) = @_;

    require Carp;
    my ($method) = (caller(2))[3];
    my $class_name = ref($self);
    # Strip the package prefix; quote the class name so that it is
    # matched literally
    $method =~ s/\Q$class_name\E:://;
    $value = 'undef' if (! defined($value));
    Carp::croak("Invalid '$kind' argument ($value) to '$method' method");
}

# Check value of the requested index: must be a defined integer.
# Returns the index unchanged, or croaks.
sub _validate_index
{
    my $self = shift;
    my $index = shift;

    if (! defined($index) ||
        ! looks_like_number($index) ||
        (int($index) != $index))
    {
        $self->_croak_invalid('index', $index);
    }

    return $index;
}

# Check value of the requested count: must be a defined integer >= 1.
# Returns the count unchanged, or croaks.
sub _validate_count
{
    my $self = shift;
    my $count = shift;

    if (! defined($count) ||
        ! looks_like_number($count) ||
        (int($count) != $count) ||
        ($count < 1))
    {
        $self->_croak_invalid('count', $count);
    }

    return $count;
}

# Check value of the requested timeout: must be a defined number.
# Returns the timeout unchanged, or croaks.
sub _validate_timeout
{
    my $self = shift;
    my $timeout = shift;

    if (! defined($timeout) ||
        ! looks_like_number($timeout))
    {
        $self->_croak_invalid('timeout', $timeout);
    }

    return $timeout;
}

1;

=head1 NAME

Thread::Queue - Thread-safe queues

=head1 VERSION

This document describes Thread::Queue version 3.05

=head1 SYNOPSIS

    use strict;
    use warnings;

    use threads;
    use Thread::Queue;

    my $q = Thread::Queue->new();    # A new empty queue

    # Worker thread
    my $thr = threads->create(
        sub {
            # Thread will loop until no more work
            while (defined(my $item = $q->dequeue())) {
                # Do work on $item
                ...
            }
        }
    );

    # Send work to the thread
    $q->enqueue($item1, ...);
    # Signal that there is no more work to be sent
    $q->end();
    # Join up with the thread when it finishes
    $thr->join();

    ...

    # Count of items in the queue
    my $left = $q->pending();

    # Non-blocking dequeue
    if (defined(my $item = $q->dequeue_nb())) {
        # Work on $item
    }

    # Blocking dequeue with 5-second timeout
    if (defined(my $item = $q->dequeue_timed(5))) {
        # Work on $item
    }

    # Get the second item in the queue without dequeuing anything
    my $item = $q->peek(1);

    # Insert two items into the queue just behind the head
    $q->insert(1, $item1, $item2);

    # Extract the last two items on the queue
    my ($item1, $item2) = $q->extract(-2, 2);

=head1 DESCRIPTION

This module provides thread-safe FIFO queues that can be accessed safely by
any number of threads.

Any data types supported by L<threads::shared> can be passed via queues:

=over

=item Ordinary scalars

=item Array refs

=item Hash refs

=item Scalar refs

=item Objects based on the above

=back

Ordinary scalars are added to queues as they are.

If not already thread-shared, the other complex data types will be cloned
(recursively, if needed, and including any C<bless>ings and read-only
settings) into thread-shared structures before being placed onto a queue.

For example, the following would cause L<Thread::Queue> to create an empty,
shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
and 'baz' from C<@ary> into it, and then place that shared reference onto
the queue:

    my @ary = qw/foo bar baz/;
    $q->enqueue(\@ary);

However, for the following, the items are already shared, so their references
are added directly to the queue, and no cloning takes place:

    my @ary :shared = qw/foo bar baz/;
    $q->enqueue(\@ary);

    my $obj = &shared({});
    $$obj{'foo'} = 'bar';
    $$obj{'qux'} = 99;
    bless($obj, 'My::Class');
    $q->enqueue($obj);

See L</"LIMITATIONS"> for caveats related to passing objects via queues.

=head1 QUEUE CREATION

=over

=item ->new()

Creates a new empty queue.

=item ->new(LIST)

Creates a new queue pre-populated with the provided list of items.

=back

=head1 BASIC METHODS

The following methods deal with queues on a FIFO basis.

=over

=item ->enqueue(LIST)

Adds a list of items onto the end of the queue.

=item ->dequeue()

=item ->dequeue(COUNT)

Removes the requested number of items (default is 1) from the head of the
queue, and returns them.
If the queue contains fewer than the requested
number of items, then the thread will be blocked until the requisite number
of items are available (i.e., until other threads C<enqueue> more items).

=item ->dequeue_nb()

=item ->dequeue_nb(COUNT)

Removes the requested number of items (default is 1) from the head of the
queue, and returns them.  If the queue contains fewer than the requested
number of items, then it immediately (i.e., non-blocking) returns whatever
items there are on the queue.  If the queue is empty, then C<undef> is
returned.

=item ->dequeue_timed(TIMEOUT)

=item ->dequeue_timed(TIMEOUT, COUNT)

Removes the requested number of items (default is 1) from the head of the
queue, and returns them.  If the queue contains fewer than the requested
number of items, then the thread will be blocked until the requisite number of
items are available, or until the timeout is reached.  If the timeout is
reached, it returns whatever items there are on the queue, or C<undef> if the
queue is empty.

The timeout may be a number of seconds relative to the current time (e.g., 5
seconds from when the call is made), or may be an absolute timeout in I<epoch>
seconds the same as would be used with
L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
the underlying implementation).

If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call
behaves the same as C<dequeue_nb>.

=item ->pending()

Returns the number of items still in the queue.  Returns C<undef> if the queue
has been ended (see below), and there are no more items in the queue.

=item ->end()

Declares that no more items will be added to the queue.

All threads blocking on C<dequeue()> calls will be unblocked with any
remaining items in the queue and/or C<undef> being returned.  Any subsequent
calls to C<dequeue()> will behave like C<dequeue_nb()>.

Once ended, no more items may be placed in the queue.

=back

=head1 ADVANCED METHODS

The following methods can be used to manipulate items anywhere in a queue.

To prevent the contents of a queue from being modified by another thread
while it is being examined and/or changed, L<lock|threads::shared/"lock
VARIABLE"> the queue inside a local block:

    {
        lock($q);   # Keep other threads from changing the queue's contents
        my $item = $q->peek();
        if ($item ...) {
            ...
        }
    }
    # Queue is now unlocked

=over

=item ->peek()

=item ->peek(INDEX)

Returns an item from the queue without dequeuing anything.  Defaults to the
head of the queue (at index position 0) if no index is specified.  Negative
index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
is the end of the queue, -2 is next to last, and so on).

If no item exists at the specified index (i.e., the queue is empty, or the
index is beyond the number of items on the queue), then C<undef> is returned.

Remember, the returned item is not removed from the queue, so manipulating a
C<peek>ed at reference affects the item on the queue.

=item ->insert(INDEX, LIST)

Adds the list of items to the queue at the specified index position (0
is the head of the list).  Any existing items at and beyond that position are
pushed back past the newly added items:

    $q->enqueue(1, 2, 3, 4);
    $q->insert(1, qw/foo bar/);
    # Queue now contains:  1, foo, bar, 2, 3, 4

Specifying an index position greater than the number of items in the queue
just adds the list to the end.

Negative index positions are supported:

    $q->enqueue(1, 2, 3, 4);
    $q->insert(-2, qw/foo bar/);
    # Queue now contains:  1, 2, foo, bar, 3, 4

Specifying a negative index position greater than the number of items in the
queue adds the list to the head of the queue.

=item ->extract()

=item ->extract(INDEX)

=item ->extract(INDEX, COUNT)

Removes and returns the specified number of items (defaults to 1) from the
specified index position in the queue (0 is the head of the queue).  When
called with no arguments, C<extract> operates the same as C<dequeue_nb>.

This method is non-blocking, and will return only as many items as are
available to fulfill the request:

    $q->enqueue(1, 2, 3, 4);
    my $item  = $q->extract(2);     # Returns 3
                                    # Queue now contains:  1, 2, 4
    my @items = $q->extract(1, 3);  # Returns (2, 4)
                                    # Queue now contains:  1

Specifying an index position greater than the number of items in the
queue results in C<undef> or an empty list being returned.

    $q->enqueue('foo');
    my $nada = $q->extract(3);      # Returns undef
    my @nada = $q->extract(1, 3);   # Returns ()

Negative index positions are supported.  Specifying a negative index position
greater than the number of items in the queue may return items from the head
of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
queue from the specified position (i.e. if queue size + index + count is
greater than zero):

    $q->enqueue(qw/foo bar baz/);
    my @nada = $q->extract(-6, 2);  # Returns ()         - (3+(-6)+2) <= 0
    my @some = $q->extract(-6, 4);  # Returns (foo)      - (3+(-6)+4) > 0
                                    # Queue now contains:  bar, baz
    my @rest = $q->extract(-3, 4);  # Returns (bar, baz) - (2+(-3)+4) > 0

=back

=head1 NOTES

Queues created by L<Thread::Queue> can be used in both threaded and
non-threaded applications.

=head1 LIMITATIONS

Passing objects on queues may not work if the objects' classes do not support
sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.

Passing array/hash refs that contain objects may not work for Perl prior to
5.10.0.

=head1 SEE ALSO

Thread::Queue Discussion Forum on CPAN:
L<http://www.cpanforum.com/dist/Thread-Queue>

L<threads>, L<threads::shared>

Sample code in the I<examples> directory of this distribution on CPAN.

=head1 MAINTAINER

Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>

=head1 LICENSE

This program is free software; you can redistribute it and/or modify it under
the same terms as Perl itself.

=cut
