xref: /openbsd-src/gnu/usr.bin/perl/dist/Thread-Queue/lib/Thread/Queue.pm (revision 56d68f1e19ff848c889ecfa71d3a06340ff64892)
1b39c5158Smillertpackage Thread::Queue;
2b39c5158Smillert
3b39c5158Smillertuse strict;
4b39c5158Smillertuse warnings;
5b39c5158Smillert
6*56d68f1eSafresh1our $VERSION = '3.14';          # remember to update version in POD!
7898184e3Ssthen$VERSION = eval $VERSION;
8b39c5158Smillert
9b39c5158Smillertuse threads::shared 1.21;
10b39c5158Smillertuse Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
11b39c5158Smillert
12b39c5158Smillert# Carp errors from threads::shared calls should complain about caller
13b39c5158Smillertour @CARP_NOT = ("threads::shared");
14b39c5158Smillert
15b39c5158Smillert# Create a new queue possibly pre-populated with items
16b39c5158Smillertsub new
17b39c5158Smillert{
18b39c5158Smillert    my $class = shift;
19b39c5158Smillert    my @queue :shared = map { shared_clone($_) } @_;
2091f110e0Safresh1    my %self :shared = ( 'queue' => \@queue );
2191f110e0Safresh1    return bless(\%self, $class);
22b39c5158Smillert}
23b39c5158Smillert
24b39c5158Smillert# Add items to the tail of a queue
25b39c5158Smillertsub enqueue
26b39c5158Smillert{
2791f110e0Safresh1    my $self = shift;
2891f110e0Safresh1    lock(%$self);
29b8851fccSafresh1
3091f110e0Safresh1    if ($$self{'ENDED'}) {
3191f110e0Safresh1        require Carp;
3291f110e0Safresh1        Carp::croak("'enqueue' method called on queue that has been 'end'ed");
3391f110e0Safresh1    }
34b8851fccSafresh1
35b8851fccSafresh1    # Block if queue size exceeds any specified limit
36b8851fccSafresh1    my $queue = $$self{'queue'};
37b8851fccSafresh1    cond_wait(%$self) while ($$self{'LIMIT'} && (@$queue >= $$self{'LIMIT'}));
38b8851fccSafresh1
39b8851fccSafresh1    # Add items to queue, and then signal other threads
40b8851fccSafresh1    push(@$queue, map { shared_clone($_) } @_)
4191f110e0Safresh1        and cond_signal(%$self);
42b39c5158Smillert}
43b39c5158Smillert
44b8851fccSafresh1# Set or return the max. size for a queue
45b8851fccSafresh1sub limit : lvalue
46b8851fccSafresh1{
47b8851fccSafresh1    my $self = shift;
48b8851fccSafresh1    lock(%$self);
49b8851fccSafresh1    $$self{'LIMIT'};
50b8851fccSafresh1}
51b8851fccSafresh1
52b39c5158Smillert# Return a count of the number of items on a queue
53b39c5158Smillertsub pending
54b39c5158Smillert{
5591f110e0Safresh1    my $self = shift;
5691f110e0Safresh1    lock(%$self);
5791f110e0Safresh1    return if ($$self{'ENDED'} && ! @{$$self{'queue'}});
5891f110e0Safresh1    return scalar(@{$$self{'queue'}});
5991f110e0Safresh1}
6091f110e0Safresh1
6191f110e0Safresh1# Indicate that no more data will enter the queue
6291f110e0Safresh1sub end
6391f110e0Safresh1{
6491f110e0Safresh1    my $self = shift;
65b8851fccSafresh1    lock(%$self);
6691f110e0Safresh1    # No more data is coming
6791f110e0Safresh1    $$self{'ENDED'} = 1;
689f11ffb7Safresh1
699f11ffb7Safresh1    cond_signal(%$self);  # Unblock possibly waiting threads
70b39c5158Smillert}
71b39c5158Smillert
72b39c5158Smillert# Return 1 or more items from the head of a queue, blocking if needed
73b39c5158Smillertsub dequeue
74b39c5158Smillert{
7591f110e0Safresh1    my $self = shift;
7691f110e0Safresh1    lock(%$self);
7791f110e0Safresh1    my $queue = $$self{'queue'};
78b39c5158Smillert
796fb12b70Safresh1    my $count = @_ ? $self->_validate_count(shift) : 1;
80b39c5158Smillert
81b39c5158Smillert    # Wait for requisite number of items
8291f110e0Safresh1    cond_wait(%$self) while ((@$queue < $count) && ! $$self{'ENDED'});
8391f110e0Safresh1
8491f110e0Safresh1    # If no longer blocking, try getting whatever is left on the queue
8591f110e0Safresh1    return $self->dequeue_nb($count) if ($$self{'ENDED'});
86b39c5158Smillert
87b39c5158Smillert    # Return single item
889f11ffb7Safresh1    if ($count == 1) {
899f11ffb7Safresh1        my $item = shift(@$queue);
909f11ffb7Safresh1        cond_signal(%$self);  # Unblock possibly waiting threads
919f11ffb7Safresh1        return $item;
929f11ffb7Safresh1    }
93b39c5158Smillert
94b39c5158Smillert    # Return multiple items
95b39c5158Smillert    my @items;
96b39c5158Smillert    push(@items, shift(@$queue)) for (1..$count);
979f11ffb7Safresh1    cond_signal(%$self);  # Unblock possibly waiting threads
98b39c5158Smillert    return @items;
99b39c5158Smillert}
100b39c5158Smillert
101b39c5158Smillert# Return items from the head of a queue with no blocking
102b39c5158Smillertsub dequeue_nb
103b39c5158Smillert{
10491f110e0Safresh1    my $self = shift;
10591f110e0Safresh1    lock(%$self);
10691f110e0Safresh1    my $queue = $$self{'queue'};
107b39c5158Smillert
1086fb12b70Safresh1    my $count = @_ ? $self->_validate_count(shift) : 1;
109b39c5158Smillert
110b39c5158Smillert    # Return single item
1119f11ffb7Safresh1    if ($count == 1) {
1129f11ffb7Safresh1        my $item = shift(@$queue);
1139f11ffb7Safresh1        cond_signal(%$self);  # Unblock possibly waiting threads
1149f11ffb7Safresh1        return $item;
1159f11ffb7Safresh1    }
116b39c5158Smillert
117b39c5158Smillert    # Return multiple items
118b39c5158Smillert    my @items;
119b39c5158Smillert    for (1..$count) {
120b39c5158Smillert        last if (! @$queue);
121b39c5158Smillert        push(@items, shift(@$queue));
122b39c5158Smillert    }
1239f11ffb7Safresh1    cond_signal(%$self);  # Unblock possibly waiting threads
124b39c5158Smillert    return @items;
125b39c5158Smillert}
126b39c5158Smillert
12791f110e0Safresh1# Return items from the head of a queue, blocking if needed up to a timeout
12891f110e0Safresh1sub dequeue_timed
12991f110e0Safresh1{
13091f110e0Safresh1    my $self = shift;
13191f110e0Safresh1    lock(%$self);
13291f110e0Safresh1    my $queue = $$self{'queue'};
13391f110e0Safresh1
13491f110e0Safresh1    # Timeout may be relative or absolute
1356fb12b70Safresh1    my $timeout = @_ ? $self->_validate_timeout(shift) : -1;
13691f110e0Safresh1    # Convert to an absolute time for use with cond_timedwait()
13791f110e0Safresh1    if ($timeout < 32000000) {   # More than one year
13891f110e0Safresh1        $timeout += time();
13991f110e0Safresh1    }
14091f110e0Safresh1
1416fb12b70Safresh1    my $count = @_ ? $self->_validate_count(shift) : 1;
14291f110e0Safresh1
14391f110e0Safresh1    # Wait for requisite number of items, or until timeout
14491f110e0Safresh1    while ((@$queue < $count) && ! $$self{'ENDED'}) {
14591f110e0Safresh1        last if (! cond_timedwait(%$self, $timeout));
14691f110e0Safresh1    }
14791f110e0Safresh1
14891f110e0Safresh1    # Get whatever we need off the queue if available
14991f110e0Safresh1    return $self->dequeue_nb($count);
15091f110e0Safresh1}
15191f110e0Safresh1
152b39c5158Smillert# Return an item without removing it from a queue
153b39c5158Smillertsub peek
154b39c5158Smillert{
15591f110e0Safresh1    my $self = shift;
15691f110e0Safresh1    lock(%$self);
1576fb12b70Safresh1    my $index = @_ ? $self->_validate_index(shift) : 0;
15891f110e0Safresh1    return $$self{'queue'}[$index];
159b39c5158Smillert}
160b39c5158Smillert
161b39c5158Smillert# Insert items anywhere into a queue
162b39c5158Smillertsub insert
163b39c5158Smillert{
16491f110e0Safresh1    my $self = shift;
16591f110e0Safresh1    lock(%$self);
16691f110e0Safresh1
16791f110e0Safresh1    if ($$self{'ENDED'}) {
16891f110e0Safresh1        require Carp;
16991f110e0Safresh1        Carp::croak("'insert' method called on queue that has been 'end'ed");
17091f110e0Safresh1    }
17191f110e0Safresh1
17291f110e0Safresh1    my $queue = $$self{'queue'};
173b39c5158Smillert
1746fb12b70Safresh1    my $index = $self->_validate_index(shift);
175b39c5158Smillert
176b39c5158Smillert    return if (! @_);   # Nothing to insert
177b39c5158Smillert
178b39c5158Smillert    # Support negative indices
179b39c5158Smillert    if ($index < 0) {
180b39c5158Smillert        $index += @$queue;
181b39c5158Smillert        if ($index < 0) {
182b39c5158Smillert            $index = 0;
183b39c5158Smillert        }
184b39c5158Smillert    }
185b39c5158Smillert
186b39c5158Smillert    # Dequeue items from $index onward
187b39c5158Smillert    my @tmp;
188b39c5158Smillert    while (@$queue > $index) {
189b39c5158Smillert        unshift(@tmp, pop(@$queue))
190b39c5158Smillert    }
191b39c5158Smillert
192b39c5158Smillert    # Add new items to the queue
193b39c5158Smillert    push(@$queue, map { shared_clone($_) } @_);
194b39c5158Smillert
195b39c5158Smillert    # Add previous items back onto the queue
196b39c5158Smillert    push(@$queue, @tmp);
197b39c5158Smillert
1989f11ffb7Safresh1    cond_signal(%$self);  # Unblock possibly waiting threads
199b39c5158Smillert}
200b39c5158Smillert
201b39c5158Smillert# Remove items from anywhere in a queue
202b39c5158Smillertsub extract
203b39c5158Smillert{
20491f110e0Safresh1    my $self = shift;
20591f110e0Safresh1    lock(%$self);
20691f110e0Safresh1    my $queue = $$self{'queue'};
207b39c5158Smillert
2086fb12b70Safresh1    my $index = @_ ? $self->_validate_index(shift) : 0;
2096fb12b70Safresh1    my $count = @_ ? $self->_validate_count(shift) : 1;
210b39c5158Smillert
211b39c5158Smillert    # Support negative indices
212b39c5158Smillert    if ($index < 0) {
213b39c5158Smillert        $index += @$queue;
214b39c5158Smillert        if ($index < 0) {
215b39c5158Smillert            $count += $index;
216b39c5158Smillert            return if ($count <= 0);           # Beyond the head of the queue
21791f110e0Safresh1            return $self->dequeue_nb($count);  # Extract from the head
218b39c5158Smillert        }
219b39c5158Smillert    }
220b39c5158Smillert
221b39c5158Smillert    # Dequeue items from $index+$count onward
222b39c5158Smillert    my @tmp;
223b39c5158Smillert    while (@$queue > ($index+$count)) {
224b39c5158Smillert        unshift(@tmp, pop(@$queue))
225b39c5158Smillert    }
226b39c5158Smillert
227b39c5158Smillert    # Extract desired items
228b39c5158Smillert    my @items;
229b39c5158Smillert    unshift(@items, pop(@$queue)) while (@$queue > $index);
230b39c5158Smillert
231b39c5158Smillert    # Add back any removed items
232b39c5158Smillert    push(@$queue, @tmp);
233b39c5158Smillert
2349f11ffb7Safresh1    cond_signal(%$self);  # Unblock possibly waiting threads
2359f11ffb7Safresh1
236b39c5158Smillert    # Return single item
237b39c5158Smillert    return $items[0] if ($count == 1);
238b39c5158Smillert
239b39c5158Smillert    # Return multiple items
240b39c5158Smillert    return @items;
241b39c5158Smillert}
242b39c5158Smillert
2436fb12b70Safresh1### Internal Methods ###
244b39c5158Smillert
245b39c5158Smillert# Check value of the requested index
2466fb12b70Safresh1sub _validate_index
2476fb12b70Safresh1{
2486fb12b70Safresh1    my $self = shift;
249b39c5158Smillert    my $index = shift;
250b39c5158Smillert
251b39c5158Smillert    if (! defined($index) ||
252b39c5158Smillert        ! looks_like_number($index) ||
253b39c5158Smillert        (int($index) != $index))
254b39c5158Smillert    {
255b39c5158Smillert        require Carp;
256b39c5158Smillert        my ($method) = (caller(1))[3];
2576fb12b70Safresh1        my $class_name = ref($self);
2586fb12b70Safresh1        $method =~ s/$class_name\:://;
259b39c5158Smillert        $index = 'undef' if (! defined($index));
260b39c5158Smillert        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
261b39c5158Smillert    }
262b39c5158Smillert
263b39c5158Smillert    return $index;
264b39c5158Smillert};
265b39c5158Smillert
266b39c5158Smillert# Check value of the requested count
2676fb12b70Safresh1sub _validate_count
2686fb12b70Safresh1{
2696fb12b70Safresh1    my $self = shift;
270b39c5158Smillert    my $count = shift;
271b39c5158Smillert
272b39c5158Smillert    if (! defined($count) ||
273b39c5158Smillert        ! looks_like_number($count) ||
274b39c5158Smillert        (int($count) != $count) ||
2759f11ffb7Safresh1        ($count < 1) ||
2769f11ffb7Safresh1        ($$self{'LIMIT'} && $count > $$self{'LIMIT'}))
277b39c5158Smillert    {
278b39c5158Smillert        require Carp;
279b39c5158Smillert        my ($method) = (caller(1))[3];
2806fb12b70Safresh1        my $class_name = ref($self);
2816fb12b70Safresh1        $method =~ s/$class_name\:://;
282b39c5158Smillert        $count = 'undef' if (! defined($count));
2839f11ffb7Safresh1        if ($$self{'LIMIT'} && $count > $$self{'LIMIT'}) {
2849f11ffb7Safresh1            Carp::croak("'count' argument ($count) to '$method' method exceeds queue size limit ($$self{'LIMIT'})");
2859f11ffb7Safresh1        } else {
286b39c5158Smillert            Carp::croak("Invalid 'count' argument ($count) to '$method' method");
287b39c5158Smillert        }
2889f11ffb7Safresh1    }
289b39c5158Smillert
290b39c5158Smillert    return $count;
291b39c5158Smillert};
292b39c5158Smillert
29391f110e0Safresh1# Check value of the requested timeout
2946fb12b70Safresh1sub _validate_timeout
2956fb12b70Safresh1{
2966fb12b70Safresh1    my $self = shift;
29791f110e0Safresh1    my $timeout = shift;
29891f110e0Safresh1
29991f110e0Safresh1    if (! defined($timeout) ||
30091f110e0Safresh1        ! looks_like_number($timeout))
30191f110e0Safresh1    {
30291f110e0Safresh1        require Carp;
30391f110e0Safresh1        my ($method) = (caller(1))[3];
3046fb12b70Safresh1        my $class_name = ref($self);
3056fb12b70Safresh1        $method =~ s/$class_name\:://;
30691f110e0Safresh1        $timeout = 'undef' if (! defined($timeout));
30791f110e0Safresh1        Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
30891f110e0Safresh1    }
30991f110e0Safresh1
31091f110e0Safresh1    return $timeout;
31191f110e0Safresh1};
31291f110e0Safresh1
313b39c5158Smillert1;
314b39c5158Smillert
315b39c5158Smillert=head1 NAME
316b39c5158Smillert
317b39c5158SmillertThread::Queue - Thread-safe queues
318b39c5158Smillert
319b39c5158Smillert=head1 VERSION
320b39c5158Smillert
321*56d68f1eSafresh1This document describes Thread::Queue version 3.14
322b39c5158Smillert
323b39c5158Smillert=head1 SYNOPSIS
324b39c5158Smillert
325b39c5158Smillert    use strict;
326b39c5158Smillert    use warnings;
327b39c5158Smillert
328b39c5158Smillert    use threads;
329b39c5158Smillert    use Thread::Queue;
330b39c5158Smillert
331b39c5158Smillert    my $q = Thread::Queue->new();    # A new empty queue
332b39c5158Smillert
333b39c5158Smillert    # Worker thread
33491f110e0Safresh1    my $thr = threads->create(
33591f110e0Safresh1        sub {
33691f110e0Safresh1            # Thread will loop until no more work
33791f110e0Safresh1            while (defined(my $item = $q->dequeue())) {
338b39c5158Smillert                # Do work on $item
33991f110e0Safresh1                ...
340b39c5158Smillert            }
34191f110e0Safresh1        }
34291f110e0Safresh1    );
343b39c5158Smillert
344b39c5158Smillert    # Send work to the thread
345b39c5158Smillert    $q->enqueue($item1, ...);
34691f110e0Safresh1    # Signal that there is no more work to be sent
34791f110e0Safresh1    $q->end();
34891f110e0Safresh1    # Join up with the thread when it finishes
34991f110e0Safresh1    $thr->join();
350b39c5158Smillert
35191f110e0Safresh1    ...
352b39c5158Smillert
353b39c5158Smillert    # Count of items in the queue
354b39c5158Smillert    my $left = $q->pending();
355b39c5158Smillert
356b39c5158Smillert    # Non-blocking dequeue
357b39c5158Smillert    if (defined(my $item = $q->dequeue_nb())) {
358b39c5158Smillert        # Work on $item
359b39c5158Smillert    }
360b39c5158Smillert
36191f110e0Safresh1    # Blocking dequeue with 5-second timeout
36291f110e0Safresh1    if (defined(my $item = $q->dequeue_timed(5))) {
36391f110e0Safresh1        # Work on $item
36491f110e0Safresh1    }
36591f110e0Safresh1
366b8851fccSafresh1    # Set a size for a queue
367b8851fccSafresh1    $q->limit = 5;
368b8851fccSafresh1
369b39c5158Smillert    # Get the second item in the queue without dequeuing anything
370b39c5158Smillert    my $item = $q->peek(1);
371b39c5158Smillert
372b39c5158Smillert    # Insert two items into the queue just behind the head
373b39c5158Smillert    $q->insert(1, $item1, $item2);
374b39c5158Smillert
375b39c5158Smillert    # Extract the last two items on the queue
376b39c5158Smillert    my ($item1, $item2) = $q->extract(-2, 2);
377b39c5158Smillert
378b39c5158Smillert=head1 DESCRIPTION
379b39c5158Smillert
380b39c5158SmillertThis module provides thread-safe FIFO queues that can be accessed safely by
381b39c5158Smillertany number of threads.
382b39c5158Smillert
383b39c5158SmillertAny data types supported by L<threads::shared> can be passed via queues:
384b39c5158Smillert
385b39c5158Smillert=over
386b39c5158Smillert
387b39c5158Smillert=item Ordinary scalars
388b39c5158Smillert
389b39c5158Smillert=item Array refs
390b39c5158Smillert
391b39c5158Smillert=item Hash refs
392b39c5158Smillert
393b39c5158Smillert=item Scalar refs
394b39c5158Smillert
395b39c5158Smillert=item Objects based on the above
396b39c5158Smillert
397b39c5158Smillert=back
398b39c5158Smillert
399b39c5158SmillertOrdinary scalars are added to queues as they are.
400b39c5158Smillert
401b39c5158SmillertIf not already thread-shared, the other complex data types will be cloned
402b39c5158Smillert(recursively, if needed, and including any C<bless>ings and read-only
403b39c5158Smillertsettings) into thread-shared structures before being placed onto a queue.
404b39c5158Smillert
405b39c5158SmillertFor example, the following would cause L<Thread::Queue> to create a empty,
406b39c5158Smillertshared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
407b39c5158Smillertand 'baz' from C<@ary> into it, and then place that shared reference onto
408b39c5158Smillertthe queue:
409b39c5158Smillert
410b39c5158Smillert my @ary = qw/foo bar baz/;
411b39c5158Smillert $q->enqueue(\@ary);
412b39c5158Smillert
413b39c5158SmillertHowever, for the following, the items are already shared, so their references
414b39c5158Smillertare added directly to the queue, and no cloning takes place:
415b39c5158Smillert
416b39c5158Smillert my @ary :shared = qw/foo bar baz/;
417b39c5158Smillert $q->enqueue(\@ary);
418b39c5158Smillert
419b39c5158Smillert my $obj = &shared({});
420b39c5158Smillert $$obj{'foo'} = 'bar';
421b39c5158Smillert $$obj{'qux'} = 99;
422b39c5158Smillert bless($obj, 'My::Class');
423b39c5158Smillert $q->enqueue($obj);
424b39c5158Smillert
425b39c5158SmillertSee L</"LIMITATIONS"> for caveats related to passing objects via queues.
426b39c5158Smillert
427b39c5158Smillert=head1 QUEUE CREATION
428b39c5158Smillert
429b39c5158Smillert=over
430b39c5158Smillert
431b39c5158Smillert=item ->new()
432b39c5158Smillert
433b39c5158SmillertCreates a new empty queue.
434b39c5158Smillert
435b39c5158Smillert=item ->new(LIST)
436b39c5158Smillert
437b39c5158SmillertCreates a new queue pre-populated with the provided list of items.
438b39c5158Smillert
439b39c5158Smillert=back
440b39c5158Smillert
441b39c5158Smillert=head1 BASIC METHODS
442b39c5158Smillert
443b39c5158SmillertThe following methods deal with queues on a FIFO basis.
444b39c5158Smillert
445b39c5158Smillert=over
446b39c5158Smillert
447b39c5158Smillert=item ->enqueue(LIST)
448b39c5158Smillert
449b39c5158SmillertAdds a list of items onto the end of the queue.
450b39c5158Smillert
451b39c5158Smillert=item ->dequeue()
452b39c5158Smillert
453b39c5158Smillert=item ->dequeue(COUNT)
454b39c5158Smillert
455b39c5158SmillertRemoves the requested number of items (default is 1) from the head of the
456b39c5158Smillertqueue, and returns them.  If the queue contains fewer than the requested
457b39c5158Smillertnumber of items, then the thread will be blocked until the requisite number
458b8851fccSafresh1of items are available (i.e., until other threads C<enqueue> more items).
459b39c5158Smillert
460b39c5158Smillert=item ->dequeue_nb()
461b39c5158Smillert
462b39c5158Smillert=item ->dequeue_nb(COUNT)
463b39c5158Smillert
464b39c5158SmillertRemoves the requested number of items (default is 1) from the head of the
465b39c5158Smillertqueue, and returns them.  If the queue contains fewer than the requested
466b39c5158Smillertnumber of items, then it immediately (i.e., non-blocking) returns whatever
467b39c5158Smillertitems there are on the queue.  If the queue is empty, then C<undef> is
468b39c5158Smillertreturned.
469b39c5158Smillert
47091f110e0Safresh1=item ->dequeue_timed(TIMEOUT)
47191f110e0Safresh1
47291f110e0Safresh1=item ->dequeue_timed(TIMEOUT, COUNT)
47391f110e0Safresh1
47491f110e0Safresh1Removes the requested number of items (default is 1) from the head of the
47591f110e0Safresh1queue, and returns them.  If the queue contains fewer than the requested
47691f110e0Safresh1number of items, then the thread will be blocked until the requisite number of
47791f110e0Safresh1items are available, or until the timeout is reached.  If the timeout is
47891f110e0Safresh1reached, it returns whatever items there are on the queue, or C<undef> if the
47991f110e0Safresh1queue is empty.
48091f110e0Safresh1
48191f110e0Safresh1The timeout may be a number of seconds relative to the current time (e.g., 5
48291f110e0Safresh1seconds from when the call is made), or may be an absolute timeout in I<epoch>
48391f110e0Safresh1seconds the same as would be used with
48491f110e0Safresh1L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
48591f110e0Safresh1Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
48691f110e0Safresh1the underlying implementation).
48791f110e0Safresh1
4886fb12b70Safresh1If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call
48991f110e0Safresh1behaves the same as C<dequeue_nb>.
49091f110e0Safresh1
491b39c5158Smillert=item ->pending()
492b39c5158Smillert
49391f110e0Safresh1Returns the number of items still in the queue.  Returns C<undef> if the queue
49491f110e0Safresh1has been ended (see below), and there are no more items in the queue.
49591f110e0Safresh1
496b8851fccSafresh1=item ->limit
497b8851fccSafresh1
498b8851fccSafresh1Sets the size of the queue.  If set, calls to C<enqueue()> will block until
499b8851fccSafresh1the number of pending items in the queue drops below the C<limit>.  The
500b8851fccSafresh1C<limit> does not prevent enqueuing items beyond that count:
501b8851fccSafresh1
502b8851fccSafresh1 my $q = Thread::Queue->new(1, 2);
503b8851fccSafresh1 $q->limit = 4;
504b8851fccSafresh1 $q->enqueue(3, 4, 5);   # Does not block
505b8851fccSafresh1 $q->enqueue(6);         # Blocks until at least 2 items are
506b8851fccSafresh1                         # dequeued
507b8851fccSafresh1 my $size = $q->limit;   # Returns the current limit (may return
508b8851fccSafresh1                         # 'undef')
509b8851fccSafresh1 $q->limit = 0;          # Queue size is now unlimited
510b8851fccSafresh1
5119f11ffb7Safresh1Calling any of the dequeue methods with C<COUNT> greater than a queue's
5129f11ffb7Safresh1C<limit> will generate an error.
5139f11ffb7Safresh1
51491f110e0Safresh1=item ->end()
51591f110e0Safresh1
51691f110e0Safresh1Declares that no more items will be added to the queue.
51791f110e0Safresh1
51891f110e0Safresh1All threads blocking on C<dequeue()> calls will be unblocked with any
51991f110e0Safresh1remaining items in the queue and/or C<undef> being returned.  Any subsequent
52091f110e0Safresh1calls to C<dequeue()> will behave like C<dequeue_nb()>.
52191f110e0Safresh1
52291f110e0Safresh1Once ended, no more items may be placed in the queue.
523b39c5158Smillert
524b39c5158Smillert=back
525b39c5158Smillert
526b39c5158Smillert=head1 ADVANCED METHODS
527b39c5158Smillert
528b39c5158SmillertThe following methods can be used to manipulate items anywhere in a queue.
529b39c5158Smillert
530b39c5158SmillertTo prevent the contents of a queue from being modified by another thread
531b39c5158Smillertwhile it is being examined and/or changed, L<lock|threads::shared/"lock
532b39c5158SmillertVARIABLE"> the queue inside a local block:
533b39c5158Smillert
534b39c5158Smillert {
535b39c5158Smillert     lock($q);   # Keep other threads from changing the queue's contents
536b39c5158Smillert     my $item = $q->peek();
537b39c5158Smillert     if ($item ...) {
538b39c5158Smillert         ...
539b39c5158Smillert     }
540b39c5158Smillert }
541b39c5158Smillert # Queue is now unlocked
542b39c5158Smillert
543b39c5158Smillert=over
544b39c5158Smillert
545b39c5158Smillert=item ->peek()
546b39c5158Smillert
547b39c5158Smillert=item ->peek(INDEX)
548b39c5158Smillert
549b39c5158SmillertReturns an item from the queue without dequeuing anything.  Defaults to the
550*56d68f1eSafresh1head of queue (at index position 0) if no index is specified.  Negative
551b39c5158Smillertindex values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
552b39c5158Smillertis the end of the queue, -2 is next to last, and so on).
553b39c5158Smillert
554b39c5158SmillertIf no items exists at the specified index (i.e., the queue is empty, or the
555b39c5158Smillertindex is beyond the number of items on the queue), then C<undef> is returned.
556b39c5158Smillert
557b39c5158SmillertRemember, the returned item is not removed from the queue, so manipulating a
558b39c5158SmillertC<peek>ed at reference affects the item on the queue.
559b39c5158Smillert
560b39c5158Smillert=item ->insert(INDEX, LIST)
561b39c5158Smillert
562b39c5158SmillertAdds the list of items to the queue at the specified index position (0
563b39c5158Smillertis the head of the list).  Any existing items at and beyond that position are
564b39c5158Smillertpushed back past the newly added items:
565b39c5158Smillert
566b39c5158Smillert $q->enqueue(1, 2, 3, 4);
567b39c5158Smillert $q->insert(1, qw/foo bar/);
568b39c5158Smillert # Queue now contains:  1, foo, bar, 2, 3, 4
569b39c5158Smillert
570b39c5158SmillertSpecifying an index position greater than the number of items in the queue
571b39c5158Smillertjust adds the list to the end.
572b39c5158Smillert
573b39c5158SmillertNegative index positions are supported:
574b39c5158Smillert
575b39c5158Smillert $q->enqueue(1, 2, 3, 4);
576b39c5158Smillert $q->insert(-2, qw/foo bar/);
577b39c5158Smillert # Queue now contains:  1, 2, foo, bar, 3, 4
578b39c5158Smillert
579b39c5158SmillertSpecifying a negative index position greater than the number of items in the
580b39c5158Smillertqueue adds the list to the head of the queue.
581b39c5158Smillert
582b39c5158Smillert=item ->extract()
583b39c5158Smillert
584b39c5158Smillert=item ->extract(INDEX)
585b39c5158Smillert
586b39c5158Smillert=item ->extract(INDEX, COUNT)
587b39c5158Smillert
588b39c5158SmillertRemoves and returns the specified number of items (defaults to 1) from the
589b39c5158Smillertspecified index position in the queue (0 is the head of the queue).  When
590b39c5158Smillertcalled with no arguments, C<extract> operates the same as C<dequeue_nb>.
591b39c5158Smillert
592b39c5158SmillertThis method is non-blocking, and will return only as many items as are
593b39c5158Smillertavailable to fulfill the request:
594b39c5158Smillert
595b39c5158Smillert $q->enqueue(1, 2, 3, 4);
596b39c5158Smillert my $item  = $q->extract(2)     # Returns 3
597b39c5158Smillert                                # Queue now contains:  1, 2, 4
598b39c5158Smillert my @items = $q->extract(1, 3)  # Returns (2, 4)
599b39c5158Smillert                                # Queue now contains:  1
600b39c5158Smillert
601b39c5158SmillertSpecifying an index position greater than the number of items in the
602b39c5158Smillertqueue results in C<undef> or an empty list being returned.
603b39c5158Smillert
604b39c5158Smillert $q->enqueue('foo');
605b39c5158Smillert my $nada = $q->extract(3)      # Returns undef
606b39c5158Smillert my @nada = $q->extract(1, 3)   # Returns ()
607b39c5158Smillert
608b39c5158SmillertNegative index positions are supported.  Specifying a negative index position
609b39c5158Smillertgreater than the number of items in the queue may return items from the head
610b39c5158Smillertof the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
611b39c5158Smillertqueue from the specified position (i.e. if queue size + index + count is
612b39c5158Smillertgreater than zero):
613b39c5158Smillert
614b39c5158Smillert $q->enqueue(qw/foo bar baz/);
615b39c5158Smillert my @nada = $q->extract(-6, 2);  # Returns ()      - (3+(-6)+2) <= 0
616b39c5158Smillert my @some = $q->extract(-6, 4);  # Returns (foo)   - (3+(-6)+4) > 0
617b39c5158Smillert                                 # Queue now contains:  bar, baz
618b8851fccSafresh1 my @rest = $q->extract(-3, 4);  # Returns (bar, baz) -
619b8851fccSafresh1                                 #                   (2+(-3)+4) > 0
620b39c5158Smillert
621b39c5158Smillert=back
622b39c5158Smillert
623b39c5158Smillert=head1 NOTES
624b39c5158Smillert
625b39c5158SmillertQueues created by L<Thread::Queue> can be used in both threaded and
626b39c5158Smillertnon-threaded applications.
627b39c5158Smillert
628b39c5158Smillert=head1 LIMITATIONS
629b39c5158Smillert
630b39c5158SmillertPassing objects on queues may not work if the objects' classes do not support
631b39c5158Smillertsharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
632b39c5158Smillert
633b39c5158SmillertPassing array/hash refs that contain objects may not work for Perl prior to
634b39c5158Smillert5.10.0.
635b39c5158Smillert
636b39c5158Smillert=head1 SEE ALSO
637b39c5158Smillert
6389f11ffb7Safresh1Thread::Queue on MetaCPAN:
6399f11ffb7Safresh1L<https://metacpan.org/release/Thread-Queue>
6409f11ffb7Safresh1
6419f11ffb7Safresh1Code repository for CPAN distribution:
6429f11ffb7Safresh1L<https://github.com/Dual-Life/Thread-Queue>
643b39c5158Smillert
644b39c5158SmillertL<threads>, L<threads::shared>
645b39c5158Smillert
64691f110e0Safresh1Sample code in the I<examples> directory of this distribution on CPAN.
64791f110e0Safresh1
648b39c5158Smillert=head1 MAINTAINER
649b39c5158Smillert
650b39c5158SmillertJerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
651b39c5158Smillert
652b39c5158Smillert=head1 LICENSE
653b39c5158Smillert
654b39c5158SmillertThis program is free software; you can redistribute it and/or modify it under
655b39c5158Smillertthe same terms as Perl itself.
656b39c5158Smillert
657b39c5158Smillert=cut
658