package Thread::Queue;

use strict;
use warnings;

our $VERSION = '2.11';

use threads::shared 1.21;
use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);

# Carp errors from threads::shared calls should complain about caller
our @CARP_NOT = ("threads::shared");

# Predeclarations for internal functions.  These hold code refs that are
# assigned near the bottom of the file (see "Internal Functions"), so the
# methods below can refer to them before their bodies appear.
my ($validate_count, $validate_index);

# Create a new queue, possibly pre-populated with items.
#
#   Thread::Queue->new()
#   Thread::Queue->new(LIST)
#
# Every item is passed through shared_clone() before being stored.  The
# queue object is a blessed reference to a ':shared' array; all the other
# methods lock and manipulate that array directly.
sub new
{
    my $class = shift;
    my @queue :shared = map { shared_clone($_) } @_;
    return bless(\@queue, $class);
}

# Add items to the tail of a queue.
#
#   $q->enqueue(LIST)
#
# Each item is passed through shared_clone() first.  push() returns the new
# length of the queue, so the cond_signal() fires whenever the queue is
# non-empty after the push.
sub enqueue
{
    my $queue = shift;
    lock(@$queue);
    push(@$queue, map { shared_clone($_) } @_)
        and cond_signal(@$queue);
}

# Return a count of the number of items on a queue.
sub pending
{
    my $queue = shift;
    lock(@$queue);
    return scalar(@$queue);
}

# Return 1 or more items from the head of a queue, blocking if needed.
#
#   $q->dequeue()
#   $q->dequeue(COUNT)
#
# COUNT defaults to 1 and must be an integer >= 1 (croaks otherwise).
# Returns the single item itself when COUNT is 1, otherwise a list of
# exactly COUNT items.
sub dequeue
{
    my $queue = shift;
    lock(@$queue);

    my $count = @_ ? $validate_count->(shift) : 1;

    # Wait for requisite number of items
    cond_wait(@$queue) until (@$queue >= $count);
    # Items will be left over once this call has taken its share, so pass
    # the signal along for any other waiter.
    cond_signal(@$queue) if (@$queue > $count);

    # Return single item
    return shift(@$queue) if ($count == 1);

    # Return multiple items
    my @items;
    push(@items, shift(@$queue)) for (1..$count);
    return @items;
}

# Return items from the head of a queue with no blocking.
#
#   $q->dequeue_nb()
#   $q->dequeue_nb(COUNT)
#
# COUNT defaults to 1 and must be an integer >= 1 (croaks otherwise).
# With COUNT == 1 the result of shift() is returned directly, which is
# undef when the queue is empty.  With COUNT > 1, up to COUNT items are
# returned as a list (fewer, possibly none, if the queue runs out).
sub dequeue_nb
{
    my $queue = shift;
    lock(@$queue);

    my $count = @_ ? $validate_count->(shift) : 1;

    # Return single item
    return shift(@$queue) if ($count == 1);

    # Return multiple items
    my @items;
    for (1..$count) {
        last if (! @$queue);
        push(@items, shift(@$queue));
    }
    return @items;
}

# Return an item without removing it from a queue.
#
#   $q->peek()
#   $q->peek(INDEX)
#
# INDEX defaults to 0 (the head) and must be an integer (croaks otherwise).
# The element is fetched with an ordinary array subscript, so negative
# indices count from the tail and a non-existent position yields undef.
sub peek
{
    my $queue = shift;
    lock(@$queue);
    my $index = @_ ? $validate_index->(shift) : 0;
    return $$queue[$index];
}

# Insert items anywhere into a queue.
#
#   $q->insert(INDEX, LIST)
#
# INDEX is validated before the "nothing to insert" check, so a missing or
# non-integer INDEX croaks even when LIST is empty.  A negative INDEX is
# taken relative to the tail and clamped to the head; an INDEX past the
# tail simply appends.
sub insert
{
    my $queue = shift;
    lock(@$queue);

    my $index = $validate_index->(shift);

    return if (! @_);   # Nothing to insert

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            $index = 0;
        }
    }

    # Dequeue items from $index onward
    # NOTE(review): done with pop/unshift rather than splice(), presumably
    # because splice() is not supported on shared arrays -- see the
    # threads::shared limitations before changing this.
    my @tmp;
    while (@$queue > $index) {
        unshift(@tmp, pop(@$queue))
    }

    # Add new items to the queue
    push(@$queue, map { shared_clone($_) } @_);

    # Add previous items back onto the queue
    push(@$queue, @tmp);

    # Soup's up
    cond_signal(@$queue);
}

# Remove items from anywhere in a queue (never blocks).
#
#   $q->extract()
#   $q->extract(INDEX)
#   $q->extract(INDEX, COUNT)
#
# INDEX defaults to 0 and COUNT to 1; both are validated (croak on bad
# values).  Returns a single item when COUNT is 1 (undef if there is no
# item at INDEX), otherwise the list of items actually removed.
sub extract
{
    my $queue = shift;
    lock(@$queue);

    my $index = @_ ? $validate_index->(shift) : 0;
    my $count = @_ ? $validate_count->(shift) : 1;

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            # The requested span starts before the head: drop the part
            # that falls off the front ($index is negative here).
            $count += $index;
            return if ($count <= 0);            # Beyond the head of the queue
            return $queue->dequeue_nb($count);  # Extract from the head
        }
    }

    # Dequeue items from $index+$count onward
    my @tmp;
    while (@$queue > ($index+$count)) {
        unshift(@tmp, pop(@$queue))
    }

    # Extract desired items
    my @items;
    unshift(@items, pop(@$queue)) while (@$queue > $index);

    # Add back any removed items
    push(@$queue, @tmp);

    # Return single item
    return $items[0] if ($count == 1);

    # Return multiple items
    return @items;
}

### Internal Functions ###

# Check value of the requested index.
#
# Accepts any defined, numeric, integral value (negative values included)
# and returns it unchanged.  Anything else croaks with a message naming the
# public method that was called: caller(1) is the frame of the method that
# invoked this code ref.
$validate_index = sub {
    my $index = shift;

    if (! defined($index) ||
        ! looks_like_number($index) ||
        (int($index) != $index))
    {
        require Carp;
        my ($method) = (caller(1))[3];
        $method =~ s/Thread::Queue:://;
        $index = 'undef' if (! defined($index));
        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
    }

    return $index;
};

# Check value of the requested count.
#
# Same rules as for an index, with the additional requirement that the
# value be at least 1.  Returns the count unchanged, or croaks with a
# message naming the public method that was called.
$validate_count = sub {
    my $count = shift;

    if (! defined($count) ||
        ! looks_like_number($count) ||
        (int($count) != $count) ||
        ($count < 1))
    {
        require Carp;
        my ($method) = (caller(1))[3];
        $method =~ s/Thread::Queue:://;
        $count = 'undef' if (! defined($count));
        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
    }

    return $count;
};

1;

=head1 NAME

Thread::Queue - Thread-safe queues

=head1 VERSION

This document describes Thread::Queue version 2.11

=head1 SYNOPSIS

    use strict;
    use warnings;

    use threads;
    use Thread::Queue;

    my $q = Thread::Queue->new();    # A new empty queue

    # Worker thread
    my $thr = threads->create(sub {
                                while (defined(my $item = $q->dequeue())) {
                                    # Do work on $item
                                }
                             })->detach();

    # Send work to the thread
    $q->enqueue($item1, ...);


    # Count of items in the queue
    my $left = $q->pending();

    # Non-blocking dequeue
    if (defined(my $item = $q->dequeue_nb())) {
        # Work on $item
    }

    # Get the second item in the queue without dequeuing anything
    my $item = $q->peek(1);

    # Insert two items into the queue just behind the head
    $q->insert(1, $item1, $item2);

    # Extract the last two items on the queue
    my ($item1, $item2) = $q->extract(-2, 2);

=head1 DESCRIPTION

This module provides thread-safe FIFO queues that can be accessed safely by
any number of threads.


Any data types supported by L<threads::shared> can be passed via queues:

=over

=item Ordinary scalars

=item Array refs

=item Hash refs

=item Scalar refs

=item Objects based on the above

=back

Ordinary scalars are added to queues as they are.

If not already thread-shared, the other complex data types will be cloned
(recursively, if needed, and including any C<bless>ings and read-only
settings) into thread-shared structures before being placed onto a queue.

For example, the following would cause L<Thread::Queue> to create an empty,
shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
and 'baz' from C<@ary> into it, and then place that shared reference onto
the queue:

    my @ary = qw/foo bar baz/;
    $q->enqueue(\@ary);

However, for the following, the items are already shared, so their references
are added directly to the queue, and no cloning takes place:

    my @ary :shared = qw/foo bar baz/;
    $q->enqueue(\@ary);

    my $obj = &shared({});
    $$obj{'foo'} = 'bar';
    $$obj{'qux'} = 99;
    bless($obj, 'My::Class');
    $q->enqueue($obj);

See L</"LIMITATIONS"> for caveats related to passing objects via queues.

=head1 QUEUE CREATION

=over

=item ->new()

Creates a new empty queue.

=item ->new(LIST)

Creates a new queue pre-populated with the provided list of items.

=back

=head1 BASIC METHODS

The following methods deal with queues on a FIFO basis.

=over

=item ->enqueue(LIST)

Adds a list of items onto the end of the queue.

=item ->dequeue()

=item ->dequeue(COUNT)

Removes the requested number of items (default is 1) from the head of the
queue, and returns them.
If the queue contains fewer than the requested
number of items, then the thread will be blocked until the requisite number
of items are available (i.e., until other threads C<enqueue> more items).

=item ->dequeue_nb()

=item ->dequeue_nb(COUNT)

Removes the requested number of items (default is 1) from the head of the
queue, and returns them.  If the queue contains fewer than the requested
number of items, then it immediately (i.e., non-blocking) returns whatever
items there are on the queue.  If the queue is empty, then C<undef> is
returned.

=item ->pending()

Returns the number of items still in the queue.

=back

=head1 ADVANCED METHODS

The following methods can be used to manipulate items anywhere in a queue.

To prevent the contents of a queue from being modified by another thread
while it is being examined and/or changed,
L<lock|threads::shared/"lock VARIABLE"> the queue inside a local block:

    {
        lock($q);   # Keep other threads from changing the queue's contents
        my $item = $q->peek();
        if ($item ...) {
            ...
        }
    }
    # Queue is now unlocked

=over

=item ->peek()

=item ->peek(INDEX)

Returns an item from the queue without dequeuing anything.  Defaults to the
head of the queue (at index position 0) if no index is specified.  Negative
index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
is the end of the queue, -2 is next to last, and so on).

If no item exists at the specified index (i.e., the queue is empty, or the
index is beyond the number of items on the queue), then C<undef> is returned.

Remember, the returned item is not removed from the queue, so manipulating a
C<peek>ed at reference affects the item on the queue.

=item ->insert(INDEX, LIST)

Adds the list of items to the queue at the specified index position (0
is the head of the list).
Any existing items at and beyond that position are
pushed back past the newly added items:

    $q->enqueue(1, 2, 3, 4);
    $q->insert(1, qw/foo bar/);
    # Queue now contains:  1, foo, bar, 2, 3, 4

Specifying an index position greater than the number of items in the queue
just adds the list to the end.

Negative index positions are supported:

    $q->enqueue(1, 2, 3, 4);
    $q->insert(-2, qw/foo bar/);
    # Queue now contains:  1, 2, foo, bar, 3, 4

Specifying a negative index position greater than the number of items in the
queue adds the list to the head of the queue.

=item ->extract()

=item ->extract(INDEX)

=item ->extract(INDEX, COUNT)

Removes and returns the specified number of items (defaults to 1) from the
specified index position in the queue (0 is the head of the queue).  When
called with no arguments, C<extract> operates the same as C<dequeue_nb>.

This method is non-blocking, and will return only as many items as are
available to fulfill the request:

    $q->enqueue(1, 2, 3, 4);
    my $item  = $q->extract(2);     # Returns 3
                                    # Queue now contains:  1, 2, 4
    my @items = $q->extract(1, 3);  # Returns (2, 4)
                                    # Queue now contains:  1

Specifying an index position greater than the number of items in the
queue results in C<undef> or an empty list being returned.

    $q->enqueue('foo');
    my $nada = $q->extract(3);      # Returns undef
    my @nada = $q->extract(1, 3);   # Returns ()

Negative index positions are supported.  Specifying a negative index position
greater than the number of items in the queue may return items from the head
of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
queue from the specified position (i.e.
if queue size + index + count is
greater than zero):

    $q->enqueue(qw/foo bar baz/);
    my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
    my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
                                     # Queue now contains:  bar, baz
    my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0

=back

=head1 NOTES

Queues created by L<Thread::Queue> can be used in both threaded and
non-threaded applications.

=head1 LIMITATIONS

Passing objects on queues may not work if the objects' classes do not support
sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.

Passing array/hash refs that contain objects may not work for Perl prior to
5.10.0.

=head1 SEE ALSO

Thread::Queue Discussion Forum on CPAN:
L<http://www.cpanforum.com/dist/Thread-Queue>

Annotated POD for Thread::Queue:
L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.11/lib/Thread/Queue.pm>

Source repository:
L<http://code.google.com/p/thread-queue/>

L<threads>, L<threads::shared>

=head1 MAINTAINER

Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>

=head1 LICENSE

This program is free software; you can redistribute it and/or modify it under
the same terms as Perl itself.

=cut