xref: /openbsd-src/gnu/usr.bin/perl/dist/Thread-Queue/t/09_ended.t (revision 91f110e064cd7c194e59e019b83bb7496c1c84d4)
1*91f110e0Safresh1use strict;
2*91f110e0Safresh1use warnings;
3*91f110e0Safresh1
4*91f110e0Safresh1use Config;
5*91f110e0Safresh1
6*91f110e0Safresh1BEGIN {
7*91f110e0Safresh1    if (! $Config{'useithreads'}) {
8*91f110e0Safresh1        print("1..0 # SKIP Perl not compiled with 'useithreads'\n");
9*91f110e0Safresh1        exit(0);
10*91f110e0Safresh1    }
11*91f110e0Safresh1    if (! $Config{'d_select'}) {
12*91f110e0Safresh1        print("1..0 # SKIP 'select()' not available for testing\n");
13*91f110e0Safresh1        exit(0);
14*91f110e0Safresh1    }
15*91f110e0Safresh1}
16*91f110e0Safresh1
17*91f110e0Safresh1use threads;
18*91f110e0Safresh1use Thread::Queue;
19*91f110e0Safresh1
20*91f110e0Safresh1use Test::More;
21*91f110e0Safresh1
22*91f110e0Safresh1my $num_threads = 3;
23*91f110e0Safresh1my $cycles = 2;
24*91f110e0Safresh1my $count = 2;
25*91f110e0Safresh1plan tests => 3*$num_threads*$cycles*$count + 6*$num_threads + 6;
26*91f110e0Safresh1
27*91f110e0Safresh1# Test for end() while threads are blocked and no more items in queue
28*91f110e0Safresh1{
29*91f110e0Safresh1    my @items = 1..($num_threads*$cycles*$count);
30*91f110e0Safresh1    my $q = Thread::Queue->new(@items);
31*91f110e0Safresh1    my $r = Thread::Queue->new();
32*91f110e0Safresh1
33*91f110e0Safresh1    my @threads;
34*91f110e0Safresh1    for my $ii (1..$num_threads) {
35*91f110e0Safresh1        push @threads, threads->create( sub {
36*91f110e0Safresh1            # Thread will loop until no more work is coming
37*91f110e0Safresh1            LOOP:
38*91f110e0Safresh1            while (my @set = $q->dequeue($count)) {
39*91f110e0Safresh1                foreach my $item (@set) {
40*91f110e0Safresh1                    last LOOP if (! defined($item));
41*91f110e0Safresh1                    pass("'$item' read from queue in thread $ii");
42*91f110e0Safresh1                }
43*91f110e0Safresh1                select(undef, undef, undef, rand(1));
44*91f110e0Safresh1                $r->enqueue($ii);
45*91f110e0Safresh1            }
46*91f110e0Safresh1            pass("Thread $ii exiting");
47*91f110e0Safresh1        });
48*91f110e0Safresh1    }
49*91f110e0Safresh1
50*91f110e0Safresh1    # Make sure there's nothing in the queue and threads are blocking
51*91f110e0Safresh1    for my $ii (1..($num_threads*$cycles)) {
52*91f110e0Safresh1        $r->dequeue();
53*91f110e0Safresh1    }
54*91f110e0Safresh1    sleep(1);
55*91f110e0Safresh1    threads->yield();
56*91f110e0Safresh1
57*91f110e0Safresh1    is($q->pending(), 0, 'Queue is empty');
58*91f110e0Safresh1
59*91f110e0Safresh1    # Signal no more work is coming
60*91f110e0Safresh1    $q->end();
61*91f110e0Safresh1
62*91f110e0Safresh1    is($q->pending(), undef, 'Queue is ended');
63*91f110e0Safresh1
64*91f110e0Safresh1    for my $thread (@threads) {
65*91f110e0Safresh1        $thread->join;
66*91f110e0Safresh1        pass($thread->tid." joined");
67*91f110e0Safresh1    }
68*91f110e0Safresh1}
69*91f110e0Safresh1
70*91f110e0Safresh1# Test for end() while threads are blocked and items still remain in queue
71*91f110e0Safresh1{
72*91f110e0Safresh1    my @items = 1..($num_threads*$cycles*$count + 1);
73*91f110e0Safresh1    my $q = Thread::Queue->new(@items);
74*91f110e0Safresh1    my $r = Thread::Queue->new();
75*91f110e0Safresh1
76*91f110e0Safresh1    my @threads;
77*91f110e0Safresh1    for my $ii (1..$num_threads) {
78*91f110e0Safresh1        push @threads, threads->create( sub {
79*91f110e0Safresh1            # Thread will loop until no more work is coming
80*91f110e0Safresh1            LOOP:
81*91f110e0Safresh1            while (my @set = $q->dequeue($count)) {
82*91f110e0Safresh1                foreach my $item (@set) {
83*91f110e0Safresh1                    last LOOP if (! defined($item));
84*91f110e0Safresh1                    pass("'$item' read from queue in thread $ii");
85*91f110e0Safresh1                }
86*91f110e0Safresh1                select(undef, undef, undef, rand(1));
87*91f110e0Safresh1                $r->enqueue($ii);
88*91f110e0Safresh1            }
89*91f110e0Safresh1            pass("Thread $ii exiting");
90*91f110e0Safresh1        });
91*91f110e0Safresh1    }
92*91f110e0Safresh1
93*91f110e0Safresh1    # Make sure there's nothing in the queue and threads are blocking
94*91f110e0Safresh1    for my $ii (1..($num_threads*$cycles)) {
95*91f110e0Safresh1        $r->dequeue();
96*91f110e0Safresh1    }
97*91f110e0Safresh1    sleep(1);
98*91f110e0Safresh1    threads->yield();
99*91f110e0Safresh1
100*91f110e0Safresh1    is($q->pending(), 1, 'Queue has one left');
101*91f110e0Safresh1
102*91f110e0Safresh1    # Signal no more work is coming
103*91f110e0Safresh1    $q->end();
104*91f110e0Safresh1
105*91f110e0Safresh1    for my $thread (@threads) {
106*91f110e0Safresh1        $thread->join;
107*91f110e0Safresh1        pass($thread->tid." joined");
108*91f110e0Safresh1    }
109*91f110e0Safresh1
110*91f110e0Safresh1    is($q->pending(), undef, 'Queue is ended');
111*91f110e0Safresh1}
112*91f110e0Safresh1
113*91f110e0Safresh1# Test of end() send while items in queue
114*91f110e0Safresh1{
115*91f110e0Safresh1    my @items = 1..($num_threads*$cycles*$count + 1);
116*91f110e0Safresh1    my $q = Thread::Queue->new(@items);
117*91f110e0Safresh1
118*91f110e0Safresh1    my @threads;
119*91f110e0Safresh1    for my $ii (1..$num_threads) {
120*91f110e0Safresh1        push @threads, threads->create( sub {
121*91f110e0Safresh1            # Thread will loop until no more work is coming
122*91f110e0Safresh1            LOOP:
123*91f110e0Safresh1            while (my @set = $q->dequeue($count)) {
124*91f110e0Safresh1                foreach my $item (@set) {
125*91f110e0Safresh1                    last LOOP if (! defined($item));
126*91f110e0Safresh1                    pass("'$item' read from queue in thread $ii");
127*91f110e0Safresh1                }
128*91f110e0Safresh1                select(undef, undef, undef, rand(1));
129*91f110e0Safresh1            }
130*91f110e0Safresh1            pass("Thread $ii exiting");
131*91f110e0Safresh1        });
132*91f110e0Safresh1    }
133*91f110e0Safresh1
134*91f110e0Safresh1    # Signal no more work is coming to the blocked threads, they
135*91f110e0Safresh1    # should unblock.
136*91f110e0Safresh1    $q->end();
137*91f110e0Safresh1
138*91f110e0Safresh1    for my $thread (@threads) {
139*91f110e0Safresh1        $thread->join;
140*91f110e0Safresh1        pass($thread->tid." joined");
141*91f110e0Safresh1    }
142*91f110e0Safresh1}
143*91f110e0Safresh1
144*91f110e0Safresh1exit(0);
145*91f110e0Safresh1
146*91f110e0Safresh1# EOF
147