// Host (CPU) queue type, parameterised on the device type T_Device.
// Deriving from std::enable_shared_from_this allows the queue to hand out
// shared_ptr instances that refer to itself (see getSharedPtr()).
// NOTE(review): this listing omits lines of the original file (see the
// embedded line numbers); only the visible fragments are described.
35 template<
typename T_Device>
36 struct Queue : std::enable_shared_from_this<Queue<T_Device>>
// Constructor: the device handle is taken by value and moved into m_device,
// `numIdx` is forwarded to the worker thread, and `isBlocking` is stored in
// m_isBlocking. Initialisation of the remaining members is not shown here.
39 Queue(internal::concepts::DeviceHandle
auto device, uint32_t
const idx, uint32_t numIdx,
bool isBlocking)
40 : m_device(
std::move(device))
43 , m_workerThread(numIdx)
44 , m_isBlocking(isBlocking)
// Waits on this queue through internal::wait. NOTE(review): presumably the
// destructor body, so that outstanding work finishes before destruction; the
// enclosing function signature is not visible in this listing.
52 internal::wait(*
this);
// operator== body: two queues compare equal when both their index and their
// device handle compare equal.
63 return m_idx == other.m_idx && m_device == other.m_device;
// operator!= body: defined as the negation of operator==.
68 return !(*
this == other);
// Compile-time check that this type satisfies internal::concepts::Queue.
74 static_assert(internal::concepts::Queue<Queue>);
// Index handed to makeAcc() as its NUMA index when a kernel is enqueued.
79 uint32_t m_numaIdx = 0u;
// Worker thread that tasks are submitted to (see submit()).
80 core::CallbackThread m_workerThread;
// Blocking-mode flag set from the constructor argument; defaults to false.
81 bool m_isBlocking{
false};
// Set to true by submit() while it holds m_mutex and reset to false
// afterwards; read by isQueueEmpty(). Atomic, so it can be read without
// taking m_mutex.
88 std::atomic<bool> m_isBlockingTaskExecuted{
false};
// Submit a callable to the queue.
// Two paths are visible: one that takes m_mutex and sets
// m_isBlockingTaskExecuted for its duration, and one that forwards the
// callable to the worker thread and returns the result of that call.
// NOTE(review): the condition selecting the path (presumably m_isBlocking)
// and the direct invocation of `fn` are on lines not shown here - confirm.
103 template<
typename T_Fn>
104 auto submit(T_Fn&& fn)
// Serialise this path and flag that a task is currently running.
109 std::lock_guard<std::mutex> lk(m_mutex);
110 m_isBlockingTaskExecuted =
true;
// GCC only (not clang): suppress the "-Wtsan" diagnostic for the
// promise/future code below; restored by the matching `pop`.
114#if defined(__GNUC__) && !defined(__clang__)
115# pragma GCC diagnostic push
116# pragma GCC diagnostic ignored "-Wtsan"
// Local promise supplying the future `f`. NOTE(review): presumably the
// promise is fulfilled and `f` returned on lines not shown - confirm.
119 std::promise<void> p;
120 auto f = p.get_future();
122#if defined(__GNUC__) && !defined(__clang__)
123# pragma GCC diagnostic pop
// The task is no longer running.
125 m_isBlockingTaskExecuted =
false;
// Other path: perfectly forward the callable to the worker thread.
131 return m_workerThread.submit(std::forward<T_Fn>(fn));
// Grants alpaka::internal::GetName access to this type's members.
134 friend struct alpaka::internal::GetName;
// Returns a human readable name of the form "host::Queue id=<m_idx>".
136 std::string getName()
const
138 return std::string(
"host::Queue id=") + std::to_string(m_idx);
// Grants internal::GetNativeHandle access to this type's members.
141 friend struct internal::GetNativeHandle;
// Accessor for the native handle; the result must not be discarded and the
// call does not throw. The body is not visible in this listing.
143 [[nodiscard]]
auto getNativeHandle() const noexcept
// Grants internal::Enqueue access to this type's members.
148 friend struct internal::Enqueue;
// Enqueue a kernel described by an explicit thread specification.
// threadSpec:   thread layout; must satisfy the ThreadSpec concept.
// kernelBundle: the kernel plus its arguments, invoked through the accelerator.
150 template<alpaka::onHost::concepts::ThreadSpec T_ThreadSpec>
151 void enqueue(T_ThreadSpec
const& threadSpec,
auto const& kernelBundle)
// Diagnostic text for rejecting exec::anyExecutor. NOTE(review): presumably
// the message of a static_assert whose condition is not shown.
155 "'exec::anyExecutor' can not be used to enqueue an kernel.");
// The thread-affinity flag handed to makeAcc mirrors the blocking mode.
162 bool setThreadAffinity = m_isBlocking;
// Task lambda: everything is captured by value, so it holds its own copies
// and does not reference this function's locals.
// NOTE(review): presumably passed to submit(); the call is not shown.
164 [kernelBundle, threadSpec, deviceKind, numIdx = m_numaIdx, setThreadAffinity]()
// Extra dictionary entries passed to the accelerator. Here
// launchedWidthFrameSpec is std::false_type, whereas the FrameSpec overload
// of enqueue passes std::true_type.
166 auto moreLayer =
Dict{
167 DictEntry(object::launchedWidthFrameSpec, std::false_type{}),
170 DictEntry(object::exec, threadSpec.getExecutor())};
// Build the accelerator for this thread spec and run the kernel bundle on it.
171 onAcc::Acc acc =
makeAcc(threadSpec, numIdx, setThreadAffinity);
172 acc(kernelBundle, moreLayer);
// Enqueue a kernel described by a frame specification.
// frameSpec:    frame layout; must satisfy the FrameSpec concept.
// kernelBundle: the kernel plus its arguments, invoked through the accelerator.
176 template<alpaka::onHost::concepts::FrameSpec T_FrameSpec>
177 void enqueue(T_FrameSpec
const& frameSpec,
auto const& kernelBundle)
// Diagnostic text for rejecting exec::anyExecutor. NOTE(review): presumably
// the message of a static_assert whose condition is not shown.
181 "'exec::anyExecutor' can not be used to enqueue an kernel.");
// Derive a thread specification from the frame specification for this
// device and kernel.
183 auto adjustedThreadSpec = internal::adjustThreadSpec(*m_device.get(), frameSpec, kernelBundle);
// The thread-affinity flag handed to makeAcc mirrors the blocking mode.
189 bool setThreadAffinity = m_isBlocking;
// Task lambda: everything is captured by value, so it holds its own copies.
// NOTE(review): presumably passed to submit(); the call is not shown.
191 [kernelBundle, adjustedThreadSpec, deviceKind, numIdx = m_numaIdx, setThreadAffinity]()
// Extra dictionary entries passed to the accelerator. Here
// launchedWidthFrameSpec is std::true_type, whereas the ThreadSpec overload
// of enqueue passes std::false_type.
193 auto moreLayer =
Dict{
194 DictEntry(object::launchedWidthFrameSpec, std::true_type{}),
197 DictEntry(object::exec, adjustedThreadSpec.getExecutor())};
// Build the accelerator for the adjusted spec and run the kernel bundle on it.
198 onAcc::Acc acc =
makeAcc(adjustedThreadSpec, numIdx, setThreadAffinity);
199 acc(kernelBundle, moreLayer);
// Enqueue a host callable. The task is copied into a wrapper lambda and goes
// through submit().
208 void enqueueHostFn(
auto const& task)
211 submit([task]() { task(); });
// Enqueue a host callable directly on the worker thread, bypassing submit().
214 void enqueueHostFnDeferred(
auto const& task)
217 m_workerThread.submit(task);
// Grants alpaka::internal::GetDeviceType access to this type's members.
220 friend struct alpaka::internal::GetDeviceType;
// Returns the device kind of the device this queue belongs to.
222 auto getDeviceKind()
const
224 return alpaka::internal::getDeviceKind(*m_device.get());
// Accessor for the queue's device; the body is not visible in this listing.
227 auto getDevice()
const
// Returns a shared_ptr to this queue via enable_shared_from_this.
// The queue must already be owned by a shared_ptr when this is called
// (std::enable_shared_from_this requirement).
232 std::shared_ptr<Queue> getSharedPtr()
234 return this->shared_from_this();
// Grants internal::IsQueueEmpty access to this type's members.
237 friend struct internal::IsQueueEmpty;
// Reports whether the queue currently has no work.
// Two results are visible: "no task is running inside submit()" and "the
// worker thread has no pending tasks".
// NOTE(review): the condition choosing between them (presumably
// m_isBlocking) is on a line not shown - confirm.
247 bool isQueueEmpty()
const
253 return !m_isBlockingTaskExecuted;
257 return m_workerThread.isEmpty();
// Trait types that are granted access to this type's members.
261 friend struct onHost::internal::GetDevice;
263 friend struct internal::Wait;
264 friend struct internal::WaitFor;
265 friend struct internal::Memcpy;
266 friend struct internal::MemcpyDeviceGlobal;
267 friend struct internal::Memset;
268 friend struct alpaka::internal::GetApi;
269 friend struct internal::AllocDeferred;
// Wait specialisation for the CPU queue.
275 template<
typename T_Device>
276 struct Wait::Op<
cpu::
Queue<T_Device>>
278 void operator()(cpu::Queue<T_Device>& queue)
const
// Only act when the queue still has work. NOTE(review): the waiting code
// inside this branch is on lines not shown here.
284 if(
queue.isQueueEmpty() ==
false)
// Enqueue specialisation that records an event on a CPU queue.
291 template<
typename T_Device,
typename T_Event>
292 struct Enqueue::Event<
cpu::
Queue<T_Device>, T_Event>
294 void operator()(cpu::Queue<T_Device>& queue, T_Event& event)
const
// Event state is protected by the event's own mutex.
300 std::lock_guard<std::mutex> lk(
event.m_mutex);
// Count this enqueue and remember the count that belongs to it.
302 ++
event.m_enqueueCount;
304 auto const enqueueCount =
event.m_enqueueCount;
// Blocking queue: the ready counter is advanced right here (it never
// decreases because of std::max) and the event gets a future from a local
// promise. NOTE(review): presumably the promise is fulfilled immediately on
// a line not shown - confirm.
309 if(
queue.m_isBlocking)
312 if(enqueueCount ==
event.m_enqueueCount)
314 event.m_LastReadyEnqueueCount = std::max(enqueueCount,
event.m_LastReadyEnqueueCount);
317 std::promise<void> p;
319 event.m_future = p.get_future();
// Other path: a task is submitted to the queue and its returned future
// becomes the event's future. The task captures a shared_ptr to the event,
// so the event stays alive until the task has run.
323 auto sharedEvent =
event.getSharedPtr();
325 event.m_future =
queue.submit(
326 [sharedEvent, enqueueCount]()
mutable
// Inside the task: lock the event again and advance the ready counter only
// if this is still the most recent enqueue of the event.
328 std::unique_lock<std::mutex> lk2(sharedEvent->m_mutex);
331 if(enqueueCount == sharedEvent->m_enqueueCount)
333 sharedEvent->m_LastReadyEnqueueCount
334 = std::max(enqueueCount, sharedEvent->m_LastReadyEnqueueCount);
// WaitFor specialisation: makes a CPU queue wait for a CPU event.
342 template<
typename T_Device,
typename T_Event>
343 struct WaitFor::Op<
cpu::
Queue<T_Device>, T_Event>
345 void operator()(cpu::Queue<T_Device>& queue, cpu::Event<T_Device>& event)
const
// Event state is protected by the event's mutex; unique_lock allows the lock
// to be released before the end of the scope.
351 std::unique_lock<std::mutex> lk(
event.m_mutex);
// Blocking queue: take a copy of the event's future.
// NOTE(review): presumably the copy is waited on after unlocking, on lines
// not shown - confirm.
358 if(
queue.m_isBlocking)
360 std::shared_future sFuture =
event.m_future;
// Other path: submit a task that blocks on the event's current future, so
// later work in this queue only runs once that future is ready. The captured
// shared_ptr keeps the event alive until the task has run.
366 auto sharedEvent =
event.getSharedPtr();
367 auto oldFuture =
event.m_future;
372 queue.submit([sharedEvent, oldFuture]() { oldFuture.get(); });
// Memcpy specialisation for the CPU queue: copies `extents` from `source`
// to `dest`.
379 template<
typename T_Device,
typename T_Dest,
typename T_Source,
typename T_Extents>
380 struct Memcpy::Op<
cpu::
Queue<T_Device>, T_Dest, T_Source, T_Extents>
382 void operator()(cpu::Queue<T_Device>& queue,
auto&& dest, T_Source
const& source, T_Extents
const& extents)
// One-dimensional case: a task capturing the extents and both raw pointers.
// NOTE(review): the task body (presumably a single copy) is not shown.
394 if constexpr(dim == 1u)
397 [extents, destPtr, srcPtr]()
// Multi-dimensional case: pitches with the last dimension removed, used to
// compute the start offset of each row.
405 auto destPitchBytesWithoutColumn = dest.getPitches().eraseBack();
406 auto sourcePitchBytesWithoutColumn = source.getPitches().eraseBack();
// The task captures everything by value, so it does not reference this
// function's locals.
409 [extents, destPtr, srcPtr, destPitchBytesWithoutColumn, sourcePitchBytesWithoutColumn]()
// Index space over all dimensions except the last one.
411 auto const dstExtentWithoutColumn = extents.eraseBack();
// Nothing is copied when any extent is zero.
412 if(
static_cast<std::size_t
>(extents.product()) != 0u)
415 dstExtentWithoutColumn,
// Per index: both pointers are cast to byte pointers and advanced by the sum
// of index * pitch; the last argument is the size taken from extents.back().
// NOTE(review): whether that size is in bytes or in elements depends on
// lines not shown - confirm.
419 reinterpret_cast<std::uint8_t*
>(destPtr)
420 + (idx * destPitchBytesWithoutColumn).sum(),
421 reinterpret_cast<std::uint8_t const*
>(srcPtr)
422 + (idx * sourcePitchBytesWithoutColumn).sum(),
423 static_cast<size_t>(extents.back())
// MemcpyDeviceGlobal specialisation with a global device memory wrapper as
// the destination and T_Source as the source.
433 template<
typename T_Device,
typename T_Source,
typename T_Storage,
typename T>
434 struct internal::MemcpyDeviceGlobal::
435 Op<cpu::Queue<T_Device>, onAcc::internal::GlobalDeviceMemoryWrapper<T_Storage, T>, T_Source>
438 cpu::Queue<T_Device>& queue,
439 onAcc::internal::GlobalDeviceMemoryWrapper<T_Storage, T> dest,
// Destination address: the wrapper's host-API handle.
443 auto* destPtr = dest.getHandle(
api::host).data();
// Source address; starts as nullptr. NOTE(review): presumably assigned from
// the source argument on lines not shown - confirm.
444 void const* srcPtr{
nullptr};
// Submit a task copying exactly sizeof(T) bytes; the pointers are captured
// by value.
449 queue.submit([destPtr, srcPtr]() { std::memcpy(destPtr, srcPtr,
sizeof(T)); });
// MemcpyDeviceGlobal specialisation with a global device memory wrapper as
// the source and T_Dest as the destination.
454 template<
typename T_Device,
typename T_Dest,
typename T_Storage,
typename T>
455 struct internal::MemcpyDeviceGlobal::
456 Op<cpu::Queue<T_Device>, T_Dest, onAcc::internal::GlobalDeviceMemoryWrapper<T_Storage, T>>
459 cpu::Queue<T_Device>& queue,
461 onAcc::internal::GlobalDeviceMemoryWrapper<T_Storage, T> source)
const
// Destination address; starts as nullptr. NOTE(review): presumably assigned
// from the destination argument on lines not shown - confirm.
464 void* destPtr{
nullptr};
// Source address: the wrapper's host-API handle.
469 auto const* srcPtr = source.getHandle(
api::host).data();
// Submit a task copying exactly sizeof(T) bytes; the pointers are captured
// by value.
470 queue.submit([destPtr, srcPtr]() { std::memcpy(destPtr, srcPtr,
sizeof(T)); });
// Memset specialisation for the CPU queue: sets the bytes of `dest` within
// `extents` to `byteValue`.
474 template<
typename T_Device,
typename T_Dest,
typename T_Extents>
475 struct Memset::Op<
cpu::
Queue<T_Device>, T_Dest, T_Extents>
480 void operator()(cpu::Queue<T_Device>& queue,
auto&& dest, uint8_t byteValue, T_Extents
const& extents)
// One-dimensional case: a task capturing the extents, the raw pointer and
// the byte value. NOTE(review): the task body is not shown.
488 if constexpr(dim == 1u)
491 [extents, destPtr, byteValue]()
// Multi-dimensional case: pitches with the last dimension removed, used to
// compute the start offset of each row.
502 auto destPitchBytesWithoutColumn = dest.getPitches().eraseBack();
// The task captures everything by value.
504 [extents, destPtr, destPitchBytesWithoutColumn, byteValue]()
// Index space over all dimensions except the last one.
506 auto const dstExtentWithoutColumn = extents.eraseBack();
// Nothing is written when any extent is zero.
507 if(
static_cast<std::size_t
>(extents.product()) != 0u)
510 dstExtentWithoutColumn,
// Per index: the pointer is cast to a byte pointer and advanced by the sum of
// index * pitch; the last argument is the size taken from extents.back().
// NOTE(review): whether that size is in bytes or in elements depends on
// lines not shown - confirm.
514 reinterpret_cast<std::uint8_t*
>(destPtr)
515 + (idx * destPitchBytesWithoutColumn).sum(),
517 static_cast<size_t>(extents.back())
// Fill specialisation for the CPU queue: fills `dest` within `extents` with
// `elementValue`.
526 template<
typename T_Device,
typename T_Dest,
typename T_Value,
typename T_Extents>
527 struct Fill::Op<
cpu::
Queue<T_Device>, T_Dest, T_Value, T_Extents>
529 void operator()(cpu::Queue<T_Device>& queue,
auto&& dest, T_Value elementValue, T_Extents
const& extents)
// View over the destination; the concept constraint requires it to be a
// view of T_Value elements.
535 alpaka::concepts::IView<T_Value>
auto dataView =
makeView(dest);
// Delegate to the generic fill implementation, restricted to the sub-view
// covering `extents`. The remaining arguments are on lines not shown.
537 alpaka::internal::generic::fill(
540 dataView.getSubView(extents),
// AllocDeferred specialisation for the CPU queue.
548 template<
typename T_Type,
typename T_Device, alpaka::concepts::Vector T_Extents>
549 struct AllocDeferred::Op<T_Type,
cpu::
Queue<T_Device>, T_Extents>
// Fragment of a helper loop: `result` starts at 1 and the loop continues
// while twice `result` is still <= `value`.
// NOTE(review): presumably `result` is doubled in the loop body and the
// helper returns the largest power of two not exceeding `value`; the loop
// body and the function signature are not shown here.
553 uint32_t result = 1u;
554 while((result << 1u) <= value)
561 auto operator()(cpu::Queue<T_Device>& queue, T_Extents
const& extents)
const
// Shared ownership of the queue, moved into the deleter below.
571 auto queueDependency =
queue.getSharedPtr();
// Pin the allocation on the device.
574 device->pinPointer(ptr, memSizeInByte);
// Deleter for the allocation. Because it owns a shared_ptr to the queue, the
// queue stays alive for as long as the deleter exists.
// NOTE(review): the deleter body is not shown.
577 auto deleter = [ptr, queueDep = std::move(queueDependency)]()
586 Alignment<alignment>{}};
592 std::stringstream ss;
602namespace alpaka::internal
// GetApi specialisation for the host CPU queue; the call operator is
// constexpr and accepts any queue reference. Its body is not visible here.
604 template<
typename T_Device>
605 struct GetApi::Op<onHost::cpu::Queue<T_Device>>
607 inline constexpr auto operator()(
auto&& queue)
const
#define ALPAKA_TYPEOF(...)
Get the type of instance.
#define ALPAKA_FORWARD(instance)
Perfectly forward an instance as argument.
#define ALPAKA_LOG_INFO(logLvl, callable)
Write a meta data message to the output.
#define ALPAKA_LOG_FUNCTION(logLvl)
Log the entry and exit of a scope.
consteval uint32_t highestPowerOfTwo(uint32_t value)
auto emulatedAlignedMemDescription(uint32_t alignmentInByte, T_Extents extents)
provides a memory description to create multidimensional linewise aligned memory within a one dimensi...
constexpr auto simdOptimizedAlignment(auto api, alpaka::concepts::DeviceKind auto deviceKind)
Calculate the best alignment for SIMD optimized memory allocation.
ALPAKA_FN_INLINE ALPAKA_FN_HOST void alignedFree(size_t alignment, auto ptr)
ALPAKA_FN_INLINE ALPAKA_FN_HOST auto alignedAlloc(size_t alignment, size_t size) -> void *
constexpr AnyExecutor anyExecutor
Automatic executor selection.
constexpr DeviceKind deviceKind
Functionality which is usable on the host CPU controller thread.
constexpr auto defaultExecutor(internal::concepts::DeviceHandle auto deviceHandle)
Select a default executor for the given device.
SharedBuffer(T_Any const &, T_Type *, T_UserExtents const &, T_UserPitches const &, std::invocable<> auto, T_MemAlignment const) -> SharedBuffer< ALPAKA_TYPEOF(getApi(std::declval< T_Any >())), T_Type, typename T_UserPitches::UniVec, T_MemAlignment >
std::shared_ptr< T > Handle
decltype(auto) data(auto &&any)
pointer to data of an object
auto makeAcc(alpaka::onHost::concepts::ThreadSpec auto const &threadSpec, uint32_t numaIdx, bool setThreadAffinity)
Device(Handle< T_Device > &&) -> Device< ALPAKA_TYPEOF(alpaka::internal::getApi(std::declval< T_Device >())), ALPAKA_TYPEOF(alpaka::internal::getDeviceKind(std::declval< T_Device >()))>
void wait(alpaka::concepts::HasGet auto &handle)
wait for all work to be finished
Queue(Handle< T_Queue > &&, T_QueueKind) -> Queue< Device< ALPAKA_TYPEOF(alpaka::internal::getApi(std::declval< T_Queue >())), ALPAKA_TYPEOF(alpaka::internal::getDeviceKind(std::declval< T_Queue >()))>, T_QueueKind >
typename GetValueType< T >::type GetValueType_t
constexpr uint32_t getDim_v
auto * toVoidPtr(T inPtr)
Cast a pointer that may or may not point to volatile memory to a (void*) or (void const*).
constexpr decltype(auto) getDeviceKind(auto &&any)
Get the device type of an object.
constexpr decltype(auto) getApi(auto &&any)
Get the API an object depends on.
constexpr auto makeView(auto &&anyWithApi, T_ValueType *pointer, concepts::Vector auto const &extents, T_MemAlignment const memAlignment=T_MemAlignment{})
ALPAKA_FN_HOST_ACC Dict(Tuple< DictEntry< T_Keys, T_Values >... > const &) -> Dict< DictEntry< T_Keys, T_Values >... >
bool operator!=(Queue const &other) const
Queue & operator=(Queue const &)=delete
bool operator==(Queue const &other) const
Queue & operator=(Queue &&)=delete
Queue(internal::concepts::DeviceHandle auto device, uint32_t const idx, uint32_t numIdx, bool isBlocking)
Queue(Queue const &)=delete