alpaka
Abstraction Library for Parallel Kernel Acceleration
Loading...
Searching...
No Matches
CallbackThread.hpp
Go to the documentation of this file.
1/* Copyright 2022 Antonio Di Pilato
2 * SPDX-License-Identifier: MPL-2.0
3 */
4
5#pragma once
6
9
10#include <cassert>
11#include <condition_variable>
12#include <functional>
13#include <future>
14#include <iostream>
15#include <mutex>
16#include <queue>
17#include <thread>
18
19namespace alpaka::core
20{
21 /** A thread queue executing tasks asynchronously.
22 *
23 * This object should be used as a member of objects that are managed by smart pointers, to prevent a task from
24 * taking over ownership of the callback thread and thereby being able to destroy it before all tasks have been executed.
25 */
27 {
// Task declares only defaulted or pure virtual members, so it has no out-of-line virtual function to anchor its
// vtable; clang's -Wweak-vtables would warn about that, so the warning is suppressed just around this declaration.
28#if ALPAKA_COMP_CLANG
29# pragma clang diagnostic push
30# pragma clang diagnostic ignored "-Wweak-vtables"
31#endif
32 // A custom class is used because std::function<F> requires F to be copyable, and std::packaged_task provides a
33 // std::future which will keep the task alive and we cannot control the moment the future is set.
34 //! \todo with C++23 std::move_only_function should be used
// Type-erased interface for one queued unit of work; queued tasks are owned through std::unique_ptr<Task>.
// NOTE(review): this listing lacks the line between `struct Task` and the `pragma clang diagnostic pop` below
// (presumably `#if ALPAKA_COMP_CLANG`, mirroring the push above) - confirm against the real header.
35 struct Task
37# pragma clang diagnostic pop
38#endif
39 {
// Virtual so that destroying a concrete task through std::unique_ptr<Task> is well-defined.
40 virtual ~Task() = default;
// Execute the task. Exceptions may escape; the worker thread catches them and stores them in the task's promise.
41 virtual void run() = 0;
42 };
43
// Concrete Task that owns a copy of a submitted callable and invokes it from run().
// NOTE(review): the class-head line between the template header and the opening brace is missing from this
// listing (presumably `struct FunctionHolder : Task`, since run() is declared `override`) - confirm.
44 template<typename Function>
46 {
// The stored callable. submit() instantiates FunctionHolder with std::decay_t of the user's type, so this is
// always a value, never a reference into the caller's scope.
47 Function m_func;
48
// Perfect-forwards the argument so an rvalue callable is moved in instead of copied.
49 template<typename FunctionFwd>
50 explicit FunctionHolder(FunctionFwd&& func) : m_func{std::forward<FunctionFwd>(func)}
51 {
52 }
53
// Invoke the stored callable (as a non-const lvalue).
54 void run() override
55 {
56 // if m_func throws, let it propagate
57 m_func();
58 }
59 };
60
61 using TaskPackage = std::pair<std::unique_ptr<Task>, std::promise<void>>;
62
63 struct State
64 {
65 std::queue<TaskPackage> m_tasks;
66 std::mutex m_mutex;
67 std::condition_variable m_cond;
68 };
69
70 public:
71 CallbackThread(uint32_t numaIdx) : m_state(std::make_shared<State>()), m_numaIdx{numaIdx}
72 {
73 }
74
75 CallbackThread() : m_state(std::make_shared<State>())
76 {
77 }
78
80 {
// NOTE(review): the declarator for this body is missing from this listing (presumably `~CallbackThread()`)
// - confirm against the real header.
// Shut the worker down: request a stop, wake the worker, then wait for it unless we are running on it.
81 {
// The stop is requested while holding the mutex that guards the worker's wait predicate, so the worker
// cannot evaluate the predicate, miss the request, and then block forever.
82 std::unique_lock<std::mutex> lock{m_state->m_mutex};
83 m_thread.request_stop();
84 // wakeup the thread in case it is waiting
85 m_state->m_cond.notify_one();
86 }
87
// m_thread is not joinable if no worker was ever launched. The worker exits only once the queue is empty,
// so joining also waits for every still-queued task to run.
88 if(m_thread.joinable())
89 {
90 if(std::this_thread::get_id() == m_thread.get_id())
91 {
92 /* We can not join ourselves.
93 * We can only end here if a task that the callback thread is executing is capturing the object
94 * which is holding the callback thread.
95 */
// The detached worker keeps its own copy of the shared state and its stop token, so it can keep
// draining the queue after this object is gone.
96 m_thread.detach();
97 }
98 else
99 m_thread.join();
100 }
101 }
102
//! Enqueue a callable for asynchronous execution on the worker thread.
//!
//! @param nf Callable taking no arguments and returning void. A decayed copy is stored; an rvalue is moved in.
//! @return Future that becomes ready once the task has run; if the task threw, get() rethrows that exception.
103 //! It is guaranteed that the task is fully destroyed before the future's result is set.
104 template<typename NullaryFunction>
105 auto submit(NullaryFunction&& nf) -> std::future<void>
106 {
107 using DecayedFunction = std::decay_t<NullaryFunction>;
// NOTE(review): invoke_result_t<DecayedFunction> checks an rvalue call, whereas FunctionHolder::run() calls
// the stored callable as an lvalue; invoke_result_t<DecayedFunction&> would match the real call - consider.
108 static_assert(
109 std::is_void_v<std::invoke_result_t<DecayedFunction>>,
110 "Submitted function must not have any arguments and return void.");
111
112 // FunctionHolder stores a copy of the user's task, but may be constructed from an expiring value to avoid
113 // the copy. We do NOT store a reference to the user's task, which could dangle if the user isn't careful.
114 auto tp = std::pair(
115 std::make_unique<FunctionHolder<DecayedFunction>>(std::forward<NullaryFunction>(nf)),
116 std::promise<void>{});
// Take the future before the package is moved into the queue; after that only the worker touches the promise.
117 auto f = tp.second.get_future();
118 {
119 std::unique_lock<std::mutex> lock{m_state->m_mutex};
120 m_state->m_tasks.emplace(std::move(tp));
// NOTE(review): the statement guarded by the following `if` is missing from this listing (presumably
// `startWorkerThread();`, launching the worker on first use). As listed, notify_one() would become the
// guarded statement - confirm against the real header.
121 if(!m_thread.joinable())
123 m_state->m_cond.notify_one();
124 }
125
126 return f;
127 }
128
129 bool isEmpty() const
130 {
131 std::unique_lock<std::mutex> lock{m_state->m_mutex};
132 return m_state->m_tasks.empty();
133 }
134
135 private:
136 std::jthread m_thread;
137 /** Hold data shared between this call and the thread processing the tasts. */
138 std::shared_ptr<State> m_state;
140
// Launch the worker thread that drains the task queue.
// The lambda captures copies of the shared-state pointer and the NUMA index and does not capture `this`, so the
// worker never touches this object and may outlive it after being detached.
141 auto startWorkerThread() -> void
142 {
143 m_thread = std::jthread(
144 [state = m_state, numaIdx = m_numaIdx](std::stop_token st)
145 {
// NOTE(review): two lines of the worker prologue are missing from this listing; this page's cross-references
// (setThreadAffinity, allNumaDomains) suggest they pin the thread to NUMA domain `numaIdx` - confirm.
148
// Process tasks in FIFO order. The loop ends only when a stop was requested AND the queue is empty, so
// tasks enqueued before the stop request still run.
149 while(true)
150 {
151 std::promise<void> taskPromise;
152 std::exception_ptr eptr;
153 {
154 // Task is destroyed before promise is updated but after the queue state is up to date.
155 std::unique_ptr<Task> task = nullptr;
156 {
157 std::unique_lock<std::mutex> lock{state->m_mutex};
158 state->m_cond.wait(
159 lock,
160 [&state, &st] { return st.stop_requested() || !state->m_tasks.empty(); });
161
162 if(st.stop_requested() && state->m_tasks.empty())
163 break;
164
// Move the payload out but leave the (now empty) entry at the front of the queue while the task
// runs, so the queue does not report itself empty too early.
165 task = std::move(state->m_tasks.front().first);
166 taskPromise = std::move(state->m_tasks.front().second);
167 }
168 assert(task);
// Capture any exception so it is forwarded through the promise instead of escaping the thread.
169 try
170 {
171 task->run();
172 }
173 catch(...)
174 {
175 eptr = std::current_exception();
176 }
177 {
178 std::unique_lock<std::mutex> lock{state->m_mutex};
179 // Pop empty data from the queue, task and promise will be destroyed later in a
180 // well-defined order.
181 state->m_tasks.pop();
182 }
183 // Task will be destroyed here, the queue status is already updated.
184 }
185 // If the executed task was the last one in the queue, threads waiting on its future will already see the
186 // queue as empty.
187 if(eptr)
188 taskPromise.set_exception(std::move(eptr));
189 else
190 taskPromise.set_value();
191 }
192 });
193 }
194 };
195} // namespace alpaka::core
std::shared_ptr< State > m_state
Hold data shared between this object and the thread processing the tasks.
std::pair< std::unique_ptr< Task >, std::promise< void > > TaskPackage
auto submit(NullaryFunction &&nf) -> std::future< void >
It is guaranteed that the task is fully destroyed before the future's result is set.
#define ALPAKA_COMP_CLANG
Definition config.hpp:171
constexpr uint32_t allNumaDomains
Constant to select all NUMA domains.
Definition utility.hpp:31
void setThreadAffinity(uint32_t numaIdx)
Set the affinity of the current thread to all cores of the NUMA domain.
Definition utility.hpp:180
STL namespace.
std::queue< TaskPackage > m_tasks