//===---- Reduction.cpp - OpenMP device reduction implementation - C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file contains the implementation of reduction with KMPC interface.
//
//===----------------------------------------------------------------------===//

#include "Debug.h"
#include "DeviceTypes.h"
#include "DeviceUtils.h"
#include "Interface.h"
#include "Mapping.h"
#include "State.h"
#include "Synchronization.h"

using namespace ompx;

namespace {

#pragma omp begin declare target device_type(nohost)

// Reduce a full warp: a tree reduction that halves the shuffle offset each
// step until the warp's value has been combined into lane 0.
void gpu_regular_warp_reduce(void *reduce_data, ShuffleReductFnTy shflFct) {
  for (uint32_t mask = mapping::getWarpSize() / 2; mask > 0; mask /= 2) {
    shflFct(reduce_data, /*LaneId (not used)=*/0,
            /*Offset=*/mask, /*AlgoVersion=*/0);
  }
}

// Reduce the first `size` contiguous lanes of a warp; `tid` is the calling
// lane's index within that group. Handles non-power-of-two sizes by rounding
// the remaining group size up at each step.
void gpu_irregular_warp_reduce(void *reduce_data, ShuffleReductFnTy shflFct,
                               uint32_t size, uint32_t tid) {
  uint32_t curr_size = size;
  uint32_t mask = curr_size / 2;
  while (mask > 0) {
    shflFct(reduce_data, /*LaneId=*/tid, /*Offset=*/mask, /*AlgoVersion=*/1);
    curr_size = (curr_size + 1) / 2;
    mask = curr_size / 2;
  }
}

// Reduce an arbitrary (possibly non-contiguous) set of active lanes; returns
// 1 in the lane that ends up holding the reduced value.
static uint32_t gpu_irregular_simd_reduce(void *reduce_data,
                                          ShuffleReductFnTy shflFct) {
  uint32_t size, remote_id, physical_lane_id;
  physical_lane_id = mapping::getThreadIdInBlock() % mapping::getWarpSize();
  __kmpc_impl_lanemask_t lanemask_lt = mapping::lanemaskLT();
  __kmpc_impl_lanemask_t Liveness = mapping::activemask();
  uint32_t logical_lane_id = utils::popc(Liveness & lanemask_lt) * 2;
  __kmpc_impl_lanemask_t lanemask_gt = mapping::lanemaskGT();
  do {
    Liveness = mapping::activemask();
    remote_id = utils::ffs(Liveness & lanemask_gt);
    size = utils::popc(Liveness);
    logical_lane_id /= 2;
    shflFct(reduce_data, /*LaneId=*/logical_lane_id,
            /*Offset=*/remote_id - 1 - physical_lane_id, /*AlgoVersion=*/2);
  } while (logical_lane_id % 2 == 0 && size > 1);
  return (logical_lane_id == 0);
}

static int32_t nvptx_parallel_reduce_nowait(void *reduce_data,
                                            ShuffleReductFnTy shflFct,
                                            InterWarpCopyFnTy cpyFct) {
  uint32_t BlockThreadId = mapping::getThreadIdInBlock();
  if (mapping::isMainThreadInGenericMode(/*IsSPMD=*/false))
    BlockThreadId = 0;
  uint32_t NumThreads = omp_get_num_threads();
  if (NumThreads == 1)
    return 1;

  //
  // This reduce function handles reduction within a team. It handles
  // parallel regions at both the L1 and L2 parallelism levels. It also
  // supports Generic, SPMD, and NoOMP modes.
  //
  // 1. Reduce within a warp.
  // 2. Each warp master copies its value to warp 0 via shared memory.
  // 3. Warp 0 reduces to a single value.
  // 4. The reduced value is available in the thread that returns 1.
  //

#if __has_builtin(__nvvm_reflect)
  if (__nvvm_reflect("__CUDA_ARCH") >= 700) {
    uint32_t WarpsNeeded =
        (NumThreads + mapping::getWarpSize() - 1) / mapping::getWarpSize();
    uint32_t WarpId = mapping::getWarpIdInBlock();

    // Volta execution model:
    // In Generic execution mode a parallel region has either 1 thread or,
    // beyond that, always a multiple of 32 threads.
    // In SPMD execution mode any number of threads is possible.
    if ((NumThreads % mapping::getWarpSize() == 0) ||
        (WarpId < WarpsNeeded - 1))
      gpu_regular_warp_reduce(reduce_data, shflFct);
    else if (NumThreads > 1) // Only SPMD execution mode reaches this case.
      gpu_irregular_warp_reduce(
          reduce_data, shflFct,
          /*LaneCount=*/NumThreads % mapping::getWarpSize(),
          /*LaneId=*/mapping::getThreadIdInBlock() % mapping::getWarpSize());

    // When there are more than mapping::getWarpSize() threads, a block
    // reduction is performed here.
    //
    // Only an L1 parallel region can enter this if condition.
    if (NumThreads > mapping::getWarpSize()) {
      // Gather all the reduced values from each warp
      // to the first warp.
      cpyFct(reduce_data, WarpsNeeded);

      if (WarpId == 0)
        gpu_irregular_warp_reduce(reduce_data, shflFct, WarpsNeeded,
                                  BlockThreadId);
    }
    return BlockThreadId == 0;
  }
#endif
  __kmpc_impl_lanemask_t Liveness = mapping::activemask();
  if (Liveness == lanes::All) // Full warp
    gpu_regular_warp_reduce(reduce_data, shflFct);
  else if (!(Liveness & (Liveness + 1))) // Partial warp but contiguous lanes
    gpu_irregular_warp_reduce(reduce_data, shflFct,
                              /*LaneCount=*/utils::popc(Liveness),
                              /*LaneId=*/mapping::getThreadIdInBlock() %
                                  mapping::getWarpSize());
  else {
    // Dispersed lanes. Only threads in an L2 parallel region may enter here;
    // return early.
    return gpu_irregular_simd_reduce(reduce_data, shflFct);
  }

  // When there are more than mapping::getWarpSize() threads, a block
  // reduction is performed here.
  //
  // Only an L1 parallel region can enter this if condition.
  if (NumThreads > mapping::getWarpSize()) {
    uint32_t WarpsNeeded =
        (NumThreads + mapping::getWarpSize() - 1) / mapping::getWarpSize();
    // Gather all the reduced values from each warp
    // to the first warp.
    cpyFct(reduce_data, WarpsNeeded);

    uint32_t WarpId = BlockThreadId / mapping::getWarpSize();
    if (WarpId == 0)
      gpu_irregular_warp_reduce(reduce_data, shflFct, WarpsNeeded,
                                BlockThreadId);

    return BlockThreadId == 0;
  }

  // Note: BlockThreadId differs from the OMP thread id in the case of an L2
  // parallel region.
  return BlockThreadId == 0;
}

// Round s down to a multiple of the warp size; values smaller than a warp
// round to 1.
uint32_t roundToWarpsize(uint32_t s) {
  if (s < mapping::getWarpSize())
    return 1;
  return (s & ~(unsigned)(mapping::getWarpSize() - 1));
}

uint32_t kmpcMin(uint32_t x, uint32_t y) { return x < y ? x : y; }

} // namespace

extern "C" {
int32_t __kmpc_nvptx_parallel_reduce_nowait_v2(IdentTy *Loc,
                                               uint64_t reduce_data_size,
                                               void *reduce_data,
                                               ShuffleReductFnTy shflFct,
                                               InterWarpCopyFnTy cpyFct) {
  return nvptx_parallel_reduce_nowait(reduce_data, shflFct, cpyFct);
}
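// The shflFct and cpyFct callbacks passed to these entry points are emitted
// by the compiler for each reduction clause. As a rough, illustrative sketch
// (the names below are placeholders and actual clang codegen differs in
// detail), a shuffle-and-reduce function for a single `int` sum on NVPTX
// could look like:
//
//   void example_shuffle_reduce(void *ReduceData, int16_t LaneId,
//                               int16_t Offset, int16_t AlgoVersion) {
//     int *Elem = static_cast<int *>(ReduceData);
//     // Pull the partial sum held by the lane `Offset` positions above and
//     // fold it into this lane's copy. Real codegen additionally guards
//     // against combining values from lanes outside the reduction group for
//     // the irregular algorithm versions.
//     int Remote = __shfl_down_sync(0xffffffffu, *Elem, Offset);
//     *Elem += Remote;
//   }
//
// The inter-warp copy callback (InterWarpCopyFnTy) stages each warp master's
// partial result in shared memory and lets the first WarpNum lanes of warp 0
// read the values back, which is what cpyFct(reduce_data, WarpsNeeded) in the
// block reductions above relies on.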
int32_t __kmpc_nvptx_teams_reduce_nowait_v2(
    IdentTy *Loc, void *GlobalBuffer, uint32_t num_of_records,
    uint64_t reduce_data_size, void *reduce_data, ShuffleReductFnTy shflFct,
    InterWarpCopyFnTy cpyFct, ListGlobalFnTy lgcpyFct, ListGlobalFnTy lgredFct,
    ListGlobalFnTy glcpyFct, ListGlobalFnTy glredFct) {
  // Terminate all threads in non-SPMD mode except for the master thread.
  uint32_t ThreadId = mapping::getThreadIdInBlock();
  if (mapping::isGenericMode()) {
    if (!mapping::isMainThreadInGenericMode())
      return 0;
    ThreadId = 0;
  }

  uint32_t &IterCnt = state::getKernelLaunchEnvironment().ReductionIterCnt;
  uint32_t &Cnt = state::getKernelLaunchEnvironment().ReductionCnt;

  // In non-generic mode all workers participate in the teams reduction.
  // In generic mode only the team master participates in the teams
  // reduction because the workers are waiting for parallel work.
  uint32_t NumThreads = omp_get_num_threads();
  uint32_t TeamId = omp_get_team_num();
  uint32_t NumTeams = omp_get_num_teams();
  static unsigned SHARED(Bound);
  static unsigned SHARED(ChunkTeamCount);

  // Block progress of teams beyond the current upper limit: at any point in
  // time at most num_of_records teams, i.e. the number of slots in the
  // buffer, are allowed to write their intermediate results.
  bool IsMaster = (ThreadId == 0);
  while (IsMaster) {
    Bound = atomic::load(&IterCnt, atomic::aquire);
    if (TeamId < Bound + num_of_records)
      break;
  }

  if (IsMaster) {
    int ModBlockId = TeamId % num_of_records;
    if (TeamId < num_of_records) {
      lgcpyFct(GlobalBuffer, ModBlockId, reduce_data);
    } else
      lgredFct(GlobalBuffer, ModBlockId, reduce_data);

    // Propagate the memory writes above to the rest of the device.
    fence::kernel(atomic::release);

    // Increment the team counter.
    // This counter is incremented by all teams in the current
    // num_of_records chunk.
    ChunkTeamCount = atomic::inc(&Cnt, num_of_records - 1u, atomic::seq_cst,
                                 atomic::MemScopeTy::device);
  }

  // Synchronize only in SPMD mode; in generic mode all threads but one are in
  // the state machine.
  if (mapping::isSPMDMode())
    synchronize::threadsAligned(atomic::acq_rel);

  // reduce_data is global or shared, so before being reduced within the
  // warp we need to bring it into local memory:
  //   local_reduce_data = reduce_data[i]
  //
  // Example for 3 reduction variables a, b, c (of potentially different
  // types):
  //
  // buffer layout (struct of arrays):
  // a, a, ..., a, b, b, ... b, c, c, ... c
  // |__________|
  //  num_of_records
  //
  // local_reduce_data layout (struct):
  // a, b, c
  //
  // Each thread will have a local struct containing the values to be
  // reduced:
  //   1. do reduction within each warp.
  //   2. do reduction across warps.
  //   3. write the final result to the main reduction variable
  //      by returning 1 in the thread holding the reduction result.

  // Check if this is the very last team.
  unsigned NumRecs = kmpcMin(NumTeams, uint32_t(num_of_records));
  if (ChunkTeamCount == NumTeams - Bound - 1) {
    // Ensure we see the global memory writes by other teams.
    fence::kernel(atomic::aquire);

    //
    // Last team processing.
    //
    if (ThreadId >= NumRecs)
      return 0;
    NumThreads = roundToWarpsize(kmpcMin(NumThreads, NumRecs));
    if (ThreadId >= NumThreads)
      return 0;

    // Load from buffer and reduce.
    glcpyFct(GlobalBuffer, ThreadId, reduce_data);
    for (uint32_t i = NumThreads + ThreadId; i < NumRecs; i += NumThreads)
      glredFct(GlobalBuffer, i, reduce_data);

    // Reduce across warps to the warp master.
    if (NumThreads > 1) {
      gpu_regular_warp_reduce(reduce_data, shflFct);

      // When there are more than mapping::getWarpSize() active threads, a
      // block reduction is performed here.
      uint32_t ActiveThreads = kmpcMin(NumRecs, NumThreads);
      if (ActiveThreads > mapping::getWarpSize()) {
        uint32_t WarpsNeeded = (ActiveThreads + mapping::getWarpSize() - 1) /
                               mapping::getWarpSize();
        // Gather all the reduced values from each warp
        // to the first warp.
        cpyFct(reduce_data, WarpsNeeded);

        uint32_t WarpId = ThreadId / mapping::getWarpSize();
        if (WarpId == 0)
          gpu_irregular_warp_reduce(reduce_data, shflFct, WarpsNeeded,
                                    ThreadId);
      }
    }

    if (IsMaster) {
      Cnt = 0;
      IterCnt = 0;
      return 1;
    }
    return 0;
  }
  if (IsMaster && ChunkTeamCount == num_of_records - 1) {
    // Allow the next num_of_records teams to proceed with writing their
    // intermediate results to the global buffer.
    atomic::add(&IterCnt, uint32_t(num_of_records), atomic::seq_cst);
  }

  return 0;
}
}

void *__kmpc_reduction_get_fixed_buffer() {
  return state::getKernelLaunchEnvironment().ReductionBuffer;
}

#pragma omp end declare target
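// For orientation, compiler-generated device code drives a teams reduction
// roughly as sketched below (simplified and illustrative; the callback names
// are placeholders and the real lowering differs in detail):
//
//   void *Buffer = __kmpc_reduction_get_fixed_buffer();
//   if (__kmpc_nvptx_teams_reduce_nowait_v2(
//           Loc, Buffer, NumRecords, sizeof(ReduceList), &ReduceList,
//           example_shuffle_reduce, example_inter_warp_copy,
//           list_to_global_copy, list_to_global_reduce,
//           global_to_list_copy, global_to_list_reduce) == 1) {
//     // Only the thread that gets 1 back holds the final value; it folds
//     // ReduceList into the original reduction variable(s).
//   }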