xref: /openbsd-src/gnu/usr.bin/perl/dist/Thread-Queue/lib/Thread/Queue.pm (revision ac9b4aacc1da35008afea06a5d23c2f2dea9b93e)
1package Thread::Queue;
2
3use strict;
4use warnings;
5
6our $VERSION = '2.11';
7
8use threads::shared 1.21;
9use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
10
# Errors raised through Carp on behalf of threads::shared calls should be
# reported against the code that called into this module.
our @CARP_NOT = qw(threads::shared);

# The internal validators live in lexical code refs.  They are declared here
# so the methods below can refer to them, and are assigned further down under
# "Internal Functions".
my $validate_count;
my $validate_index;
16
# Constructor: build a queue, optionally seeded with an initial list of items.
#   $class - class to bless the new queue into
#   LIST   - zero or more items; each one is passed through shared_clone()
# Returns the queue object, which is a blessed reference to a shared array.
sub new
{
    my ($class, @items) = @_;

    my @queue :shared;
    for my $item (@items) {
        push(@queue, shared_clone($item));
    }

    return bless(\@queue, $class);
}
24
# Append items to the tail of the queue and signal the queue's condition
# variable.
#   LIST - items to add; each one is passed through shared_clone()
# The queue stays locked for the duration of the call.
sub enqueue
{
    my ($self, @items) = @_;
    lock(@$self);

    # push() yields the resulting queue length, so the signal is skipped
    # only when the queue is still empty afterwards.
    my $length = push(@$self, map { shared_clone($_) } @items);
    return $length && cond_signal(@$self);
}
33
# Report how many items are currently on the queue.
# Returns the item count, read while the queue is locked.
sub pending
{
    my ($self) = @_;
    lock(@$self);

    my $size = @$self;
    return $size;
}
41
# Remove and return items from the head of the queue, waiting until enough
# items are present.
#   COUNT (optional) - number of items wanted; checked by $validate_count,
#                      defaults to 1
# Returns a single item when COUNT is 1, otherwise the list of COUNT items.
sub dequeue
{
    my $self = shift;
    lock(@$self);

    my $wanted = 1;
    $wanted = $validate_count->(shift) if (@_);

    # Wait on the queue until it holds at least as many items as requested
    while (@$self < $wanted) {
        cond_wait(@$self);
    }

    # Items will remain once ours are taken, so pass the signal along
    if (@$self > $wanted) {
        cond_signal(@$self);
    }

    # One item requested: hand it back as a scalar
    if ($wanted == 1) {
        return shift(@$self);
    }

    # Several items requested: collect them in queue order
    my @batch = map { shift(@$self) } (1 .. $wanted);
    return @batch;
}
62
# Remove and return items from the head of the queue without ever waiting.
#   COUNT (optional) - maximum number of items wanted; checked by
#                      $validate_count, defaults to 1
# Returns a single item when COUNT is 1 (undef if the queue is empty),
# otherwise a list of up to COUNT items.
sub dequeue_nb
{
    my $self = shift;
    lock(@$self);

    my $wanted = 1;
    $wanted = $validate_count->(shift) if (@_);

    # One item requested: shift() gives undef when nothing is queued
    if ($wanted == 1) {
        return shift(@$self);
    }

    # Several items requested: take what is there, stopping early if the
    # queue runs dry
    my @batch;
    foreach my $n (1 .. $wanted) {
        @$self or last;
        push(@batch, shift(@$self));
    }
    return @batch;
}
82
# Look at an item on the queue without removing it.
#   INDEX (optional) - position to inspect; checked by $validate_index,
#                      defaults to 0 (the head)
# Returns whatever is stored at that position of the underlying array.
sub peek
{
    my $self = shift;
    lock(@$self);

    my $pos = 0;
    $pos = $validate_index->(shift) if (@_);

    return $self->[$pos];
}
91
# Insert items anywhere into a queue.
#   INDEX - position at which the first new item should end up; checked by
#           $validate_index.  Negative values count back from the tail and
#           are clamped to the head; values past the tail append.
#   LIST  - items to insert; each one is passed through shared_clone()
# Does nothing (beyond validating INDEX) when LIST is empty.  Signals the
# queue's condition variable once the items are in place.
sub insert
{
    my $queue = shift;
    lock(@$queue);

    my $index = $validate_index->(shift);

    return if (! @_);   # Nothing to insert

    # Clone the new items BEFORE touching the queue.  shared_clone() can
    # croak (e.g., on a ref type it cannot share); if that happened after
    # the tail had been popped off, those existing items would be lost.
    my @new_items = map { shared_clone($_) } @_;

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            $index = 0;     # Further back than the head: insert at the head
        }
    }

    # Dequeue items from $index onward
    # NOTE(review): done with pop/unshift rather than splice(), presumably
    # because splice is not available on shared arrays - confirm against
    # the threads::shared documentation.
    my @tmp;
    while (@$queue > $index) {
        unshift(@tmp, pop(@$queue));
    }

    # Add new items to the queue
    push(@$queue, @new_items);

    # Add previous items back onto the queue
    push(@$queue, @tmp);

    # Soup's up
    cond_signal(@$queue);
}
125
# Remove and return items from an arbitrary position in the queue, without
# waiting.
#   INDEX (optional) - position of the first item to remove; checked by
#                      $validate_index, defaults to 0.  Negative values
#                      count back from the tail.
#   COUNT (optional) - number of items to remove; checked by
#                      $validate_count, defaults to 1
# Returns the item at INDEX when COUNT is 1, otherwise the list of removed
# items (fewer than COUNT if the queue is too short).
sub extract
{
    my $self = shift;
    lock(@$self);

    my $pos    = (@_) ? $validate_index->(shift) : 0;
    my $wanted = (@_) ? $validate_count->(shift) : 1;

    # Turn a negative index into an offset from the head
    $pos += @$self if ($pos < 0);

    if ($pos < 0) {
        # The requested span starts before the head.  Only the part that
        # overlaps the queue (if any) can be taken, and it comes off the head.
        $wanted += $pos;
        if ($wanted <= 0) {
            return;
        }
        return $self->dequeue_nb($wanted);
    }

    # Set aside everything that sits after the requested span
    my $stop = $pos + $wanted;
    my @tail;
    unshift(@tail, pop(@$self)) while (@$self > $stop);

    # What is left from $pos onward is what the caller asked for
    my @taken;
    while (@$self > $pos) {
        unshift(@taken, pop(@$self));
    }

    # Put the set-aside items back behind the remaining head
    push(@$self, @tail);

    return $taken[0] if ($wanted == 1);
    return @taken;
}
164
165### Internal Functions ###
166
# Validate an 'index' argument on behalf of a queue method.
#   $index - the value supplied by the method's caller
# Returns $index unchanged when it is a defined, integer-valued number.
# Otherwise croaks with a message naming the offending value and the method
# the validator was called from.
$validate_index = sub {
    my ($index) = @_;

    my $is_integer = defined($index)
                  && looks_like_number($index)
                  && (int($index) == $index);
    return $index if ($is_integer);

    require Carp;

    # The frame above this one is the queue method that called the validator
    my $method = (caller(1))[3];
    $method =~ s/Thread::Queue:://;

    $index = 'undef' if (! defined($index));
    Carp::croak("Invalid 'index' argument ($index) to '$method' method");
};
184
# Validate a 'count' argument on behalf of a queue method.
#   $count - the value supplied by the method's caller
# Returns $count unchanged when it is a defined, integer-valued number that
# is at least 1.  Otherwise croaks with a message naming the offending value
# and the method the validator was called from.
$validate_count = sub {
    my ($count) = @_;

    my $is_valid = defined($count)
                && looks_like_number($count)
                && (int($count) == $count)
                && ($count >= 1);
    return $count if ($is_valid);

    require Carp;

    # The frame above this one is the queue method that called the validator
    my $method = (caller(1))[3];
    $method =~ s/Thread::Queue:://;

    $count = 'undef' if (! defined($count));
    Carp::croak("Invalid 'count' argument ($count) to '$method' method");
};
203
2041;
205
206=head1 NAME
207
208Thread::Queue - Thread-safe queues
209
210=head1 VERSION
211
212This document describes Thread::Queue version 2.11
213
214=head1 SYNOPSIS
215
216    use strict;
217    use warnings;
218
219    use threads;
220    use Thread::Queue;
221
222    my $q = Thread::Queue->new();    # A new empty queue
223
224    # Worker thread
225    my $thr = threads->create(sub {
226                                while (defined(my $item = $q->dequeue())) {
227                                    # Do work on $item
228                                }
229                             })->detach();
230
231    # Send work to the thread
232    $q->enqueue($item1, ...);
233
234
235    # Count of items in the queue
236    my $left = $q->pending();
237
238    # Non-blocking dequeue
239    if (defined(my $item = $q->dequeue_nb())) {
240        # Work on $item
241    }
242
243    # Get the second item in the queue without dequeuing anything
244    my $item = $q->peek(1);
245
246    # Insert two items into the queue just behind the head
247    $q->insert(1, $item1, $item2);
248
249    # Extract the last two items on the queue
250    my ($item1, $item2) = $q->extract(-2, 2);
251
252=head1 DESCRIPTION
253
254This module provides thread-safe FIFO queues that can be accessed safely by
255any number of threads.
256
257Any data types supported by L<threads::shared> can be passed via queues:
258
259=over
260
261=item Ordinary scalars
262
263=item Array refs
264
265=item Hash refs
266
267=item Scalar refs
268
269=item Objects based on the above
270
271=back
272
273Ordinary scalars are added to queues as they are.
274
275If not already thread-shared, the other complex data types will be cloned
276(recursively, if needed, and including any C<bless>ings and read-only
277settings) into thread-shared structures before being placed onto a queue.
278
279For example, the following would cause L<Thread::Queue> to create an empty,
280shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
281and 'baz' from C<@ary> into it, and then place that shared reference onto
282the queue:
283
284    my @ary = qw/foo bar baz/;
285    $q->enqueue(\@ary);
286
287However, for the following, the items are already shared, so their references
288are added directly to the queue, and no cloning takes place:
289
290    my @ary :shared = qw/foo bar baz/;
291    $q->enqueue(\@ary);
292
293    my $obj = &shared({});
294    $$obj{'foo'} = 'bar';
295    $$obj{'qux'} = 99;
296    bless($obj, 'My::Class');
297    $q->enqueue($obj);
298
299See L</"LIMITATIONS"> for caveats related to passing objects via queues.
300
301=head1 QUEUE CREATION
302
303=over
304
305=item ->new()
306
307Creates a new empty queue.
308
309=item ->new(LIST)
310
311Creates a new queue pre-populated with the provided list of items.
312
313=back
314
315=head1 BASIC METHODS
316
317The following methods deal with queues on a FIFO basis.
318
319=over
320
321=item ->enqueue(LIST)
322
323Adds a list of items onto the end of the queue.
324
325=item ->dequeue()
326
327=item ->dequeue(COUNT)
328
329Removes the requested number of items (default is 1) from the head of the
330queue, and returns them.  If the queue contains fewer than the requested
331number of items, then the thread will be blocked until the requisite number
332of items are available (i.e., until other threads C<enqueue> more items).
333
334=item ->dequeue_nb()
335
336=item ->dequeue_nb(COUNT)
337
338Removes the requested number of items (default is 1) from the head of the
339queue, and returns them.  If the queue contains fewer than the requested
340number of items, then it immediately (i.e., non-blocking) returns whatever
341items there are on the queue.  If the queue is empty, then C<undef> is
342returned.
343
344=item ->pending()
345
346Returns the number of items still in the queue.
347
348=back
349
350=head1 ADVANCED METHODS
351
352The following methods can be used to manipulate items anywhere in a queue.
353
354To prevent the contents of a queue from being modified by another thread
355while it is being examined and/or changed, L<lock|threads::shared/"lock
356VARIABLE"> the queue inside a local block:
357
358    {
359        lock($q);   # Keep other threads from changing the queue's contents
360        my $item = $q->peek();
361        if ($item ...) {
362            ...
363        }
364    }
365    # Queue is now unlocked
366
367=over
368
369=item ->peek()
370
371=item ->peek(INDEX)
372
373Returns an item from the queue without dequeuing anything.  Defaults to the
374head of the queue (at index position 0) if no index is specified.  Negative
375index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
376is the end of the queue, -2 is next to last, and so on).
377
378If no item exists at the specified index (i.e., the queue is empty, or the
379index is beyond the number of items on the queue), then C<undef> is returned.
380
381Remember, the returned item is not removed from the queue, so manipulating a
382C<peek>ed at reference affects the item on the queue.
383
384=item ->insert(INDEX, LIST)
385
386Adds the list of items to the queue at the specified index position (0
387is the head of the list).  Any existing items at and beyond that position are
388pushed back past the newly added items:
389
390    $q->enqueue(1, 2, 3, 4);
391    $q->insert(1, qw/foo bar/);
392    # Queue now contains:  1, foo, bar, 2, 3, 4
393
394Specifying an index position greater than the number of items in the queue
395just adds the list to the end.
396
397Negative index positions are supported:
398
399    $q->enqueue(1, 2, 3, 4);
400    $q->insert(-2, qw/foo bar/);
401    # Queue now contains:  1, 2, foo, bar, 3, 4
402
403Specifying a negative index position greater than the number of items in the
404queue adds the list to the head of the queue.
405
406=item ->extract()
407
408=item ->extract(INDEX)
409
410=item ->extract(INDEX, COUNT)
411
412Removes and returns the specified number of items (defaults to 1) from the
413specified index position in the queue (0 is the head of the queue).  When
414called with no arguments, C<extract> operates the same as C<dequeue_nb>.
415
416This method is non-blocking, and will return only as many items as are
417available to fulfill the request:
418
419    $q->enqueue(1, 2, 3, 4);
420    my $item  = $q->extract(2);    # Returns 3
421                                   # Queue now contains:  1, 2, 4
422    my @items = $q->extract(1, 3); # Returns (2, 4)
423                                   # Queue now contains:  1
424
425Specifying an index position greater than the number of items in the
426queue results in C<undef> or an empty list being returned.
427
428    $q->enqueue('foo');
429    my $nada = $q->extract(3);     # Returns undef
430    my @nada = $q->extract(1, 3);  # Returns ()
431
432Negative index positions are supported.  Specifying a negative index position
433greater than the number of items in the queue may return items from the head
434of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
435queue from the specified position (i.e. if queue size + index + count is
436greater than zero):
437
438    $q->enqueue(qw/foo bar baz/);
439    my @nada = $q->extract(-6, 2);   # Returns ()         - (3+(-6)+2) <= 0
440    my @some = $q->extract(-6, 4);   # Returns (foo)      - (3+(-6)+4) > 0
441                                     # Queue now contains:  bar, baz
442    my @rest = $q->extract(-3, 4);   # Returns (bar, baz) - (2+(-3)+4) > 0
443
444=back
445
446=head1 NOTES
447
448Queues created by L<Thread::Queue> can be used in both threaded and
449non-threaded applications.
450
451=head1 LIMITATIONS
452
453Passing objects on queues may not work if the objects' classes do not support
454sharing.  See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
455
456Passing array/hash refs that contain objects may not work for Perl prior to
4575.10.0.
458
459=head1 SEE ALSO
460
461Thread::Queue Discussion Forum on CPAN:
462L<http://www.cpanforum.com/dist/Thread-Queue>
463
464Annotated POD for Thread::Queue:
465L<http://annocpan.org/~JDHEDDEN/Thread-Queue-2.11/lib/Thread/Queue.pm>
466
467Source repository:
468L<http://code.google.com/p/thread-queue/>
469
470L<threads>, L<threads::shared>
471
472=head1 MAINTAINER
473
474Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
475
476=head1 LICENSE
477
478This program is free software; you can redistribute it and/or modify it under
479the same terms as Perl itself.
480
481=cut
482