Kokkos Core Kernels Package Version of the Day
Loading...
Searching...
No Matches
Kokkos_WorkGraphPolicy.hpp
1//@HEADER
2// ************************************************************************
3//
4// Kokkos v. 4.0
5// Copyright (2022) National Technology & Engineering
6// Solutions of Sandia, LLC (NTESS).
7//
8// Under the terms of Contract DE-NA0003525 with NTESS,
9// the U.S. Government retains certain rights in this software.
10//
11// Part of Kokkos, under the Apache License v2.0 with LLVM Exceptions.
12// See https://kokkos.org/LICENSE for license information.
13// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
14//
15//@HEADER
16
17#ifndef KOKKOS_IMPL_PUBLIC_INCLUDE
18#include <Kokkos_Macros.hpp>
19static_assert(false,
20 "Including non-public Kokkos header files is not allowed.");
21#endif
22#ifndef KOKKOS_WORKGRAPHPOLICY_HPP
23#define KOKKOS_WORKGRAPHPOLICY_HPP
24
25#include <impl/Kokkos_AnalyzePolicy.hpp>
26#include <Kokkos_Crs.hpp>
27
namespace Kokkos {
namespace Impl {

// Forward declaration of the executor class template associated with a
// work-graph policy; its definition is not part of this header's view.
template <class FunctorType, class ExecutionSpace, class... PolicyArgs>
class WorkGraphExec;

}  // namespace Impl
}  // namespace Kokkos
36
37namespace Kokkos {
38
39template <class... Properties>
40class WorkGraphPolicy : public Kokkos::Impl::PolicyTraits<Properties...> {
41 public:
42 using execution_policy = WorkGraphPolicy<Properties...>;
43 using self_type = WorkGraphPolicy<Properties...>;
44 using traits = Kokkos::Impl::PolicyTraits<Properties...>;
45 using index_type = typename traits::index_type;
46 using member_type = index_type;
47 using execution_space = typename traits::execution_space;
48 using memory_space = typename execution_space::memory_space;
50
51 enum : std::int32_t {
52 END_TOKEN = -1,
53 BEGIN_TOKEN = -2,
54 COMPLETED_TOKEN = -3
55 };
56
57 private:
59
60 // Let N = m_graph.numRows(), the total work
61 // m_queue[ 0 .. N-1] = the ready queue
62 // m_queue[ N .. 2*N-1] = the waiting queue counts
63 // m_queue[2*N .. 2*N+2] = the ready queue hints
64
65 graph_type const m_graph;
66 ints_type m_queue;
67
68 KOKKOS_INLINE_FUNCTION
69 void push_work(const std::int32_t w) const noexcept {
70 const std::int32_t N = m_graph.numRows();
71
72 std::int32_t* const ready_queue = &m_queue[0];
73 std::int32_t* const end_hint = &m_queue[2 * N + 1];
74
75 // Push work to end of queue
76 const std::int32_t j = atomic_fetch_add(end_hint, 1);
77
78 if ((N <= j) || (END_TOKEN != atomic_exchange(ready_queue + j, w))) {
79 // ERROR: past the end of queue or did not replace END_TOKEN
80 Kokkos::abort("WorkGraphPolicy push_work error");
81 }
82
83 memory_fence();
84 }
85
86 public:
101 KOKKOS_INLINE_FUNCTION
102 std::int32_t pop_work() const noexcept {
103 const std::int32_t N = m_graph.numRows();
104
105 std::int32_t* const ready_queue = &m_queue[0];
106 std::int32_t* const begin_hint = &m_queue[2 * N];
107
108 // begin hint is guaranteed to be less than or equal to
109 // actual begin location in the queue.
110
111 for (std::int32_t i = Kokkos::atomic_load(begin_hint); i < N; ++i) {
112 const std::int32_t w = Kokkos::atomic_load(&ready_queue[i]);
113
114 if (w == END_TOKEN) {
115 return END_TOKEN;
116 }
117
118 if ((w != BEGIN_TOKEN) &&
119 (w == atomic_compare_exchange(ready_queue + i, w,
120 (std::int32_t)BEGIN_TOKEN))) {
121 // Attempt to claim ready work index succeeded,
122 // update the hint and return work index
123 atomic_increment(begin_hint);
124 return w;
125 }
126 // arrive here when ready_queue[i] == BEGIN_TOKEN
127 }
128
129 return COMPLETED_TOKEN;
130 }
131
132 KOKKOS_INLINE_FUNCTION
133 void completed_work(std::int32_t w) const noexcept {
134 Kokkos::memory_fence();
135
136 // Make sure the completed work function's memory accesses are flushed.
137
138 const std::int32_t N = m_graph.numRows();
139
140 std::int32_t* const count_queue = &m_queue[N];
141
142 const std::int32_t B = m_graph.row_map(w);
143 const std::int32_t E = m_graph.row_map(w + 1);
144
145 for (std::int32_t i = B; i < E; ++i) {
146 const std::int32_t j = m_graph.entries(i);
147 if (1 == atomic_fetch_add(count_queue + j, -1)) {
148 push_work(j);
149 }
150 }
151 }
152
153 struct TagInit {};
154 struct TagCount {};
155 struct TagReady {};
156
163 KOKKOS_INLINE_FUNCTION
164 void operator()(const TagInit, int i) const noexcept {
165 m_queue[i] = i < m_graph.numRows() ? END_TOKEN : 0;
166 }
167
168 KOKKOS_INLINE_FUNCTION
169 void operator()(const TagCount, int i) const noexcept {
170 std::int32_t* const count_queue = &m_queue[m_graph.numRows()];
171
172 atomic_increment(count_queue + m_graph.entries[i]);
173 }
174
175 KOKKOS_INLINE_FUNCTION
176 void operator()(const TagReady, int w) const noexcept {
177 std::int32_t const* const count_queue = &m_queue[m_graph.numRows()];
178
179 if (0 == count_queue[w]) push_work(w);
180 }
181
182 execution_space space() const { return execution_space(); }
183
184 WorkGraphPolicy(const graph_type& arg_graph)
185 : m_graph(arg_graph),
186 m_queue(view_alloc("queue", WithoutInitializing),
187 arg_graph.numRows() * 2 + 2) {
188 { // Initialize
189 using policy_type = RangePolicy<std::int32_t, execution_space, TagInit>;
191 const closure_type closure(*this, policy_type(0, m_queue.size()));
192 closure.execute();
193 execution_space().fence(
194 "Kokkos::WorkGraphPolicy::WorkGraphPolicy: fence after executing "
195 "graph init");
196 }
197
198 { // execute-after counts
199 using policy_type = RangePolicy<std::int32_t, execution_space, TagCount>;
201 const closure_type closure(*this, policy_type(0, m_graph.entries.size()));
202 closure.execute();
203 execution_space().fence(
204 "Kokkos::WorkGraphPolicy::WorkGraphPolicy: fence after executing "
205 "graph count");
206 }
207
208 { // Scheduling ready tasks
209 using policy_type = RangePolicy<std::int32_t, execution_space, TagReady>;
211 const closure_type closure(*this, policy_type(0, m_graph.numRows()));
212 closure.execute();
213 execution_space().fence(
214 "Kokkos::WorkGraphPolicy::WorkGraphPolicy: fence after executing "
215 "readied graph");
216 }
217 }
218};
219
220} // namespace Kokkos
221
222#ifdef KOKKOS_ENABLE_SERIAL
223#include "Serial/Kokkos_Serial_WorkGraphPolicy.hpp"
224#endif
225
226#ifdef KOKKOS_ENABLE_OPENMP
227#include "OpenMP/Kokkos_OpenMP_WorkGraphPolicy.hpp"
228#endif
229
230#ifdef KOKKOS_ENABLE_CUDA
231#include "Cuda/Kokkos_Cuda_WorkGraphPolicy.hpp"
232#endif
233
234#ifdef KOKKOS_ENABLE_HIP
235#include "HIP/Kokkos_HIP_WorkGraphPolicy.hpp"
236#endif
237
238#ifdef KOKKOS_ENABLE_THREADS
239#include "Threads/Kokkos_Threads_WorkGraphPolicy.hpp"
240#endif
241
242#ifdef KOKKOS_ENABLE_HPX
243#include "HPX/Kokkos_HPX_WorkGraphPolicy.hpp"
244#endif
245
246#endif /* #define KOKKOS_WORKGRAPHPOLICY_HPP */
Compressed row storage array.
Definition: Kokkos_Crs.hpp:63
Implementation of the ParallelFor operator that has a partial specialization for the device.
View to an array of data.