xref: /openbsd-src/gnu/usr.bin/perl/dist/Thread-Queue/lib/Thread/Queue.pm (revision e5157e49389faebcb42b7237d55fbf096d9c2523)
1package Thread::Queue;
2
3use strict;
4use warnings;
5
6our $VERSION = '3.05';
7$VERSION = eval $VERSION;
8
9use threads::shared 1.21;
10use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
11
12# Carp errors from threads::shared calls should complain about caller
13our @CARP_NOT = ("threads::shared");
14
15# Create a new queue possibly pre-populated with items
16sub new
17{
18    my $class = shift;
19    my @queue :shared = map { shared_clone($_) } @_;
20    my %self :shared = ( 'queue' => \@queue );
21    return bless(\%self, $class);
22}
23
24# Add items to the tail of a queue
25sub enqueue
26{
27    my $self = shift;
28    lock(%$self);
29    if ($$self{'ENDED'}) {
30        require Carp;
31        Carp::croak("'enqueue' method called on queue that has been 'end'ed");
32    }
33    push(@{$$self{'queue'}}, map { shared_clone($_) } @_)
34        and cond_signal(%$self);
35}
36
37# Return a count of the number of items on a queue
38sub pending
39{
40    my $self = shift;
41    lock(%$self);
42    return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
43    return scalar(@{$$self{'queue'}});
44}
45
46# Indicate that no more data will enter the queue
47sub end
48{
49    my $self = shift;
50    lock $self;
51    # No more data is coming
52    $$self{'ENDED'} = 1;
53    # Try to release at least one blocked thread
54    cond_signal(%$self);
55}
56
57# Return 1 or more items from the head of a queue, blocking if needed
58sub dequeue
59{
60    my $self = shift;
61    lock(%$self);
62    my $queue = $$self{'queue'};
63
64    my $count = @_ ? $self->_validate_count(shift) : 1;
65
66    # Wait for requisite number of items
67    cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
68    cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
69
70    # If no longer blocking, try getting whatever is left on the queue
71    return $self->dequeue_nb($count) if ($$self{'ENDED'});
72
73    # Return single item
74    return shift(@$queue) if ($count == 1);
75
76    # Return multiple items
77    my @items;
78    push(@items, shift(@$queue)) for (1..$count);
79    return @items;
80}
81
82# Return items from the head of a queue with no blocking
83sub dequeue_nb
84{
85    my $self = shift;
86    lock(%$self);
87    my $queue = $$self{'queue'};
88
89    my $count = @_ ? $self->_validate_count(shift) : 1;
90
91    # Return single item
92    return shift(@$queue) if ($count == 1);
93
94    # Return multiple items
95    my @items;
96    for (1..$count) {
97        last if (! @$queue);
98        push(@items, shift(@$queue));
99    }
100    return @items;
101}
102
103# Return items from the head of a queue, blocking if needed up to a timeout
104sub dequeue_timed
105{
106    my $self = shift;
107    lock(%$self);
108    my $queue = $$self{'queue'};
109
110    # Timeout may be relative or absolute
111    my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
112    # Convert to an absolute time for use with cond_timedwait()
113    if ($timeout < 32000000) {   # More than one year
114        $timeout += time();
115    }
116
117    my $count = @_ ? $self->_validate_count(shift) : 1;
118
119    # Wait for requisite number of items, or until timeout
120    while ((@$queue < $count) && ! $$self{'ENDED'}) {
121        last if (! cond_timedwait(%$self, $timeout));
122    }
123    cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});
124
125    # Get whatever we need off the queue if available
126    return $self->dequeue_nb($count);
127}
128
129# Return an item without removing it from a queue
130sub peek
131{
132    my $self = shift;
133    lock(%$self);
134    my $index = @_ ? $self->_validate_index(shift) : 0;
135    return $$self{'queue'}[$index];
136}
137
138# Insert items anywhere into a queue
139sub insert
140{
141    my $self = shift;
142    lock(%$self);
143
144    if ($$self{'ENDED'}) {
145        require Carp;
146        Carp::croak("'insert' method called on queue that has been 'end'ed");
147    }
148
149    my $queue = $$self{'queue'};
150
151    my $index = $self->_validate_index(shift);
152
153    return if (! @_);   # Nothing to insert
154
155    # Support negative indices
156    if ($index < 0) {
157        $index += @$queue;
158        if ($index < 0) {
159            $index = 0;
160        }
161    }
162
163    # Dequeue items from $index onward
164    my @tmp;
165    while (@$queue > $index) {
166        unshift(@tmp, pop(@$queue))
167    }
168
169    # Add new items to the queue
170    push(@$queue, map { shared_clone($_) } @_);
171
172    # Add previous items back onto the queue
173    push(@$queue, @tmp);
174
175    # Soup's up
176    cond_signal(%$self);
177}
178
179# Remove items from anywhere in a queue
180sub extract
181{
182    my $self = shift;
183    lock(%$self);
184    my $queue = $$self{'queue'};
185
186    my $index = @_ ? $self->_validate_index(shift) : 0;
187    my $count = @_ ? $self->_validate_count(shift) : 1;
188
189    # Support negative indices
190    if ($index < 0) {
191        $index += @$queue;
192        if ($index < 0) {
193            $count += $index;
194            return if ($count <= 0);            # Beyond the head of the queue
195            return $self->dequeue_nb($count);  # Extract from the head
196        }
197    }
198
199    # Dequeue items from $index+$count onward
200    my @tmp;
201    while (@$queue > ($index+$count)) {
202        unshift(@tmp, pop(@$queue))
203    }
204
205    # Extract desired items
206    my @items;
207    unshift(@items, pop(@$queue)) while (@$queue > $index);
208
209    # Add back any removed items
210    push(@$queue, @tmp);
211
212    # Return single item
213    return $items[0] if ($count == 1);
214
215    # Return multiple items
216    return @items;
217}
218
219### Internal Methods ###
220
221# Check value of the requested index
222sub _validate_index
223{
224    my $self = shift;
225    my $index = shift;
226
227    if (! defined($index) ||
228        ! looks_like_number($index) ||
229        (int($index) != $index))
230    {
231        require Carp;
232        my ($method) = (caller(1))[3];
233        my $class_name = ref($self);
234        $method =~ s/$class_name\:://;
235        $index = 'undef' if (! defined($index));
236        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
237    }
238
239    return $index;
240};
241
242# Check value of the requested count
243sub _validate_count
244{
245    my $self = shift;
246    my $count = shift;
247
248    if (! defined($count) ||
249        ! looks_like_number($count) ||
250        (int($count) != $count) ||
251        ($count < 1))
252    {
253        require Carp;
254        my ($method) = (caller(1))[3];
255        my $class_name = ref($self);
256        $method =~ s/$class_name\:://;
257        $count = 'undef' if (! defined($count));
258        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
259    }
260
261    return $count;
262};
263
264# Check value of the requested timeout
265sub _validate_timeout
266{
267    my $self = shift;
268    my $timeout = shift;
269
270    if (! defined($timeout) ||
271        ! looks_like_number($timeout))
272    {
273        require Carp;
274        my ($method) = (caller(1))[3];
275        my $class_name = ref($self);
276        $method =~ s/$class_name\:://;
277        $timeout = 'undef' if (! defined($timeout));
278        Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
279    }
280
281    return $timeout;
282};
283
2841;
285
286=head1 NAME
287
288Thread::Queue - Thread-safe queues
289
290=head1 VERSION
291
292This document describes Thread::Queue version 3.05
293
294=head1 SYNOPSIS
295
296    use strict;
297    use warnings;
298
299    use threads;
300    use Thread::Queue;
301
302    my $q = Thread::Queue->new();    # A new empty queue
303
304    # Worker thread
305    my $thr = threads->create(
306        sub {
307            # Thread will loop until no more work
308            while (defined(my $item = $q->dequeue())) {
309                # Do work on $item
310                ...
311            }
312        }
313    );
314
315    # Send work to the thread
316    $q->enqueue($item1, ...);
317    # Signal that there is no more work to be sent
318    $q->end();
319    # Join up with the thread when it finishes
320    $thr->join();
321
322    ...
323
324    # Count of items in the queue
325    my $left = $q->pending();
326
327    # Non-blocking dequeue
328    if (defined(my $item = $q->dequeue_nb())) {
329        # Work on $item
330    }
331
332    # Blocking dequeue with 5-second timeout
333    if (defined(my $item = $q->dequeue_timed(5))) {
334        # Work on $item
335    }
336
337    # Get the second item in the queue without dequeuing anything
338    my $item = $q->peek(1);
339
340    # Insert two items into the queue just behind the head
341    $q->insert(1, $item1, $item2);
342
343    # Extract the last two items on the queue
344    my ($item1, $item2) = $q->extract(-2, 2);
345
346=head1 DESCRIPTION
347
348This module provides thread-safe FIFO queues that can be accessed safely by
349any number of threads.
350
351Any data types supported by L<threads::shared> can be passed via queues:
352
353=over
354
355=item Ordinary scalars
356
357=item Array refs
358
359=item Hash refs
360
361=item Scalar refs
362
363=item Objects based on the above
364
365=back
366
367Ordinary scalars are added to queues as they are.
368
369If not already thread-shared, the other complex data types will be cloned
370(recursively, if needed, and including any C<bless>ings and read-only
371settings) into thread-shared structures before being placed onto a queue.
372
373For example, the following would cause L<Thread::Queue> to create a empty,
374shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
375and 'baz' from C<@ary> into it, and then place that shared reference onto
376the queue:
377
378    my @ary = qw/foo bar baz/;
379    $q->enqueue(\@ary);
380
381However, for the following, the items are already shared, so their references
382are added directly to the queue, and no cloning takes place:
383
384    my @ary :shared = qw/foo bar baz/;
385    $q->enqueue(\@ary);
386
387    my $obj = &shared({});
388    $$obj{'foo'} = 'bar';
389    $$obj{'qux'} = 99;
390    bless($obj, 'My::Class');
391    $q->enqueue($obj);
392
393See L</"LIMITATIONS"> for caveats related to passing objects via queues.
394
395=head1 QUEUE CREATION
396
397=over
398
399=item ->new()
400
401Creates a new empty queue.
402
403=item ->new(LIST)
404
405Creates a new queue pre-populated with the provided list of items.
406
407=back
408
409=head1 BASIC METHODS
410
411The following methods deal with queues on a FIFO basis.
412
413=over
414
415=item ->enqueue(LIST)
416
417Adds a list of items onto the end of the queue.
418
419=item ->dequeue()
420
421=item ->dequeue(COUNT)
422
423Removes the requested number of items (default is 1) from the head of the
424queue, and returns them.  If the queue contains fewer than the requested
425number of items, then the thread will be blocked until the requisite number
426of items are available (i.e., until other threads <enqueue> more items).
427
428=item ->dequeue_nb()
429
430=item ->dequeue_nb(COUNT)
431
432Removes the requested number of items (default is 1) from the head of the
433queue, and returns them.  If the queue contains fewer than the requested
434number of items, then it immediately (i.e., non-blocking) returns whatever
435items there are on the queue.  If the queue is empty, then C<undef> is
436returned.
437
438=item ->dequeue_timed(TIMEOUT)
439
440=item ->dequeue_timed(TIMEOUT, COUNT)
441
442Removes the requested number of items (default is 1) from the head of the
443queue, and returns them.  If the queue contains fewer than the requested
444number of items, then the thread will be blocked until the requisite number of
445items are available, or until the timeout is reached.  If the timeout is
446reached, it returns whatever items there are on the queue, or C<undef> if the
447queue is empty.
448
449The timeout may be a number of seconds relative to the current time (e.g., 5
450seconds from when the call is made), or may be an absolute timeout in I<epoch>
451seconds the same as would be used with
452L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
453Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
454the underlying implementation).
455
456If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call
457behaves the same as C<dequeue_nb>.
458
459=item ->pending()
460
461Returns the number of items still in the queue.  Returns C<undef> if the queue
462has been ended (see below), and there are no more items in the queue.
463
464=item ->end()
465
466Declares that no more items will be added to the queue.
467
468All threads blocking on C<dequeue()> calls will be unblocked with any
469remaining items in the queue and/or C<undef> being returned.  Any subsequent
470calls to C<dequeue()> will behave like C<dequeue_nb()>.
471
472Once ended, no more items may be placed in the queue.
473
474=back
475
476=head1 ADVANCED METHODS
477
478The following methods can be used to manipulate items anywhere in a queue.
479
480To prevent the contents of a queue from being modified by another thread
481while it is being examined and/or changed, L<lock|threads::shared/"lock
482VARIABLE"> the queue inside a local block:
483
484    {
485        lock($q);   # Keep other threads from changing the queue's contents
486        my $item = $q->peek();
487        if ($item ...) {
488            ...
489        }
490    }
491    # Queue is now unlocked
492
493=over
494
495=item ->peek()
496
497=item ->peek(INDEX)
498
499Returns an item from the queue without dequeuing anything.  Defaults to the
500the head of queue (at index position 0) if no index is specified.  Negative
501index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
502is the end of the queue, -2 is next to last, and so on).
503
504If no items exists at the specified index (i.e., the queue is empty, or the
505index is beyond the number of items on the queue), then C<undef> is returned.
506
507Remember, the returned item is not removed from the queue, so manipulating a
508C<peek>ed at reference affects the item on the queue.
509
510=item ->insert(INDEX, LIST)
511
512Adds the list of items to the queue at the specified index position (0
513is the head of the list).  Any existing items at and beyond that position are
514pushed back past the newly added items:
515
516    $q->enqueue(1, 2, 3, 4);
517    $q->insert(1, qw/foo bar/);
518    # Queue now contains:  1, foo, bar, 2, 3, 4
519
520Specifying an index position greater than the number of items in the queue
521just adds the list to the end.
522
523Negative index positions are supported:
524
525    $q->enqueue(1, 2, 3, 4);
526    $q->insert(-2, qw/foo bar/);
527    # Queue now contains:  1, 2, foo, bar, 3, 4
528
529Specifying a negative index position greater than the number of items in the
530queue adds the list to the head of the queue.
531
532=item ->extract()
533
534=item ->extract(INDEX)
535
536=item ->extract(INDEX, COUNT)
537
538Removes and returns the specified number of items (defaults to 1) from the
539specified index position in the queue (0 is the head of the queue).  When
540called with no arguments, C<extract> operates the same as C<dequeue_nb>.
541
542This method is non-blocking, and will return only as many items as are
543available to fulfill the request:
544
545    $q->enqueue(1, 2, 3, 4);
546    my $item  = $q->extract(2)     # Returns 3
547                                   # Queue now contains:  1, 2, 4
548    my @items = $q->extract(1, 3)  # Returns (2, 4)
549                                   # Queue now contains:  1
550
551Specifying an index position greater than the number of items in the
552queue results in C<undef> or an empty list being returned.
553
554    $q->enqueue('foo');
555    my $nada = $q->extract(3)      # Returns undef
556    my @nada = $q->extract(1, 3)   # Returns ()
557
558Negative index positions are supported.  Specifying a negative index position
559greater than the number of items in the queue may return items from the head
560of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
561queue from the specified position (i.e. if queue size + index + count is
562greater than zero):
563
564    $q->enqueue(qw/foo bar baz/);
565    my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
566    my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
567                                     # Queue now contains:  bar, baz
568    my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
569
570=back
571
572=head1 NOTES
573
574Queues created by L<Thread::Queue> can be used in both threaded and
575non-threaded applications.
576
577=head1 LIMITATIONS
578
579Passing objects on queues may not work if the objects' classes do not support
580sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
581
582Passing array/hash refs that contain objects may not work for Perl prior to
5835.10.0.
584
585=head1 SEE ALSO
586
587Thread::Queue Discussion Forum on CPAN:
588L<http://www.cpanforum.com/dist/Thread-Queue>
589
590L<threads>, L<threads::shared>
591
592Sample code in the I<examples> directory of this distribution on CPAN.
593
594=head1 MAINTAINER
595
596Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
597
598=head1 LICENSE
599
600This program is free software; you can redistribute it and/or modify it under
601the same terms as Perl itself.
602
603=cut
604