//===---- Reduction.cpp - OpenMP device reduction implementation - C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file contains the implementation of reductions with the KMPC interface.
//
//===----------------------------------------------------------------------===//

#include "Debug.h"
#include "DeviceTypes.h"
#include "DeviceUtils.h"
#include "Interface.h"
#include "Mapping.h"
#include "State.h"
#include "Synchronization.h"

using namespace ompx;

namespace {

#pragma omp begin declare target device_type(nohost)

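// Reduce a full warp: at each step the compiler-generated shuffle/reduce
// callback combines a lane's value with the value held by the lane `mask`
// positions away, halving the offset every iteration. With a warp size of 32
// the offsets are 16, 8, 4, 2, 1, so lane 0 ends up with the reduction of all
// 32 lanes (AlgoVersion 0 ignores the LaneId argument).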
void gpu_regular_warp_reduce(void *reduce_data, ShuffleReductFnTy shflFct) {
  for (uint32_t mask = mapping::getWarpSize() / 2; mask > 0; mask /= 2) {
    shflFct(reduce_data, /*LaneId (not used)=*/0,
            /*Offset=*/mask, /*AlgoVersion=*/0);
  }
}

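// Reduce a contiguous group of `size` lanes whose count need not be a power of
// two. Each step pairs lane `tid` with the lane `mask` positions above it and
// lets the callback fold the two values; the group size is halved (rounding
// up) until a single lane, the one with tid == 0, holds the result.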
void gpu_irregular_warp_reduce(void *reduce_data, ShuffleReductFnTy shflFct,
                               uint32_t size, uint32_t tid) {
  uint32_t curr_size = size;
  uint32_t mask = curr_size / 2;
  while (mask > 0) {
    shflFct(reduce_data, /*LaneId=*/tid, /*Offset=*/mask, /*AlgoVersion=*/1);
    curr_size = (curr_size + 1) / 2;
    mask = curr_size / 2;
  }
}

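// Reduce over a possibly non-contiguous set of active lanes, as seen for the
// dispersed lanes of an L2 (SIMD) region. Every live lane computes its rank
// among the live lanes, repeatedly folds its value with the next live lane
// above it (located via ffs on the mask of higher live lanes), and leaves the
// loop once its halved rank becomes odd. Returns 1 on the lowest-ranked live
// lane, which ends up holding the reduced value, and 0 elsewhere.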
static uint32_t gpu_irregular_simd_reduce(void *reduce_data,
                                          ShuffleReductFnTy shflFct) {
  uint32_t size, remote_id, physical_lane_id;
  physical_lane_id = mapping::getThreadIdInBlock() % mapping::getWarpSize();
  __kmpc_impl_lanemask_t lanemask_lt = mapping::lanemaskLT();
  __kmpc_impl_lanemask_t Liveness = mapping::activemask();
  uint32_t logical_lane_id = utils::popc(Liveness & lanemask_lt) * 2;
  __kmpc_impl_lanemask_t lanemask_gt = mapping::lanemaskGT();
  do {
    Liveness = mapping::activemask();
    remote_id = utils::ffs(Liveness & lanemask_gt);
    size = utils::popc(Liveness);
    logical_lane_id /= 2;
    shflFct(reduce_data, /*LaneId=*/logical_lane_id,
            /*Offset=*/remote_id - 1 - physical_lane_id, /*AlgoVersion=*/2);
  } while (logical_lane_id % 2 == 0 && size > 1);
  return (logical_lane_id == 0);
}

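// Intra-team ("parallel") reduction. Returns 1 in the thread that ends up
// holding the fully reduced value for its parallel region and 0 in all other
// threads; a serialized parallel region (NumThreads == 1) trivially returns 1.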
static int32_t nvptx_parallel_reduce_nowait(void *reduce_data,
                                            ShuffleReductFnTy shflFct,
                                            InterWarpCopyFnTy cpyFct) {
  uint32_t BlockThreadId = mapping::getThreadIdInBlock();
  if (mapping::isMainThreadInGenericMode(/*IsSPMD=*/false))
    BlockThreadId = 0;
  uint32_t NumThreads = omp_get_num_threads();
  if (NumThreads == 1)
    return 1;

  //
  // This reduce function handles reduction within a team. It handles
  // parallel regions in both L1 and L2 parallelism levels. It also
  // supports Generic, SPMD, and NoOMP modes.
  //
  // 1. Reduce within a warp.
  // 2. Warp master copies value to warp 0 via shared memory.
  // 3. Warp 0 reduces to a single value.
  // 4. The reduced value is available in the thread that returns 1.
  //

#if __has_builtin(__nvvm_reflect)
  if (__nvvm_reflect("__CUDA_ARCH") >= 700) {
    uint32_t WarpsNeeded =
        (NumThreads + mapping::getWarpSize() - 1) / mapping::getWarpSize();
    uint32_t WarpId = mapping::getWarpIdInBlock();

    // Volta execution model:
    // In Generic execution mode a parallel region has either 1 thread or a
    // multiple of 32 threads. In SPMD execution mode we may have any number
    // of threads.
    if ((NumThreads % mapping::getWarpSize() == 0) ||
        (WarpId < WarpsNeeded - 1))
      gpu_regular_warp_reduce(reduce_data, shflFct);
    else if (NumThreads > 1) // Only SPMD execution mode comes through here.
      gpu_irregular_warp_reduce(
          reduce_data, shflFct,
          /*LaneCount=*/NumThreads % mapping::getWarpSize(),
          /*LaneId=*/mapping::getThreadIdInBlock() % mapping::getWarpSize());

    // When we have more than [mapping::getWarpSize()] threads, a block
    // reduction is performed here.
    //
    // Only the L1 parallel region can enter this if condition.
    if (NumThreads > mapping::getWarpSize()) {
      // Gather all the reduced values from each warp
      // to the first warp.
      cpyFct(reduce_data, WarpsNeeded);

      if (WarpId == 0)
        gpu_irregular_warp_reduce(reduce_data, shflFct, WarpsNeeded,
                                  BlockThreadId);
    }
    return BlockThreadId == 0;
  }
#endif
  __kmpc_impl_lanemask_t Liveness = mapping::activemask();
  if (Liveness == lanes::All) // Full warp
    gpu_regular_warp_reduce(reduce_data, shflFct);
  else if (!(Liveness & (Liveness + 1))) // Partial warp but contiguous lanes
    gpu_irregular_warp_reduce(reduce_data, shflFct,
                              /*LaneCount=*/utils::popc(Liveness),
                              /*LaneId=*/mapping::getThreadIdInBlock() %
                                  mapping::getWarpSize());
  else { // Dispersed lanes. Only threads in an L2 parallel region may enter
         // here; return early.
    return gpu_irregular_simd_reduce(reduce_data, shflFct);
  }

  // When we have more than [mapping::getWarpSize()] threads, a block
  // reduction is performed here.
  //
  // Only the L1 parallel region can enter this if condition.
  if (NumThreads > mapping::getWarpSize()) {
    uint32_t WarpsNeeded =
        (NumThreads + mapping::getWarpSize() - 1) / mapping::getWarpSize();
    // Gather all the reduced values from each warp
    // to the first warp.
    cpyFct(reduce_data, WarpsNeeded);

    uint32_t WarpId = BlockThreadId / mapping::getWarpSize();
    if (WarpId == 0)
      gpu_irregular_warp_reduce(reduce_data, shflFct, WarpsNeeded,
                                BlockThreadId);

    return BlockThreadId == 0;
  }

  // Note that in the case of an L2 parallel region the OMP thread id differs
  // from BlockThreadId.
  return BlockThreadId == 0;
}

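// Round `s` down to a multiple of the warp size; values smaller than one warp
// round to 1 so that at least a single thread keeps participating.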
uint32_t roundToWarpsize(uint32_t s) {
  if (s < mapping::getWarpSize())
    return 1;
  return (s & ~(unsigned)(mapping::getWarpSize() - 1));
}

uint32_t kmpcMin(uint32_t x, uint32_t y) { return x < y ? x : y; }

} // namespace

extern "C" {
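// Compiler-facing entry point for the reduction of a parallel region; Loc and
// reduce_data_size are not needed by this implementation.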
int32_t __kmpc_nvptx_parallel_reduce_nowait_v2(IdentTy *Loc,
                                               uint64_t reduce_data_size,
                                               void *reduce_data,
                                               ShuffleReductFnTy shflFct,
                                               InterWarpCopyFnTy cpyFct) {
  return nvptx_parallel_reduce_nowait(reduce_data, shflFct, cpyFct);
}

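// Cross-team ("teams") reduction. Each team funnels its partial result into
// GlobalBuffer, which provides num_of_records slots; a device-wide counter
// identifies the last contributing team, which then reduces the buffer
// contents and is the only team in which a thread returns 1.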
int32_t __kmpc_nvptx_teams_reduce_nowait_v2(
    IdentTy *Loc, void *GlobalBuffer, uint32_t num_of_records,
    uint64_t reduce_data_size, void *reduce_data, ShuffleReductFnTy shflFct,
    InterWarpCopyFnTy cpyFct, ListGlobalFnTy lgcpyFct, ListGlobalFnTy lgredFct,
    ListGlobalFnTy glcpyFct, ListGlobalFnTy glredFct) {
  // Terminate all threads in non-SPMD mode except for the master thread.
  uint32_t ThreadId = mapping::getThreadIdInBlock();
  if (mapping::isGenericMode()) {
    if (!mapping::isMainThreadInGenericMode())
      return 0;
    ThreadId = 0;
  }

  uint32_t &IterCnt = state::getKernelLaunchEnvironment().ReductionIterCnt;
  uint32_t &Cnt = state::getKernelLaunchEnvironment().ReductionCnt;

  // In non-generic mode all workers participate in the teams reduction.
  // In generic mode only the team master participates in the teams
  // reduction because the workers are waiting for parallel work.
  uint32_t NumThreads = omp_get_num_threads();
  uint32_t TeamId = omp_get_team_num();
  uint32_t NumTeams = omp_get_num_teams();
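  // Team-shared state: Bound caches the lower end of the window of team ids
  // currently allowed to write into the buffer; ChunkTeamCount records the
  // value this team observed when bumping the device-wide team counter.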
  static unsigned SHARED(Bound);
  static unsigned SHARED(ChunkTeamCount);

  // Block progress for teams with an id beyond the current upper limit. We
  // only ever allow a number of teams less than or equal to the number of
  // slots in the buffer to make progress.
  bool IsMaster = (ThreadId == 0);
  while (IsMaster) {
    Bound = atomic::load(&IterCnt, atomic::aquire);
    if (TeamId < Bound + num_of_records)
      break;
  }

  if (IsMaster) {
    int ModBlockId = TeamId % num_of_records;
    if (TeamId < num_of_records) {
      lgcpyFct(GlobalBuffer, ModBlockId, reduce_data);
    } else {
      lgredFct(GlobalBuffer, ModBlockId, reduce_data);
    }

    // Propagate the memory writes above to the world.
    fence::kernel(atomic::release);

    // Increment the team counter. This counter is incremented by all teams
    // in the current num_of_records chunk.
    ChunkTeamCount = atomic::inc(&Cnt, num_of_records - 1u, atomic::seq_cst,
                                 atomic::MemScopeTy::device);
  }

  // Synchronize in SPMD mode; in generic mode all threads but one are in the
  // state machine.
  if (mapping::isSPMDMode())
    synchronize::threadsAligned(atomic::acq_rel);

  // reduce_data is global or shared so before being reduced within the
  // warp we need to bring it in local memory:
  // local_reduce_data = reduce_data[i]
  //
  // Example for 3 reduction variables a, b, c (of potentially different
  // types):
  //
  // buffer layout (struct of arrays):
  // a, a, ..., a, b, b, ... b, c, c, ... c
  // |__________|
  //     num_of_records
  //
  // local_reduce_data layout (struct):
  // a, b, c
  //
  // Each thread will have a local struct containing the values to be
  // reduced:
  //      1. do reduction within each warp.
  //      2. do reduction across warps.
  //      3. write the final result to the main reduction variable
  //         by returning 1 in the thread holding the reduction result.

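  // Last-team detection: assuming atomic::inc returned the pre-increment value
  // of Cnt (wrapping at num_of_records), the team that observed
  // NumTeams - Bound - 1 is the last one to contribute and performs the final
  // reduction over the buffer below.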
  // Check if this is the very last team.
  unsigned NumRecs = kmpcMin(NumTeams, uint32_t(num_of_records));
  if (ChunkTeamCount == NumTeams - Bound - 1) {
    // Ensure we see the global memory writes by other teams.
    fence::kernel(atomic::aquire);

    //
    // Last team processing.
    //
    if (ThreadId >= NumRecs)
      return 0;
    NumThreads = roundToWarpsize(kmpcMin(NumThreads, NumRecs));
    if (ThreadId >= NumThreads)
      return 0;

    // Load from buffer and reduce.
    glcpyFct(GlobalBuffer, ThreadId, reduce_data);
    for (uint32_t i = NumThreads + ThreadId; i < NumRecs; i += NumThreads)
      glredFct(GlobalBuffer, i, reduce_data);

    // Reduce across warps to the warp master.
    if (NumThreads > 1) {
      gpu_regular_warp_reduce(reduce_data, shflFct);

      // When we have more than [mapping::getWarpSize()] threads, a block
      // reduction is performed here.
      uint32_t ActiveThreads = kmpcMin(NumRecs, NumThreads);
      if (ActiveThreads > mapping::getWarpSize()) {
        uint32_t WarpsNeeded = (ActiveThreads + mapping::getWarpSize() - 1) /
                               mapping::getWarpSize();
        // Gather all the reduced values from each warp
        // to the first warp.
        cpyFct(reduce_data, WarpsNeeded);

        uint32_t WarpId = ThreadId / mapping::getWarpSize();
        if (WarpId == 0)
          gpu_irregular_warp_reduce(reduce_data, shflFct, WarpsNeeded,
                                    ThreadId);
      }
    }

    if (IsMaster) {
      Cnt = 0;
      IterCnt = 0;
      return 1;
    }
    return 0;
  }
  if (IsMaster && ChunkTeamCount == num_of_records - 1) {
    // Allow the next num_of_records teams to proceed with writing their
    // intermediate results to the global buffer.
    atomic::add(&IterCnt, uint32_t(num_of_records), atomic::seq_cst);
  }

  return 0;
}
}

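// Returns the fixed-size reduction scratch buffer preallocated in the kernel
// launch environment (typically the storage passed back in as GlobalBuffer
// above).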
void *__kmpc_reduction_get_fixed_buffer() {
  return state::getKernelLaunchEnvironment().ReductionBuffer;
}

#pragma omp end declare target