alpaka
Abstraction Library for Parallel Kernel Acceleration
Loading...
Searching...
No Matches
Queue.hpp
Go to the documentation of this file.
1/* Copyright 2025 Simeon Ehrig, René Widera, Mehmet Yusufoglu, Andrea Bocci
2 * SPDX-License-Identifier: MPL-2.0
3 */
4
5#pragma once
6
8#include "alpaka/api/util.hpp"
11#include "alpaka/interface.hpp"
13#include "alpaka/onAcc/Acc.hpp"
19
20#include <algorithm>
21#include <future>
22#include <shared_mutex>
23#include <sstream>
24#include <type_traits>
25
26#if ALPAKA_LANG_SYCL
27
28# include <sycl/sycl.hpp>
29
30namespace alpaka::onHost
31{
32 namespace syclGeneric
33 {
34 /** Dispatch a compile time warp size to the kernel
35 *
36 * The runtime provided warp size of the device is transformed into a compile time warp size.
37 * During the kernel (lambda) call in cgh.parallel_for() the lambda must be annotated with
38 * `[[sycl::reqd_sub_group_size(WARP_SIZE)]]`. In cases where the warp size is not supported by device a
39 * compiler warning will be shown, therefore a second stage during the call of parallel_for() is required where
40 * we check if we know based on macro defines provided by the compiler which subgroup sizes (warp size) are
41 * supported for the device ther kernel is currently compiled. In cases, where the macro definition to detect
42 * the target device is not in the list (file: core/syclConfig.hpp) we allow all subgroup sizes generated from
43 * the runtime dispatcher in this trait. This is also the case if we not compile ahead of time for a device.
44 * @attention If a warning `-Wincorrect-sub-group-size` is shown this mean we generated a kernel with an
45 * unsupported warp size, triggered by the on host runtime dispatch in this trait.
46 *
47 * The reason why we do not want to execute the runtime dispatch within the parallel_for, equal to what
48 * mainline alpaka is doing, is that any kernel instance should have only one code patch to avoid possible
49 * register pressure due to a code path which will maybe never called but is generated in the kernel.
50 * This complicated approach gives us the guarantee that the runtime device warp size is used during the kernel
51 * generation.
52 */
53 struct Warpsize
54 {
55 template<alpaka::concepts::DeviceKind T_DeviceKind>
56 struct Dispatch
57 {
58 auto operator()(T_DeviceKind deviceKind, auto&& fn) const;
59 };
60 };
61
62 template<>
63 struct Warpsize::Dispatch<alpaka::deviceKind::Cpu>
64 {
65 auto operator()(alpaka::deviceKind::Cpu, auto&& fn, uint32_t warpSize) const
66 {
67 switch(warpSize)
68 {
69 case 1u:
70 return fn(std::integral_constant<uint32_t, 1u>{});
71 case 2u:
72 return fn(std::integral_constant<uint32_t, 2u>{});
73 case 4u:
74 return fn(std::integral_constant<uint32_t, 4u>{});
75 case 8u:
76 return fn(std::integral_constant<uint32_t, 8u>{});
77 case 16u:
78 return fn(std::integral_constant<uint32_t, 16u>{});
79 case 32u:
80 return fn(std::integral_constant<uint32_t, 32u>{});
81 default:
82 throw std::runtime_error(
83 std::string("Sycl warp size runtime dispatch, unsupported warpSize: ")
84 + std::to_string(warpSize));
85 return fn(std::integral_constant<uint32_t, 1u>{});
86 }
87 }
88 };
89
90 template<>
91 struct Warpsize::Dispatch<alpaka::deviceKind::IntelGpu>
92 {
93 auto operator()(alpaka::deviceKind::IntelGpu, auto&& fn, uint32_t warpSize) const
94 {
95 switch(warpSize)
96 {
97 case 8u:
98 return fn(std::integral_constant<uint32_t, 8u>{});
99 case 16u:
100 return fn(std::integral_constant<uint32_t, 16u>{});
101 case 32u:
102 return fn(std::integral_constant<uint32_t, 32u>{});
103 default:
104 throw std::runtime_error(
105 std::string("Sycl warp size runtime dispatch, unsupported warpSize: ")
106 + std::to_string(warpSize));
107 return fn(std::integral_constant<uint32_t, 32u>{});
108 }
109 }
110 };
111
112 template<>
113 struct Warpsize::Dispatch<alpaka::deviceKind::AmdGpu>
114 {
115 auto operator()(alpaka::deviceKind::AmdGpu, auto&& fn, uint32_t warpSize) const
116 {
117 switch(warpSize)
118 {
119 case 32u:
120 return fn(std::integral_constant<uint32_t, 32u>{});
121 case 64u:
122 return fn(std::integral_constant<uint32_t, 64u>{});
123 default:
124 throw std::runtime_error(
125 std::string("Sycl warp size runtime dispatch, unsupported warpSize: ")
126 + std::to_string(warpSize));
127 return fn(std::integral_constant<uint32_t, 32u>{});
128 }
129 }
130 };
131
132 template<>
133 struct Warpsize::Dispatch<alpaka::deviceKind::NvidiaGpu>
134 {
135 auto operator()(alpaka::deviceKind::NvidiaGpu, auto&& fn, uint32_t warpSize) const
136 {
137 switch(warpSize)
138 {
139 case 32u:
140 return fn(std::integral_constant<uint32_t, 32u>{});
141 default:
142 throw std::runtime_error(
143 std::string("Sycl warp size runtime dispatch, unsupported warpSize: ")
144 + std::to_string(warpSize));
145 return fn(std::integral_constant<uint32_t, 32u>{});
146 }
147 }
148 };
149
/** alpaka queue implemented on top of an in-order sycl::queue.
 *
 * @tparam T_Device device type; the queue keeps a Handle<T_Device> to the device it was created for.
 *
 * Operations such as waitFor() record the sycl::event they produce via setLastEvent(). If the queue was
 * created as blocking (see isBlocking()), they additionally wait for that event before returning.
 */
150 template<typename T_Device>
151 struct Queue : std::enable_shared_from_this<Queue<T_Device>>
152 {
153 private:
154 friend struct alpaka::internal::GetApi;
155
 /** Convert an alpaka vector into a sycl::range of the same dimension.
  *
  * Component I of `vec` becomes argument I of the sycl::range constructor.
  */
156 template<alpaka::concepts::Vector TVec>
157 static constexpr auto vecToSyclRange(TVec vec)
158 {
159 constexpr auto dim = std::decay_t<TVec>::dim();
160 return [&vec]<auto... I>(std::index_sequence<I...>)
161 // TODO: check if this is the correct order
162 { return sycl::range<dim>(vec[I]...); }(std::make_index_sequence<dim>{});
163 };
164
 /** Query the device's warp size at runtime and hand `fn` over to Warpsize::Dispatch.
  *
  * The dispatcher is selected by this queue's device kind. It receives `fn` and the runtime warp size, and it
  * calls `fn` with a compile-time warp size.
  */
165 inline constexpr auto dispatchWarpSize(auto&& fn) const
166 {
167 auto warpSize
168 = internal::GetDeviceProperties::Op<ALPAKA_TYPEOF(*m_device.get())>{}(*m_device.get()).warpSize;
169
170 return Warpsize::Dispatch<ALPAKA_TYPEOF(getDeviceKind())>{}(
172 ALPAKA_FORWARD(fn),
173 warpSize);
174 }
175
176
177 public:
 /** Create a queue for `device`.
  *
  * @param device handle of the device; the native sycl::queue is built from the two members of the
  *        device's native handle, together with the sycl in_order property
  * @param idx index of the queue, used only in getName()
  * @param isBlocking stored and reported by isBlocking()
  */
178 Queue(internal::concepts::DeviceHandle auto device, uint32_t const idx, bool isBlocking)
179 : m_device(std::move(device))
180 , m_idx(idx)
181 , m_queue(
182 m_device->getNativeHandle().second,
183 m_device->getNativeHandle().first,
184 {sycl::property::queue::in_order{}})
185 , m_isBlocking(isBlocking)
186 {
187 ALPAKA_LOG_FUNCTION(onHost::logger::queue);
188 }
189
 //! True if enqueue operations on this queue wait for completion before returning.
190 [[nodiscard]] bool isBlocking() const noexcept
191 {
192 return m_isBlocking;
193 }
194
 // Queues are neither copyable nor movable; they are shared via getSharedPtr().
195 Queue(Queue const&) = delete;
196 Queue& operator=(Queue const&) = delete;
197
198 Queue(Queue&&) = delete;
199 Queue& operator=(Queue&&) = delete;
200
 /** Wait for all work in the native sycl queue before the queue is destroyed.
  *
  * sycl::exception and std::exception are caught and reported on std::cerr instead of leaving the destructor.
  * NOTE(review): other exception types are not caught and would reach the implicitly noexcept destructor.
  * NOTE(review): the second message lacks a space after "queue:" before err.what().
  */
201 ~Queue()
202 {
203 ALPAKA_LOG_FUNCTION(onHost::logger::queue);
204 try
205 {
206 m_queue.wait_and_throw();
207 }
208 catch(sycl::exception const& err)
209 {
210 std::cerr << "Caught SYCL exception while destructing a SYCL queue: " << err.what() << " ("
211 << err.code() << ')' << std::endl;
212 }
213 catch(std::exception const& err)
214 {
215 std::cerr << "The following runtime error(s) occurred while destructing a SYCL queue:"
216 << err.what() << std::endl;
217 }
218 }
219
 //! Shared pointer to this queue (requires the queue to be owned by a std::shared_ptr).
220 std::shared_ptr<Queue> getSharedPtr()
221 {
222 return this->shared_from_this();
223 }
224
 //! Returns the underlying sycl::queue by value.
225 [[nodiscard]] auto getNativeHandle() const noexcept
226 {
227 return m_queue;
228 }
229
 //! Block until the native sycl queue is drained, via sycl::queue::wait_and_throw().
230 void wait()
231 {
232 m_queue.wait_and_throw();
233 }
234
 //! Human-readable name in the form "Queue<API name> id=<index>".
235 std::string getName() const
236 {
237 std::stringstream ss;
238 ss << "Queue<" << getApi(m_device).getName() << ">";
239 ss << " id=" << m_idx;
240 return ss.str();
241 }
242
243 private:
244 friend struct alpaka::internal::GetDeviceType;
245 friend struct alpaka::onHost::internal::Enqueue;
246 friend struct onHost::internal::AllocDeferred;
247
 //! Device kind of the device this queue was created for.
248 auto getDeviceKind() const
249 {
250 return alpaka::internal::getDeviceKind(*m_device.get());
251 }
252
 //! Copy of the device handle this queue was created for.
253 auto getDevice() const
254 {
255 return m_device;
256 }
257
258 friend struct onHost::internal::GetDevice;
259
260 friend struct alpaka::onHost::internal::WaitFor;
261
 /** Make later work in this queue wait for `event`.
  *
  * An empty command group that depends on the event's native handle is submitted. The resulting sycl::event
  * becomes the queue's last event. For a blocking queue the call waits for it before returning.
  */
262 void waitFor(syclGeneric::Event<T_Device>& event)
263 {
264 ALPAKA_LOG_FUNCTION(onHost::logger::event + onHost::logger::queue);
265 sycl::event sycl_event = event.getNativeHandle();
266 sycl::event ev = m_queue.submit([sycl_event](sycl::handler& cgh) { cgh.depends_on(sycl_event); });
267 setLastEvent(ev);
268 if(isBlocking())
269 ev.wait_and_throw();
270 }
271
272 friend struct internal::IsQueueEmpty;
273
274 /** Test if all tasks in the queue are finished
275 *
276 * @attention Only the last event recorded for an alpaka-enqueued task or action is checked. The function
277 * cannot see work that was submitted directly to the native queue, bypassing alpaka.
278 */
279 bool isQueueEmpty() const
280 {
281 ALPAKA_LOG_FUNCTION(onHost::logger::queue);
282
283 auto const status = getLastEvent().template get_info<sycl::info::event::command_execution_status>();
284 return status == sycl::info::event_command_status::complete;
285 }
286
287 //! Thread safe getter for the last sycl event.
288 sycl::event getLastEvent() const
289 {
290 std::shared_lock<std::shared_mutex> lock{m_eventGuard};
291 return m_lastEvent;
292 }
293
294 /** Thread safe setter for the last sycl event
295 *
296 * To track dependencies this method must be called with any event returned by native sycl calls.
297 */
298 void setLastEvent(sycl::event const& ev) const
299 {
300 std::unique_lock<std::shared_mutex> lock{m_eventGuard};
301 m_lastEvent = ev;
302 }
303
304 friend struct alpaka::onHost::internal::Memset;
305 friend struct alpaka::onHost::internal::Memcpy;
306 friend struct alpaka::onHost::internal::MemcpyDeviceGlobal;
307 friend struct alpaka::onHost::internal::Alloc;
308 friend struct alpaka::onHost::internal::AllocDeferred;
309 friend struct alpaka::onHost::internal::AllocMapped;
310 friend struct alpaka::onHost::internal::Fill;
311
312 Handle<T_Device> m_device;
313 uint32_t m_idx = 0u;
314 sycl::queue m_queue;
315 // guards m_lastEvent: shared lock for reads (getLastEvent), exclusive lock for writes (setLastEvent)
316 mutable std::shared_mutex m_eventGuard;
317 /** Event representing the last task/action enqueued by alpaka
318 *
319 * @attention Do not use the event directly; always use getLastEvent() or setLastEvent().
320 * Tasks enqueued via the native handle outside of alpaka are not tracked by this event. Therefore it
321 * is possible that the queue is not empty although the event is already marked as complete. If you also
322 * need to track tasks enqueued outside of alpaka, use onHost::wait(auto&&).
323 */
324 mutable sycl::event m_lastEvent;
 // worker thread used by host tasks and by the deferred deallocation in AllocDeferred
325 core::CallbackThread m_callBackThread;
326 bool m_isBlocking{false};
327 };
328
329 } // namespace syclGeneric
330
/** Enqueue a host task into a SYCL queue.
 *
 * A sycl host_task is submitted that hands `task` to the queue's callback thread and waits for it to finish.
 * The recorded event (setLastEvent) therefore completes only after `task` has run. For a blocking queue the
 * caller additionally waits for that event.
 */
331 template<typename T_Device, typename T_Task>
332 struct internal::Enqueue::HostTask<syclGeneric::Queue<T_Device>, T_Task>
333 {
334 void operator()(syclGeneric::Queue<T_Device>& queue, T_Task const& task) const
335 {
337 /* Capturing the queue by reference is fine: if the alpaka queue is destroyed while the native sycl host
338 * task is running, the sycl queue is still valid, because the destructor of the alpaka queue waits until all
339 * tasks of the native sycl queue are processed. Accessing the callback thread is therefore still allowed at
340 * this point in time. Capturing the queue as a handle (shared pointer) would deadlock: the native sycl host
341 * task must not destroy the alpaka queue, because the queue's destructor waits for the native sycl queue,
342 * i.e. for the very host task that would be running the destructor.*/
343 sycl::event ev = queue.m_queue.submit(
344 [&queue, task](sycl::handler& cgh)
345 {
346 cgh.host_task(
347 [&queue, task]
348 {
 // Block the host task until the callback thread has executed `task`.
 // NOTE(review): this lambda is not `mutable`, so `task` is const here and std::move(task) copies it.
349 auto f = queue.m_callBackThread.submit([t = std::move(task)] { t(); });
350 f.wait();
351 });
352 });
353 queue.setLastEvent(ev);
354 if(queue.isBlocking())
355 ev.wait_and_throw();
356 }
357 };
358
/** Enqueue a deferred host task into a SYCL queue.
 *
 * Like Enqueue::HostTask, a sycl host_task forwards `task` to the queue's callback thread. Here, however, the
 * host task does not wait for `task` to finish. The recorded event completes once `task` has been handed over.
 */
359 template<typename T_Device, typename T_Task>
360 struct internal::Enqueue::HostTaskDeferred<syclGeneric::Queue<T_Device>, T_Task>
361 {
362 // same as for Enqueue::HostTask, but not waiting for the task to finish
363 void operator()(syclGeneric::Queue<T_Device>& queue, T_Task const& task) const
364 {
366 /* Capturing the queue by reference is fine: if the alpaka queue is destroyed while the native sycl host
367 * task is running, the sycl queue is still valid, because the destructor of the alpaka queue waits until all
368 * tasks of the native sycl queue are processed. Accessing the callback thread is therefore still allowed at
369 * this point in time. Capturing the queue as a handle (shared pointer) would deadlock: the native sycl host
370 * task must not destroy the alpaka queue, because the queue's destructor waits for the native sycl queue,
371 * i.e. for the very host task that would be running the destructor.*/
372 sycl::event ev = queue.m_queue.submit(
373 [&queue, task](sycl::handler& cgh)
374 {
 // NOTE(review): the result of m_callBackThread.submit() is discarded. Confirm that destroying it does
 // not block; it would block if it were a std::future obtained from std::async. Also, `task` is const
 // inside this non-mutable lambda, so std::move(task) copies it.
375 cgh.host_task([&queue, task]() { queue.m_callBackThread.submit([t = std::move(task)] { t(); }); });
376 });
377 queue.setLastEvent(ev);
378 if(queue.isBlocking())
379 ev.wait_and_throw();
380 }
381 };
382
/** Record an alpaka event in a SYCL queue.
 *
 * An empty single_task kernel is submitted, and its sycl::event is stored in `event` via setEvent().
 * For a blocking queue the call waits for that kernel. The event is not stored via setLastEvent().
 */
383 template<typename T_Device, typename T_Event>
384 struct internal::Enqueue::Event<syclGeneric::Queue<T_Device>, T_Event>
385 {
386 void operator()(syclGeneric::Queue<T_Device>& queue, T_Event& event) const
387 {
389
390 /* The queue's last event is not reused: an empty kernel submitted to the in-order native queue also covers
391 * tasks that were added to the native sycl queue outside of alpaka. */
392 sycl::event emulatedEvent = queue.m_queue.submit([](sycl::handler& cgh) { cgh.single_task([]() {}); });
393 event.setEvent(emulatedEvent);
394 if(queue.isBlocking())
395 emulatedEvent.wait_and_throw();
396 }
397 };
398
/** Byte-wise memset through a SYCL queue.
 *
 * Sets `extents.x() * sizeof(value type of T_Dest)` bytes to `byteValue` with sycl::queue::memset.
 * Only the x component of `extents` is used, so multidimensional/pitched memory is not handled yet (see TODO).
 * The resulting event becomes the queue's last event; for a blocking queue the call waits for it.
 */
399 template<typename T_Device, typename T_Dest, typename T_Extents>
401 struct internal::Memset::Op<syclGeneric::Queue<T_Device>, T_Dest, T_Extents>
402 {
403 void operator()(syclGeneric::Queue<T_Device>& queue, auto&& dest, uint8_t byteValue, T_Extents const& extents)
404 const requires std::same_as<ALPAKA_TYPEOF(dest), T_Dest>
405 {
407 // TODO: implement generic version for multidimensional memory
408 sycl::queue sycl_queue = queue.getNativeHandle();
409 sycl::event ev = sycl_queue.memset(
411 byteValue,
412 extents.x() * sizeof(alpaka::trait::GetValueType_t<T_Dest>));
 // Track the operation so isQueueEmpty() and blocking queues see it.
413 queue.setLastEvent(ev);
414 if(queue.isBlocking())
415 ev.wait_and_throw();
416 }
417 };
418
/** Memory copy through a SYCL queue.
 *
 * Copies `extents.x() * sizeof(value type of T_Dest)` bytes with sycl::queue::memcpy.
 * Only the x component of `extents` is used, so multidimensional/pitched memory is not handled yet (see TODO).
 * The resulting event becomes the queue's last event; for a blocking queue the call waits for it.
 */
419 template<typename T_Device, typename T_Dest, typename T_Source, typename T_Extents>
421 struct internal::Memcpy::Op<syclGeneric::Queue<T_Device>, T_Dest, T_Source, T_Extents>
422 {
423 void operator()(
424 syclGeneric::Queue<T_Device>& queue,
425 auto&& dest,
426 T_Source const& source,
427 T_Extents const& extents) const requires std::same_as<ALPAKA_TYPEOF(dest), T_Dest>
428 {
430 // TODO: implement generic version for multidimensional memory
431 sycl::queue sycl_queue = queue.getNativeHandle();
432 sycl::event ev = sycl_queue.memcpy(
435 extents.x() * sizeof(alpaka::trait::GetValueType_t<T_Dest>));
 // Track the operation so isQueueEmpty() and blocking queues see it.
436 queue.setLastEvent(ev);
437 if(queue.isBlocking())
438 ev.wait_and_throw();
439 }
440 };
441
/** Element-wise fill through a SYCL queue.
 *
 * Writes `elementValue` to the first `extents.x()` elements of `dest` with sycl::queue::fill.
 * It requires that the value type of `dest` is exactly T_Value. Only the x component of `extents` is used.
 * The resulting event becomes the queue's last event; for a blocking queue the call waits for it.
 */
442 template<typename T_Device, typename T_Dest, typename T_Value, typename T_Extents>
444 struct internal::Fill::Op<syclGeneric::Queue<T_Device>, T_Dest, T_Value, T_Extents>
445 {
446 void operator()(
447 syclGeneric::Queue<T_Device>& queue,
448 auto&& dest,
449 T_Value elementValue,
450 T_Extents const& extents) const
451 requires std::same_as<ALPAKA_TYPEOF(dest), T_Dest>
452 && std::same_as<alpaka::trait::GetValueType_t<ALPAKA_TYPEOF(dest)>, T_Value>
453 {
455 sycl::queue sycl_queue = queue.getNativeHandle();
456 sycl::event ev = sycl_queue.fill(internal::Data::data(dest), elementValue, extents.x());
 // Track the operation so isQueueEmpty() and blocking queues see it.
457 queue.setLastEvent(ev);
458 if(queue.isBlocking())
459 ev.wait_and_throw();
460 }
461 };
462
463 /** Same approach as Alloc::Op, except that the memory is allocated and freed through a queue.
464 * The returned SharedBuffer's deleter holds a shared pointer to the queue and frees the memory in a sycl
 * host task enqueued into that queue. Alignment comes from simdOptimizedAlignment; size and pitches come
 * from emulatedAlignedMemDescription.
465 */
466 template<typename T_Type, typename T_Device, alpaka::concepts::Vector T_Extents>
467 struct internal::AllocDeferred::Op<T_Type, syclGeneric::Queue<T_Device>, T_Extents>
468 {
469 auto operator()(syclGeneric::Queue<T_Device>& queue, T_Extents const& extents) const
470 {
472 auto device = queue.getDevice();
473 constexpr uint32_t alignment = api::util::simdOptimizedAlignment<T_Type>(
474 ALPAKA_TYPEOF(getApi(device)){},
475 ALPAKA_TYPEOF(getDeviceKind(device)){});
476 auto [memSizeInByte, pitches] = api::util::emulatedAlignedMemDescription<T_Type>(alignment, extents);
477
 // The buffer keeps the device alive; the deleter keeps the queue alive until the memory is released.
478 auto deviceDependency = onHost::Device{queue.getDevice()->getSharedPtr()};
479 sycl::queue sycl_queue = queue.getNativeHandle();
480 auto queueDependency = queue.getSharedPtr();
481
482
 // NOTE(review): the result of aligned_alloc_device is not checked for nullptr; confirm whether an
 // allocation failure must be reported here.
483 T_Type* ptr = reinterpret_cast<T_Type*>(sycl::aligned_alloc_device(alignment, memSizeInByte, sycl_queue));
484
485 // guarantees that the allocation is blocking the queue if necessary.
 // (For blocking queues: wait until all work in the native queue has finished before returning.)
486 if(queue.isBlocking())
487 sycl_queue.wait_and_throw();
488
489 auto deleter = [queueDep = std::move(queueDependency), ptr]()
490 {
491 sycl::queue sycl_queue = queueDep->getNativeHandle();
492 /* If the deleter's lifetime is extended, e.g. by keepAlive() on a buffer, the queue's callback thread
493 * may hold the last instance of the deleter. keepAlive() is executed within a sycl host task, and
494 * creating another host task from inside a host task is forbidden (it would deadlock). Therefore the
495 * deleter hands the job to the callback thread, which then enqueues the host task that frees the
496 * memory. This guarantees that the memory is freed only after all work that was in the sycl queue
497 * when the deleter ran has finished. The memory may be freed a little later than strictly necessary
498 * if other threads enqueue new kernels or tasks into the sycl queue while the callback thread is
499 * creating the host task.
500 */
501 queueDep->m_callBackThread.submit(
502 [sycl_queue, ptr]() mutable
503 {
504 sycl_queue.submit([&](sycl::handler& cgh)
505 { cgh.host_task([=]() { sycl::free(toVoidPtr(ptr), sycl_queue); }); });
506 });
507 };
508
509 auto sharedBuffer = onHost::SharedBuffer{
510 deviceDependency,
511 ptr,
512 extents,
513 pitches,
514 std::move(deleter),
515 Alignment<alignment>{}};
516 return sharedBuffer;
517 }
518 };
519} // namespace alpaka::onHost
520
521namespace alpaka::internal
522
523{
524 template<typename T_Device>
525 struct GetApi::Op<alpaka::onHost::syclGeneric::Queue<T_Device>>
526 {
527 inline constexpr auto operator()(auto&& queue) const
528 {
529 return alpaka::getApi(queue.m_device);
530 }
531 };
532} // namespace alpaka::internal
533
534#endif
#define ALPAKA_TYPEOF(...)
Get the type of instance.
Definition common.hpp:153
#define ALPAKA_FORWARD(instance)
Perfectly forward an instance as argument.
Definition common.hpp:147
#define ALPAKA_LOG_FUNCTION(logLvl)
Log the entry and exit of a scope.
Definition logger.hpp:95
auto emulatedAlignedMemDescription(uint32_t alignmentInByte, T_Extents extents)
provides a memory description to create multidimensional linewise aligned memory within a one dimensi...
Definition util.hpp:100
constexpr auto simdOptimizedAlignment(auto api, alpaka::concepts::DeviceKind auto deviceKind)
Calculate the best alignment for SIMD optimized memory allocation.
Definition util.hpp:140
constexpr auto alpaka
Definition fn.hpp:66
alpaka internal implementations.
Definition generic.hpp:19
constexpr auto getDeviceKind(auto &&any)
Definition interface.hpp:85
constexpr WarpSize warpSize
Definition tag.hpp:44
constexpr Device device
Definition scope.hpp:70
bool isQueueEmpty(auto &queue)
constexpr auto getDevice(auto &&any)
Definition interface.hpp:77
void waitFor(auto &queue, auto &event)
constexpr auto queue
Definition lvl.hpp:127
constexpr auto memory
Definition lvl.hpp:112
constexpr auto event
Definition lvl.hpp:97
Functionality which is usable on the host CPU controller thread.
Definition api.hpp:40
auto getNativeHandle(auto const &handle)
Get the native handle of a handle.
SharedBuffer(T_Any const &, T_Type *, T_UserExtents const &, T_UserPitches const &, std::invocable<> auto, T_MemAlignment const) -> SharedBuffer< ALPAKA_TYPEOF(getApi(std::declval< T_Any >())), T_Type, typename T_UserPitches::UniVec, T_MemAlignment >
std::convertible_to< std::string > auto getName(auto &&any)
Runtime name for a given object.
Definition interface.hpp:96
Device(Handle< T_Device > &&) -> Device< ALPAKA_TYPEOF(alpaka::internal::getApi(std::declval< T_Device >())), ALPAKA_TYPEOF(alpaka::internal::getDeviceKind(std::declval< T_Device >()))>
void wait(alpaka::concepts::HasGet auto &handle)
wait for all work to be finished
Queue(Handle< T_Queue > &&, T_QueueKind) -> Queue< Device< ALPAKA_TYPEOF(alpaka::internal::getApi(std::declval< T_Queue >())), ALPAKA_TYPEOF(alpaka::internal::getDeviceKind(std::declval< T_Queue >()))>, T_QueueKind >
typename GetValueType< T >::type GetValueType_t
Definition trait.hpp:65
constexpr uint32_t getDim_v
Definition trait.hpp:41
auto * toVoidPtr(T inPtr)
Cast a pointer that may or may not point to volatile memory to a (void*) or (void const*).
Definition util.hpp:34
constexpr decltype(auto) getDeviceKind(auto &&any)
Get the device type of an object.
Definition interface.hpp:52
constexpr decltype(auto) getApi(auto &&any)
Get the API an object depends on.
Definition interface.hpp:23
constexpr auto operator()(auto &&any) const
Definition interface.hpp:55
void operator()(T_Any &any, T_Extents const &) const
static decltype(auto) data(auto &&any)
void operator()(T_Queue &queue, T_Event &event) const
void operator()(T_Queue &queue, T_Task const &task) const
void operator()(T_Queue &queue, T_Task const &task) const