1 /* 2 * Copyright (c) Meta Platforms, Inc. and affiliates. 3 * All rights reserved. 4 * 5 * This source code is licensed under both the BSD-style license (found in the 6 * LICENSE file in the root directory of this source tree) and the GPLv2 (found 7 * in the COPYING file in the root directory of this source tree). 8 */ 9 #include "utils/Buffer.h" 10 #include "utils/WorkQueue.h" 11 12 #include <gtest/gtest.h> 13 #include <iostream> 14 #include <memory> 15 #include <mutex> 16 #include <thread> 17 #include <vector> 18 19 using namespace pzstd; 20 21 namespace { 22 struct Popper { 23 WorkQueue<int>* queue; 24 int* results; 25 std::mutex* mutex; 26 27 void operator()() { 28 int result; 29 while (queue->pop(result)) { 30 std::lock_guard<std::mutex> lock(*mutex); 31 results[result] = result; 32 } 33 } 34 }; 35 } 36 37 TEST(WorkQueue, SingleThreaded) { 38 WorkQueue<int> queue; 39 int result; 40 41 queue.push(5); 42 EXPECT_TRUE(queue.pop(result)); 43 EXPECT_EQ(5, result); 44 45 queue.push(1); 46 queue.push(2); 47 EXPECT_TRUE(queue.pop(result)); 48 EXPECT_EQ(1, result); 49 EXPECT_TRUE(queue.pop(result)); 50 EXPECT_EQ(2, result); 51 52 queue.push(1); 53 queue.push(2); 54 queue.finish(); 55 EXPECT_TRUE(queue.pop(result)); 56 EXPECT_EQ(1, result); 57 EXPECT_TRUE(queue.pop(result)); 58 EXPECT_EQ(2, result); 59 EXPECT_FALSE(queue.pop(result)); 60 61 queue.waitUntilFinished(); 62 } 63 64 TEST(WorkQueue, SPSC) { 65 WorkQueue<int> queue; 66 const int max = 100; 67 68 for (int i = 0; i < 10; ++i) { 69 queue.push(int{i}); 70 } 71 72 std::thread thread([ &queue, max ] { 73 int result; 74 for (int i = 0;; ++i) { 75 if (!queue.pop(result)) { 76 EXPECT_EQ(i, max); 77 break; 78 } 79 EXPECT_EQ(i, result); 80 } 81 }); 82 83 std::this_thread::yield(); 84 for (int i = 10; i < max; ++i) { 85 queue.push(int{i}); 86 } 87 queue.finish(); 88 89 thread.join(); 90 } 91 92 TEST(WorkQueue, SPMC) { 93 WorkQueue<int> queue; 94 std::vector<int> results(50, -1); 95 std::mutex mutex; 96 std::vector<std::thread> threads; 97 for (int i = 0; i < 5; ++i) { 98 threads.emplace_back(Popper{&queue, results.data(), &mutex}); 99 } 100 101 for (int i = 0; i < 50; ++i) { 102 queue.push(int{i}); 103 } 104 queue.finish(); 105 106 for (auto& thread : threads) { 107 thread.join(); 108 } 109 110 for (int i = 0; i < 50; ++i) { 111 EXPECT_EQ(i, results[i]); 112 } 113 } 114 115 TEST(WorkQueue, MPMC) { 116 WorkQueue<int> queue; 117 std::vector<int> results(100, -1); 118 std::mutex mutex; 119 std::vector<std::thread> popperThreads; 120 for (int i = 0; i < 4; ++i) { 121 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex}); 122 } 123 124 std::vector<std::thread> pusherThreads; 125 for (int i = 0; i < 2; ++i) { 126 auto min = i * 50; 127 auto max = (i + 1) * 50; 128 pusherThreads.emplace_back( 129 [ &queue, min, max ] { 130 for (int i = min; i < max; ++i) { 131 queue.push(int{i}); 132 } 133 }); 134 } 135 136 for (auto& thread : pusherThreads) { 137 thread.join(); 138 } 139 queue.finish(); 140 141 for (auto& thread : popperThreads) { 142 thread.join(); 143 } 144 145 for (int i = 0; i < 100; ++i) { 146 EXPECT_EQ(i, results[i]); 147 } 148 } 149 150 TEST(WorkQueue, BoundedSizeWorks) { 151 WorkQueue<int> queue(1); 152 int result; 153 queue.push(5); 154 queue.pop(result); 155 queue.push(5); 156 queue.pop(result); 157 queue.push(5); 158 queue.finish(); 159 queue.pop(result); 160 EXPECT_EQ(5, result); 161 } 162 163 TEST(WorkQueue, BoundedSizePushAfterFinish) { 164 WorkQueue<int> queue(1); 165 int result; 166 queue.push(5); 167 std::thread pusher([&queue] { 168 queue.push(6); 169 }); 170 // Dirtily try and make sure that pusher has run. 171 std::this_thread::sleep_for(std::chrono::seconds(1)); 172 queue.finish(); 173 EXPECT_TRUE(queue.pop(result)); 174 EXPECT_EQ(5, result); 175 EXPECT_FALSE(queue.pop(result)); 176 177 pusher.join(); 178 } 179 180 TEST(WorkQueue, SetMaxSize) { 181 WorkQueue<int> queue(2); 182 int result; 183 queue.push(5); 184 queue.push(6); 185 queue.setMaxSize(1); 186 std::thread pusher([&queue] { 187 queue.push(7); 188 }); 189 // Dirtily try and make sure that pusher has run. 190 std::this_thread::sleep_for(std::chrono::seconds(1)); 191 queue.finish(); 192 EXPECT_TRUE(queue.pop(result)); 193 EXPECT_EQ(5, result); 194 EXPECT_TRUE(queue.pop(result)); 195 EXPECT_EQ(6, result); 196 EXPECT_FALSE(queue.pop(result)); 197 198 pusher.join(); 199 } 200 201 TEST(WorkQueue, BoundedSizeMPMC) { 202 WorkQueue<int> queue(10); 203 std::vector<int> results(200, -1); 204 std::mutex mutex; 205 std::cerr << "Creating popperThreads" << std::endl; 206 std::vector<std::thread> popperThreads; 207 for (int i = 0; i < 4; ++i) { 208 popperThreads.emplace_back(Popper{&queue, results.data(), &mutex}); 209 } 210 211 std::cerr << "Creating pusherThreads" << std::endl; 212 std::vector<std::thread> pusherThreads; 213 for (int i = 0; i < 2; ++i) { 214 auto min = i * 100; 215 auto max = (i + 1) * 100; 216 pusherThreads.emplace_back( 217 [ &queue, min, max ] { 218 for (int i = min; i < max; ++i) { 219 queue.push(int{i}); 220 } 221 }); 222 } 223 224 std::cerr << "Joining pusherThreads" << std::endl; 225 for (auto& thread : pusherThreads) { 226 thread.join(); 227 } 228 std::cerr << "Finishing queue" << std::endl; 229 queue.finish(); 230 231 std::cerr << "Joining popperThreads" << std::endl; 232 for (auto& thread : popperThreads) { 233 thread.join(); 234 } 235 236 std::cerr << "Inspecting results" << std::endl; 237 for (int i = 0; i < 200; ++i) { 238 EXPECT_EQ(i, results[i]); 239 } 240 } 241 242 TEST(WorkQueue, FailedPush) { 243 WorkQueue<std::unique_ptr<int>> queue; 244 std::unique_ptr<int> x(new int{5}); 245 EXPECT_TRUE(queue.push(std::move(x))); 246 EXPECT_EQ(nullptr, x); 247 queue.finish(); 248 x.reset(new int{6}); 249 EXPECT_FALSE(queue.push(std::move(x))); 250 EXPECT_NE(nullptr, x); 251 EXPECT_EQ(6, *x); 252 } 253 254 TEST(BufferWorkQueue, SizeCalculatedCorrectly) { 255 { 256 BufferWorkQueue queue; 257 queue.finish(); 258 EXPECT_EQ(0, queue.size()); 259 } 260 { 261 BufferWorkQueue queue; 262 queue.push(Buffer(10)); 263 queue.finish(); 264 EXPECT_EQ(10, queue.size()); 265 } 266 { 267 BufferWorkQueue queue; 268 queue.push(Buffer(10)); 269 queue.push(Buffer(5)); 270 queue.finish(); 271 EXPECT_EQ(15, queue.size()); 272 } 273 { 274 BufferWorkQueue queue; 275 queue.push(Buffer(10)); 276 queue.push(Buffer(5)); 277 queue.finish(); 278 Buffer buffer; 279 queue.pop(buffer); 280 EXPECT_EQ(5, queue.size()); 281 } 282 } 283