1package Thread::Queue; 2 3use strict; 4use warnings; 5 6our $VERSION = '3.09'; 7$VERSION = eval $VERSION; 8 9use threads::shared 1.21; 10use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr); 11 12# Carp errors from threads::shared calls should complain about caller 13our @CARP_NOT = ("threads::shared"); 14 15# Create a new queue possibly pre-populated with items 16sub new 17{ 18 my $class = shift; 19 my @queue :shared = map { shared_clone($_) } @_; 20 my %self :shared = ( 'queue' => \@queue ); 21 return bless(\%self, $class); 22} 23 24# Add items to the tail of a queue 25sub enqueue 26{ 27 my $self = shift; 28 lock(%$self); 29 30 if ($$self{'ENDED'}) { 31 require Carp; 32 Carp::croak("'enqueue' method called on queue that has been 'end'ed"); 33 } 34 35 # Block if queue size exceeds any specified limit 36 my $queue = $$self{'queue'}; 37 cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'})); 38 39 # Add items to queue, and then signal other threads 40 push(@$queue, map { shared_clone($_) } @_) 41 and cond_signal(%$self); 42} 43 44# Set or return the max. size for a queue 45sub limit : lvalue 46{ 47 my $self = shift; 48 lock(%$self); 49 $$self{'LIMIT'}; 50} 51 52# Return a count of the number of items on a queue 53sub pending 54{ 55 my $self = shift; 56 lock(%$self); 57 return if ($$self{'ENDED'} && ! @{$$self{'queue'}}); 58 return scalar(@{$$self{'queue'}}); 59} 60 61# Indicate that no more data will enter the queue 62sub end 63{ 64 my $self = shift; 65 lock(%$self); 66 # No more data is coming 67 $$self{'ENDED'} = 1; 68 # Try to release at least one blocked thread 69 cond_signal(%$self); 70} 71 72# Return 1 or more items from the head of a queue, blocking if needed 73sub dequeue 74{ 75 my $self = shift; 76 lock(%$self); 77 my $queue = $$self{'queue'}; 78 79 my $count = @_ ? $self->_validate_count(shift) : 1; 80 81 # Wait for requisite number of items 82 cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'}); 83 cond_signal(%$self) if ((@$queue >= $count) || $$self{'ENDED'}); 84 85 # If no longer blocking, try getting whatever is left on the queue 86 return $self->dequeue_nb($count) if ($$self{'ENDED'}); 87 88 # Return single item 89 return shift(@$queue) if ($count == 1); 90 91 # Return multiple items 92 my @items; 93 push(@items, shift(@$queue)) for (1..$count); 94 return @items; 95} 96 97# Return items from the head of a queue with no blocking 98sub dequeue_nb 99{ 100 my $self = shift; 101 lock(%$self); 102 my $queue = $$self{'queue'}; 103 104 my $count = @_ ? $self->_validate_count(shift) : 1; 105 106 # Return single item 107 return shift(@$queue) if ($count == 1); 108 109 # Return multiple items 110 my @items; 111 for (1..$count) { 112 last if (! @$queue); 113 push(@items, shift(@$queue)); 114 } 115 return @items; 116} 117 118# Return items from the head of a queue, blocking if needed up to a timeout 119sub dequeue_timed 120{ 121 my $self = shift; 122 lock(%$self); 123 my $queue = $$self{'queue'}; 124 125 # Timeout may be relative or absolute 126 my $timeout = @_ ? $self->_validate_timeout(shift) : -1; 127 # Convert to an absolute time for use with cond_timedwait() 128 if ($timeout < 32000000) { # More than one year 129 $timeout += time(); 130 } 131 132 my $count = @_ ? $self->_validate_count(shift) : 1; 133 134 # Wait for requisite number of items, or until timeout 135 while ((@$queue < $count) && ! $$self{'ENDED'}) { 136 last if (! cond_timedwait(%$self, $timeout)); 137 } 138 cond_signal(%$self) if ((@$queue >= $count) || $$self{'ENDED'}); 139 140 # Get whatever we need off the queue if available 141 return $self->dequeue_nb($count); 142} 143 144# Return an item without removing it from a queue 145sub peek 146{ 147 my $self = shift; 148 lock(%$self); 149 my $index = @_ ? $self->_validate_index(shift) : 0; 150 return $$self{'queue'}[$index]; 151} 152 153# Insert items anywhere into a queue 154sub insert 155{ 156 my $self = shift; 157 lock(%$self); 158 159 if ($$self{'ENDED'}) { 160 require Carp; 161 Carp::croak("'insert' method called on queue that has been 'end'ed"); 162 } 163 164 my $queue = $$self{'queue'}; 165 166 my $index = $self->_validate_index(shift); 167 168 return if (! @_); # Nothing to insert 169 170 # Support negative indices 171 if ($index < 0) { 172 $index += @$queue; 173 if ($index < 0) { 174 $index = 0; 175 } 176 } 177 178 # Dequeue items from $index onward 179 my @tmp; 180 while (@$queue > $index) { 181 unshift(@tmp, pop(@$queue)) 182 } 183 184 # Add new items to the queue 185 push(@$queue, map { shared_clone($_) } @_); 186 187 # Add previous items back onto the queue 188 push(@$queue, @tmp); 189 190 # Soup's up 191 cond_signal(%$self); 192} 193 194# Remove items from anywhere in a queue 195sub extract 196{ 197 my $self = shift; 198 lock(%$self); 199 my $queue = $$self{'queue'}; 200 201 my $index = @_ ? $self->_validate_index(shift) : 0; 202 my $count = @_ ? $self->_validate_count(shift) : 1; 203 204 # Support negative indices 205 if ($index < 0) { 206 $index += @$queue; 207 if ($index < 0) { 208 $count += $index; 209 return if ($count <= 0); # Beyond the head of the queue 210 return $self->dequeue_nb($count); # Extract from the head 211 } 212 } 213 214 # Dequeue items from $index+$count onward 215 my @tmp; 216 while (@$queue > ($index+$count)) { 217 unshift(@tmp, pop(@$queue)) 218 } 219 220 # Extract desired items 221 my @items; 222 unshift(@items, pop(@$queue)) while (@$queue > $index); 223 224 # Add back any removed items 225 push(@$queue, @tmp); 226 227 # Return single item 228 return $items[0] if ($count == 1); 229 230 # Return multiple items 231 return @items; 232} 233 234### Internal Methods ### 235 236# Check value of the requested index 237sub _validate_index 238{ 239 my $self = shift; 240 my $index = shift; 241 242 if (! defined($index) || 243 ! looks_like_number($index) || 244 (int($index) != $index)) 245 { 246 require Carp; 247 my ($method) = (caller(1))[3]; 248 my $class_name = ref($self); 249 $method =~ s/$class_name\:://; 250 $index = 'undef' if (! defined($index)); 251 Carp::croak("Invalid 'index' argument ($index) to '$method' method"); 252 } 253 254 return $index; 255}; 256 257# Check value of the requested count 258sub _validate_count 259{ 260 my $self = shift; 261 my $count = shift; 262 263 if (! defined($count) || 264 ! looks_like_number($count) || 265 (int($count) != $count) || 266 ($count < 1)) 267 { 268 require Carp; 269 my ($method) = (caller(1))[3]; 270 my $class_name = ref($self); 271 $method =~ s/$class_name\:://; 272 $count = 'undef' if (! defined($count)); 273 Carp::croak("Invalid 'count' argument ($count) to '$method' method"); 274 } 275 276 return $count; 277}; 278 279# Check value of the requested timeout 280sub _validate_timeout 281{ 282 my $self = shift; 283 my $timeout = shift; 284 285 if (! defined($timeout) || 286 ! looks_like_number($timeout)) 287 { 288 require Carp; 289 my ($method) = (caller(1))[3]; 290 my $class_name = ref($self); 291 $method =~ s/$class_name\:://; 292 $timeout = 'undef' if (! defined($timeout)); 293 Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method"); 294 } 295 296 return $timeout; 297}; 298 2991; 300 301=head1 NAME 302 303Thread::Queue - Thread-safe queues 304 305=head1 VERSION 306 307This document describes Thread::Queue version 3.09 308 309=head1 SYNOPSIS 310 311 use strict; 312 use warnings; 313 314 use threads; 315 use Thread::Queue; 316 317 my $q = Thread::Queue->new(); # A new empty queue 318 319 # Worker thread 320 my $thr = threads->create( 321 sub { 322 # Thread will loop until no more work 323 while (defined(my $item = $q->dequeue())) { 324 # Do work on $item 325 ... 326 } 327 } 328 ); 329 330 # Send work to the thread 331 $q->enqueue($item1, ...); 332 # Signal that there is no more work to be sent 333 $q->end(); 334 # Join up with the thread when it finishes 335 $thr->join(); 336 337 ... 338 339 # Count of items in the queue 340 my $left = $q->pending(); 341 342 # Non-blocking dequeue 343 if (defined(my $item = $q->dequeue_nb())) { 344 # Work on $item 345 } 346 347 # Blocking dequeue with 5-second timeout 348 if (defined(my $item = $q->dequeue_timed(5))) { 349 # Work on $item 350 } 351 352 # Set a size for a queue 353 $q->limit = 5; 354 355 # Get the second item in the queue without dequeuing anything 356 my $item = $q->peek(1); 357 358 # Insert two items into the queue just behind the head 359 $q->insert(1, $item1, $item2); 360 361 # Extract the last two items on the queue 362 my ($item1, $item2) = $q->extract(-2, 2); 363 364=head1 DESCRIPTION 365 366This module provides thread-safe FIFO queues that can be accessed safely by 367any number of threads. 368 369Any data types supported by L<threads::shared> can be passed via queues: 370 371=over 372 373=item Ordinary scalars 374 375=item Array refs 376 377=item Hash refs 378 379=item Scalar refs 380 381=item Objects based on the above 382 383=back 384 385Ordinary scalars are added to queues as they are. 386 387If not already thread-shared, the other complex data types will be cloned 388(recursively, if needed, and including any C<bless>ings and read-only 389settings) into thread-shared structures before being placed onto a queue. 390 391For example, the following would cause L<Thread::Queue> to create a empty, 392shared array reference via C<&shared([])>, copy the elements 'foo', 'bar' 393and 'baz' from C<@ary> into it, and then place that shared reference onto 394the queue: 395 396 my @ary = qw/foo bar baz/; 397 $q->enqueue(\@ary); 398 399However, for the following, the items are already shared, so their references 400are added directly to the queue, and no cloning takes place: 401 402 my @ary :shared = qw/foo bar baz/; 403 $q->enqueue(\@ary); 404 405 my $obj = &shared({}); 406 $$obj{'foo'} = 'bar'; 407 $$obj{'qux'} = 99; 408 bless($obj, 'My::Class'); 409 $q->enqueue($obj); 410 411See L</"LIMITATIONS"> for caveats related to passing objects via queues. 412 413=head1 QUEUE CREATION 414 415=over 416 417=item ->new() 418 419Creates a new empty queue. 420 421=item ->new(LIST) 422 423Creates a new queue pre-populated with the provided list of items. 424 425=back 426 427=head1 BASIC METHODS 428 429The following methods deal with queues on a FIFO basis. 430 431=over 432 433=item ->enqueue(LIST) 434 435Adds a list of items onto the end of the queue. 436 437=item ->dequeue() 438 439=item ->dequeue(COUNT) 440 441Removes the requested number of items (default is 1) from the head of the 442queue, and returns them. If the queue contains fewer than the requested 443number of items, then the thread will be blocked until the requisite number 444of items are available (i.e., until other threads C<enqueue> more items). 445 446=item ->dequeue_nb() 447 448=item ->dequeue_nb(COUNT) 449 450Removes the requested number of items (default is 1) from the head of the 451queue, and returns them. If the queue contains fewer than the requested 452number of items, then it immediately (i.e., non-blocking) returns whatever 453items there are on the queue. If the queue is empty, then C<undef> is 454returned. 455 456=item ->dequeue_timed(TIMEOUT) 457 458=item ->dequeue_timed(TIMEOUT, COUNT) 459 460Removes the requested number of items (default is 1) from the head of the 461queue, and returns them. If the queue contains fewer than the requested 462number of items, then the thread will be blocked until the requisite number of 463items are available, or until the timeout is reached. If the timeout is 464reached, it returns whatever items there are on the queue, or C<undef> if the 465queue is empty. 466 467The timeout may be a number of seconds relative to the current time (e.g., 5 468seconds from when the call is made), or may be an absolute timeout in I<epoch> 469seconds the same as would be used with 470L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">. 471Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of 472the underlying implementation). 473 474If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call 475behaves the same as C<dequeue_nb>. 476 477=item ->pending() 478 479Returns the number of items still in the queue. Returns C<undef> if the queue 480has been ended (see below), and there are no more items in the queue. 481 482=item ->limit 483 484Sets the size of the queue. If set, calls to C<enqueue()> will block until 485the number of pending items in the queue drops below the C<limit>. The 486C<limit> does not prevent enqueuing items beyond that count: 487 488 my $q = Thread::Queue->new(1, 2); 489 $q->limit = 4; 490 $q->enqueue(3, 4, 5); # Does not block 491 $q->enqueue(6); # Blocks until at least 2 items are 492 # dequeued 493 my $size = $q->limit; # Returns the current limit (may return 494 # 'undef') 495 $q->limit = 0; # Queue size is now unlimited 496 497=item ->end() 498 499Declares that no more items will be added to the queue. 500 501All threads blocking on C<dequeue()> calls will be unblocked with any 502remaining items in the queue and/or C<undef> being returned. Any subsequent 503calls to C<dequeue()> will behave like C<dequeue_nb()>. 504 505Once ended, no more items may be placed in the queue. 506 507=back 508 509=head1 ADVANCED METHODS 510 511The following methods can be used to manipulate items anywhere in a queue. 512 513To prevent the contents of a queue from being modified by another thread 514while it is being examined and/or changed, L<lock|threads::shared/"lock 515VARIABLE"> the queue inside a local block: 516 517 { 518 lock($q); # Keep other threads from changing the queue's contents 519 my $item = $q->peek(); 520 if ($item ...) { 521 ... 522 } 523 } 524 # Queue is now unlocked 525 526=over 527 528=item ->peek() 529 530=item ->peek(INDEX) 531 532Returns an item from the queue without dequeuing anything. Defaults to the 533the head of queue (at index position 0) if no index is specified. Negative 534index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1 535is the end of the queue, -2 is next to last, and so on). 536 537If no items exists at the specified index (i.e., the queue is empty, or the 538index is beyond the number of items on the queue), then C<undef> is returned. 539 540Remember, the returned item is not removed from the queue, so manipulating a 541C<peek>ed at reference affects the item on the queue. 542 543=item ->insert(INDEX, LIST) 544 545Adds the list of items to the queue at the specified index position (0 546is the head of the list). Any existing items at and beyond that position are 547pushed back past the newly added items: 548 549 $q->enqueue(1, 2, 3, 4); 550 $q->insert(1, qw/foo bar/); 551 # Queue now contains: 1, foo, bar, 2, 3, 4 552 553Specifying an index position greater than the number of items in the queue 554just adds the list to the end. 555 556Negative index positions are supported: 557 558 $q->enqueue(1, 2, 3, 4); 559 $q->insert(-2, qw/foo bar/); 560 # Queue now contains: 1, 2, foo, bar, 3, 4 561 562Specifying a negative index position greater than the number of items in the 563queue adds the list to the head of the queue. 564 565=item ->extract() 566 567=item ->extract(INDEX) 568 569=item ->extract(INDEX, COUNT) 570 571Removes and returns the specified number of items (defaults to 1) from the 572specified index position in the queue (0 is the head of the queue). When 573called with no arguments, C<extract> operates the same as C<dequeue_nb>. 574 575This method is non-blocking, and will return only as many items as are 576available to fulfill the request: 577 578 $q->enqueue(1, 2, 3, 4); 579 my $item = $q->extract(2) # Returns 3 580 # Queue now contains: 1, 2, 4 581 my @items = $q->extract(1, 3) # Returns (2, 4) 582 # Queue now contains: 1 583 584Specifying an index position greater than the number of items in the 585queue results in C<undef> or an empty list being returned. 586 587 $q->enqueue('foo'); 588 my $nada = $q->extract(3) # Returns undef 589 my @nada = $q->extract(1, 3) # Returns () 590 591Negative index positions are supported. Specifying a negative index position 592greater than the number of items in the queue may return items from the head 593of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the 594queue from the specified position (i.e. if queue size + index + count is 595greater than zero): 596 597 $q->enqueue(qw/foo bar baz/); 598 my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0 599 my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0 600 # Queue now contains: bar, baz 601 my @rest = $q->extract(-3, 4); # Returns (bar, baz) - 602 # (2+(-3)+4) > 0 603 604=back 605 606=head1 NOTES 607 608Queues created by L<Thread::Queue> can be used in both threaded and 609non-threaded applications. 610 611=head1 LIMITATIONS 612 613Passing objects on queues may not work if the objects' classes do not support 614sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more. 615 616Passing array/hash refs that contain objects may not work for Perl prior to 6175.10.0. 618 619=head1 SEE ALSO 620 621Thread::Queue Discussion Forum on CPAN: 622L<http://www.cpanforum.com/dist/Thread-Queue> 623 624L<threads>, L<threads::shared> 625 626Sample code in the I<examples> directory of this distribution on CPAN. 627 628=head1 MAINTAINER 629 630Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>> 631 632=head1 LICENSE 633 634This program is free software; you can redistribute it and/or modify it under 635the same terms as Perl itself. 636 637=cut 638