xref: /openbsd-src/gnu/usr.bin/perl/dist/Thread-Queue/lib/Thread/Queue.pm (revision c90a81c56dcebd6a1b73fe4aff9b03385b8e63b3)
1package Thread::Queue;
2
3use strict;
4use warnings;
5
6our $VERSION = '3.09';
7$VERSION = eval $VERSION;
8
9use threads::shared 1.21;
10use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
11
12# Carp errors from threads::shared calls should complain about caller
13our @CARP_NOT = ("threads::shared");
14
# Constructor: build a queue object, optionally seeded with a list of items
sub new
{
    my ($class, @initial) = @_;

    # Backing store for the queued items; every initial item is passed
    # through shared_clone() before it is stored
    my @queue :shared;
    push(@queue, shared_clone($_)) foreach (@initial);

    # The object itself is a shared hash that refers to the item array
    my %self :shared;
    $self{'queue'} = \@queue;

    return bless(\%self, $class);
}
23
# Add items to the tail of a queue.
#
# Arguments: the items to append; each one is passed through shared_clone()
# before it is stored.
#
# Blocks in cond_wait() while a non-zero 'LIMIT' is set and the queue already
# holds at least that many items.  Croaks if the queue has been 'end'ed,
# whether that happened before the call or while the call was blocked.
sub enqueue
{
    my $self = shift;
    lock(%$self);

    if ($$self{'ENDED'}) {
        require Carp;
        Carp::croak("'enqueue' method called on queue that has been 'end'ed");
    }

    # Block if queue size exceeds any specified limit
    my $queue = $$self{'queue'};
    while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'})) {
        cond_wait(%$self);

        # The queue may have been ended while this thread was blocked.
        # Nothing may be added to an ended queue, so give up here instead
        # of going back to sleep or pushing once space frees up.
        if ($$self{'ENDED'}) {
            # This thread may have consumed the only wake-up that was
            # sent; pass it on so other waiters also get to run.
            cond_signal(%$self);
            require Carp;
            Carp::croak("'enqueue' method called on queue that has been 'end'ed");
        }
    }

    # Add items to queue, and then signal other threads
    push(@$queue, map { shared_clone($_) } @_)
        and cond_signal(%$self);
}
43
# Accessor for the queue's maximum size.  Declared ':lvalue', so the call can
# be read from ($q->limit) or assigned to ($q->limit = N).
sub limit : lvalue
{
    my ($self) = @_;
    lock(%$self);
    # The final expression is the hash slot itself; that is what an
    # assignment through the call writes to
    $self->{'LIMIT'};
}
51
# Report how many items are currently on the queue.
# Returns nothing (undef in scalar context, an empty list in list context)
# once the queue has been ended and has no items left.
sub pending
{
    my ($self) = @_;
    lock(%$self);

    my $items = $self->{'queue'};
    my $size  = scalar(@$items);

    # An ended queue with nothing left is reported differently from a
    # queue that merely happens to be empty
    if ($self->{'ENDED'} && ($size == 0)) {
        return;
    }

    return $size;
}
60
# Indicate that no more data will enter the queue.
#
# Sets the 'ENDED' flag and wakes every thread waiting on the queue so that
# each of them can re-check its wait condition.
sub end
{
    my $self = shift;
    lock(%$self);
    # No more data is coming
    $$self{'ENDED'} = 1;
    # Wake all waiters rather than a single one.  With cond_signal() the one
    # thread that is woken could be an enqueue() blocked on the limit, which
    # simply goes back to waiting; threads blocked in dequeue() would then
    # never find out that the queue has ended.
    cond_broadcast(%$self);
}
71
# Blocking removal of one or more items from the head of a queue.
#   dequeue()       - remove and return a single item
#   dequeue(COUNT)  - remove and return COUNT items (COUNT is checked by
#                     _validate_count)
# Waits until COUNT items are present or the queue has been ended; once
# ended, the request is handed to dequeue_nb().
sub dequeue
{
    my ($self, @args) = @_;
    lock(%$self);
    my $items = $self->{'queue'};

    my $wanted = 1;
    $wanted = $self->_validate_count($args[0]) if (@args);

    # Sleep until enough items have arrived or the queue has been ended
    until ($self->{'ENDED'} || (@$items >= $wanted)) {
        cond_wait(%$self);
    }

    # Leaving the loop means one of the two conditions above holds, so a
    # signal is always passed on to another waiting thread
    cond_signal(%$self);

    # An ended queue hands over whatever is left, without blocking
    if ($self->{'ENDED'}) {
        return $self->dequeue_nb($wanted);
    }

    # One item is returned as a plain scalar
    if ($wanted == 1) {
        return shift(@$items);
    }

    # Several items are collected in queue order
    my @taken;
    foreach my $n (1 .. $wanted) {
        push(@taken, shift(@$items));
    }
    return @taken;
}
96
# Return items from the head of a queue with no blocking.
#   dequeue_nb()       - remove and return the head item (undef if the
#                        queue is empty)
#   dequeue_nb(COUNT)  - remove and return up to COUNT items; COUNT is
#                        checked by _validate_count
# Fewer than COUNT items are returned when the queue runs out.
sub dequeue_nb
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Take up to $count items, stopping early if the queue runs dry
    my @items;
    for (1..$count) {
        last if (! @$queue);
        push(@items, shift(@$queue));
    }

    # Removing items may have brought the queue back under its limit, so
    # wake a thread that could be blocked in enqueue() waiting for space
    cond_signal(%$self) if (@items);

    # Return single item (undef when nothing was available)
    return $items[0] if ($count == 1);

    # Return multiple items
    return @items;
}
117
# Return items from the head of a queue, blocking if needed up to a timeout.
#   dequeue_timed(TIMEOUT)         - one item
#   dequeue_timed(TIMEOUT, COUNT)  - up to COUNT items
# TIMEOUT is checked by _validate_timeout and COUNT by _validate_count.
# When no TIMEOUT is given, -1 is used, which puts the deadline in the past.
# Whatever is available when the wait finishes is fetched via dequeue_nb().
sub dequeue_timed
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    # Timeout may be relative or absolute
    my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
    # Convert to an absolute time for use with cond_timedwait().
    # Values below 32,000,000 seconds (roughly one year) are taken to be
    # relative to the current time; anything larger is assumed to already
    # be an absolute epoch time.
    if ($timeout < 32000000) {
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items, or until timeout
    while ((@$queue < $count) && ! $$self{'ENDED'}) {
        last if (! cond_timedwait(%$self, $timeout));
    }

    # Wake another waiter whenever items are about to be taken off the
    # queue or the queue has ended.  This includes a timeout that leaves
    # fewer than $count items: they are still removed below, which can free
    # space that a thread blocked in enqueue() is waiting for.
    cond_signal(%$self) if (@$queue || $$self{'ENDED'});

    # Get whatever we need off the queue if available
    return $self->dequeue_nb($count);
}
143
# Look at an item without taking it off the queue.
#   peek()       - the item at the head (index 0)
#   peek(INDEX)  - the item at INDEX, which is checked by _validate_index
# The queue itself is left unchanged.
sub peek
{
    my ($self, @args) = @_;
    lock(%$self);

    my $position = 0;
    $position = $self->_validate_index($args[0]) if (@args);

    return $self->{'queue'}->[$position];
}
152
# Insert items anywhere into a queue.
#   insert(INDEX, LIST)
# INDEX is checked by _validate_index.  Items at and beyond INDEX end up
# after the new items.  A negative INDEX counts from the tail and is clamped
# to the head; an INDEX past the tail appends.  Croaks if the queue has been
# 'end'ed.  Does nothing when LIST is empty.
sub insert
{
    my $self = shift;
    lock(%$self);

    if ($$self{'ENDED'}) {
        require Carp;
        Carp::croak("'insert' method called on queue that has been 'end'ed");
    }

    my $queue = $$self{'queue'};

    my $index = $self->_validate_index(shift);

    return if (! @_);   # Nothing to insert

    # Clone the new items before the queue is touched.  If shared_clone()
    # croaks (e.g., on a reference type that cannot be shared), the queue
    # is then still intact; cloning after the tail has been popped off
    # would lose those popped items.
    my @new_items = map { shared_clone($_) } @_;

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            $index = 0;
        }
    }

    # Dequeue items from $index onward
    my @tmp;
    while (@$queue > $index) {
        unshift(@tmp, pop(@$queue))
    }

    # Add the new items, followed by the previous items, back onto the queue
    push(@$queue, @new_items, @tmp);

    # Soup's up
    cond_signal(%$self);
}
193
# Remove items from anywhere in a queue, without blocking.
#   extract()              - same as extract(0, 1)
#   extract(INDEX)         - one item at INDEX
#   extract(INDEX, COUNT)  - up to COUNT items starting at INDEX
# INDEX is checked by _validate_index and COUNT by _validate_count.  A
# negative INDEX counts from the tail; when it reaches beyond the head, only
# the part of the requested range that overlaps the queue is taken.
# Returns a single item when the (possibly adjusted) COUNT is 1, otherwise a
# list of the removed items.
sub extract
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    my $index = @_ ? $self->_validate_index(shift) : 0;
    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            $count += $index;
            return if ($count <= 0);            # Beyond the head of the queue
            # Items are about to be taken from the head; wake a thread
            # that may be blocked in enqueue() waiting for space
            cond_signal(%$self) if (@$queue);
            return $self->dequeue_nb($count);  # Extract from the head
        }
    }

    # Dequeue items from $index+$count onward
    my @tmp;
    while (@$queue > ($index+$count)) {
        unshift(@tmp, pop(@$queue))
    }

    # Extract desired items
    my @items;
    unshift(@items, pop(@$queue)) while (@$queue > $index);

    # Add back any removed items
    push(@$queue, @tmp);

    # The queue is shorter now, so a thread blocked in enqueue() on the
    # queue's limit may be able to proceed
    cond_signal(%$self) if (@items);

    # Return single item
    return $items[0] if ($count == 1);

    # Return multiple items
    return @items;
}
233
234### Internal Methods ###
235
# Validate an 'index' argument: it must be defined, numeric, and integral.
# Returns the index unchanged when it is acceptable; otherwise croaks with a
# message that names the method that asked for the check.
sub _validate_index
{
    my ($self, $index) = @_;

    my $acceptable = defined($index)
                  && looks_like_number($index)
                  && (int($index) == $index);
    return $index if ($acceptable);

    # Build the error message from the calling method's point of view
    require Carp;
    my ($method) = (caller(1))[3];
    my $class_name = ref($self);
    $method =~ s/$class_name\:://;
    $index = 'undef' if (! defined($index));
    Carp::croak("Invalid 'index' argument ($index) to '$method' method");
}
256
# Validate a 'count' argument: it must be defined, numeric, integral, and at
# least 1.  Returns the count unchanged when it is acceptable; otherwise
# croaks with a message that names the method that asked for the check.
sub _validate_count
{
    my ($self, $count) = @_;

    my $acceptable = defined($count)
                  && looks_like_number($count)
                  && (int($count) == $count)
                  && ($count >= 1);
    return $count if ($acceptable);

    # Build the error message from the calling method's point of view
    require Carp;
    my ($method) = (caller(1))[3];
    my $class_name = ref($self);
    $method =~ s/$class_name\:://;
    $count = 'undef' if (! defined($count));
    Carp::croak("Invalid 'count' argument ($count) to '$method' method");
}
278
# Validate a 'timeout' argument: it must be defined and numeric (fractions
# and negative values pass).  Returns the timeout unchanged when it is
# acceptable; otherwise croaks with a message that names the method that
# asked for the check.
sub _validate_timeout
{
    my ($self, $timeout) = @_;

    my $acceptable = defined($timeout) && looks_like_number($timeout);
    return $timeout if ($acceptable);

    # Build the error message from the calling method's point of view
    require Carp;
    my ($method) = (caller(1))[3];
    my $class_name = ref($self);
    $method =~ s/$class_name\:://;
    $timeout = 'undef' if (! defined($timeout));
    Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
}
298
2991;
300
301=head1 NAME
302
303Thread::Queue - Thread-safe queues
304
305=head1 VERSION
306
307This document describes Thread::Queue version 3.09
308
309=head1 SYNOPSIS
310
311    use strict;
312    use warnings;
313
314    use threads;
315    use Thread::Queue;
316
317    my $q = Thread::Queue->new();    # A new empty queue
318
319    # Worker thread
320    my $thr = threads->create(
321        sub {
322            # Thread will loop until no more work
323            while (defined(my $item = $q->dequeue())) {
324                # Do work on $item
325                ...
326            }
327        }
328    );
329
330    # Send work to the thread
331    $q->enqueue($item1, ...);
332    # Signal that there is no more work to be sent
333    $q->end();
334    # Join up with the thread when it finishes
335    $thr->join();
336
337    ...
338
339    # Count of items in the queue
340    my $left = $q->pending();
341
342    # Non-blocking dequeue
343    if (defined(my $item = $q->dequeue_nb())) {
344        # Work on $item
345    }
346
347    # Blocking dequeue with 5-second timeout
348    if (defined(my $item = $q->dequeue_timed(5))) {
349        # Work on $item
350    }
351
352    # Set a size for a queue
353    $q->limit = 5;
354
355    # Get the second item in the queue without dequeuing anything
356    my $item = $q->peek(1);
357
358    # Insert two items into the queue just behind the head
359    $q->insert(1, $item1, $item2);
360
361    # Extract the last two items on the queue
362    my ($item1, $item2) = $q->extract(-2, 2);
363
364=head1 DESCRIPTION
365
366This module provides thread-safe FIFO queues that can be accessed safely by
367any number of threads.
368
369Any data types supported by L<threads::shared> can be passed via queues:
370
371=over
372
373=item Ordinary scalars
374
375=item Array refs
376
377=item Hash refs
378
379=item Scalar refs
380
381=item Objects based on the above
382
383=back
384
385Ordinary scalars are added to queues as they are.
386
387If not already thread-shared, the other complex data types will be cloned
388(recursively, if needed, and including any C<bless>ings and read-only
389settings) into thread-shared structures before being placed onto a queue.
390
For example, the following would cause L<Thread::Queue> to create an empty,
392shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
393and 'baz' from C<@ary> into it, and then place that shared reference onto
394the queue:
395
396 my @ary = qw/foo bar baz/;
397 $q->enqueue(\@ary);
398
399However, for the following, the items are already shared, so their references
400are added directly to the queue, and no cloning takes place:
401
402 my @ary :shared = qw/foo bar baz/;
403 $q->enqueue(\@ary);
404
405 my $obj = &shared({});
406 $$obj{'foo'} = 'bar';
407 $$obj{'qux'} = 99;
408 bless($obj, 'My::Class');
409 $q->enqueue($obj);
410
411See L</"LIMITATIONS"> for caveats related to passing objects via queues.
412
413=head1 QUEUE CREATION
414
415=over
416
417=item ->new()
418
419Creates a new empty queue.
420
421=item ->new(LIST)
422
423Creates a new queue pre-populated with the provided list of items.
424
425=back
426
427=head1 BASIC METHODS
428
429The following methods deal with queues on a FIFO basis.
430
431=over
432
433=item ->enqueue(LIST)
434
435Adds a list of items onto the end of the queue.
436
437=item ->dequeue()
438
439=item ->dequeue(COUNT)
440
441Removes the requested number of items (default is 1) from the head of the
442queue, and returns them.  If the queue contains fewer than the requested
443number of items, then the thread will be blocked until the requisite number
444of items are available (i.e., until other threads C<enqueue> more items).
445
446=item ->dequeue_nb()
447
448=item ->dequeue_nb(COUNT)
449
450Removes the requested number of items (default is 1) from the head of the
451queue, and returns them.  If the queue contains fewer than the requested
452number of items, then it immediately (i.e., non-blocking) returns whatever
453items there are on the queue.  If the queue is empty, then C<undef> is
454returned.
455
456=item ->dequeue_timed(TIMEOUT)
457
458=item ->dequeue_timed(TIMEOUT, COUNT)
459
460Removes the requested number of items (default is 1) from the head of the
461queue, and returns them.  If the queue contains fewer than the requested
462number of items, then the thread will be blocked until the requisite number of
463items are available, or until the timeout is reached.  If the timeout is
464reached, it returns whatever items there are on the queue, or C<undef> if the
465queue is empty.
466
467The timeout may be a number of seconds relative to the current time (e.g., 5
468seconds from when the call is made), or may be an absolute timeout in I<epoch>
469seconds the same as would be used with
470L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
471Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
472the underlying implementation).
473
If C<TIMEOUT> is missing, or less than or equal to 0, then this call behaves
the same as C<dequeue_nb>.  An explicit C<undef> is rejected as an invalid
C<TIMEOUT>.
476
477=item ->pending()
478
479Returns the number of items still in the queue.  Returns C<undef> if the queue
480has been ended (see below), and there are no more items in the queue.
481
482=item ->limit
483
484Sets the size of the queue.  If set, calls to C<enqueue()> will block until
485the number of pending items in the queue drops below the C<limit>.  The
486C<limit> does not prevent enqueuing items beyond that count:
487
488 my $q = Thread::Queue->new(1, 2);
489 $q->limit = 4;
490 $q->enqueue(3, 4, 5);   # Does not block
491 $q->enqueue(6);         # Blocks until at least 2 items are
492                         # dequeued
493 my $size = $q->limit;   # Returns the current limit (may return
494                         # 'undef')
495 $q->limit = 0;          # Queue size is now unlimited
496
497=item ->end()
498
499Declares that no more items will be added to the queue.
500
501All threads blocking on C<dequeue()> calls will be unblocked with any
502remaining items in the queue and/or C<undef> being returned.  Any subsequent
503calls to C<dequeue()> will behave like C<dequeue_nb()>.
504
505Once ended, no more items may be placed in the queue.
506
507=back
508
509=head1 ADVANCED METHODS
510
511The following methods can be used to manipulate items anywhere in a queue.
512
513To prevent the contents of a queue from being modified by another thread
514while it is being examined and/or changed, L<lock|threads::shared/"lock
515VARIABLE"> the queue inside a local block:
516
517 {
518     lock($q);   # Keep other threads from changing the queue's contents
519     my $item = $q->peek();
520     if ($item ...) {
521         ...
522     }
523 }
524 # Queue is now unlocked
525
526=over
527
528=item ->peek()
529
530=item ->peek(INDEX)
531
Returns an item from the queue without dequeuing anything.  Defaults to the
head of the queue (at index position 0) if no index is specified.  Negative
534index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
535is the end of the queue, -2 is next to last, and so on).
536
If no item exists at the specified index (i.e., the queue is empty, or the
538index is beyond the number of items on the queue), then C<undef> is returned.
539
540Remember, the returned item is not removed from the queue, so manipulating a
541C<peek>ed at reference affects the item on the queue.
542
543=item ->insert(INDEX, LIST)
544
545Adds the list of items to the queue at the specified index position (0
546is the head of the list).  Any existing items at and beyond that position are
547pushed back past the newly added items:
548
549 $q->enqueue(1, 2, 3, 4);
550 $q->insert(1, qw/foo bar/);
551 # Queue now contains:  1, foo, bar, 2, 3, 4
552
553Specifying an index position greater than the number of items in the queue
554just adds the list to the end.
555
556Negative index positions are supported:
557
558 $q->enqueue(1, 2, 3, 4);
559 $q->insert(-2, qw/foo bar/);
560 # Queue now contains:  1, 2, foo, bar, 3, 4
561
562Specifying a negative index position greater than the number of items in the
563queue adds the list to the head of the queue.
564
565=item ->extract()
566
567=item ->extract(INDEX)
568
569=item ->extract(INDEX, COUNT)
570
571Removes and returns the specified number of items (defaults to 1) from the
572specified index position in the queue (0 is the head of the queue).  When
573called with no arguments, C<extract> operates the same as C<dequeue_nb>.
574
575This method is non-blocking, and will return only as many items as are
576available to fulfill the request:
577
578 $q->enqueue(1, 2, 3, 4);
 my $item  = $q->extract(2);    # Returns 3
                                # Queue now contains:  1, 2, 4
 my @items = $q->extract(1, 3); # Returns (2, 4)
                                # Queue now contains:  1
583
584Specifying an index position greater than the number of items in the
585queue results in C<undef> or an empty list being returned.
586
587 $q->enqueue('foo');
 my $nada = $q->extract(3);     # Returns undef
 my @nada = $q->extract(1, 3);  # Returns ()
590
591Negative index positions are supported.  Specifying a negative index position
592greater than the number of items in the queue may return items from the head
593of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
594queue from the specified position (i.e. if queue size + index + count is
595greater than zero):
596
597 $q->enqueue(qw/foo bar baz/);
598 my @nada = $q->extract(-6, 2);  # Returns ()      - (3+(-6)+2) <= 0
599 my @some = $q->extract(-6, 4);  # Returns (foo)   - (3+(-6)+4) > 0
600                                 # Queue now contains:  bar, baz
601 my @rest = $q->extract(-3, 4);  # Returns (bar, baz) -
602                                 #                   (2+(-3)+4) > 0
603
604=back
605
606=head1 NOTES
607
608Queues created by L<Thread::Queue> can be used in both threaded and
609non-threaded applications.
610
611=head1 LIMITATIONS
612
613Passing objects on queues may not work if the objects' classes do not support
614sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
615
616Passing array/hash refs that contain objects may not work for Perl prior to
6175.10.0.
618
619=head1 SEE ALSO
620
621Thread::Queue Discussion Forum on CPAN:
622L<http://www.cpanforum.com/dist/Thread-Queue>
623
624L<threads>, L<threads::shared>
625
626Sample code in the I<examples> directory of this distribution on CPAN.
627
628=head1 MAINTAINER
629
630Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
631
632=head1 LICENSE
633
634This program is free software; you can redistribute it and/or modify it under
635the same terms as Perl itself.
636
637=cut
638