11#include <condition_variable>
29# pragma clang diagnostic push
30# pragma clang diagnostic ignored "-Wweak-vtables"
37# pragma clang diagnostic pop
40 virtual ~Task() =
default;
41 virtual void run() = 0;
44 template<
typename Function>
45 struct FunctionHolder : Task
49 template<
typename FunctionFwd>
50 explicit FunctionHolder(FunctionFwd&& func) : m_func{std::forward<FunctionFwd>(func)}
61 using TaskPackage = std::pair<std::unique_ptr<Task>, std::promise<void>>;
65 std::queue<TaskPackage> m_tasks;
67 std::condition_variable m_cond;
71 CallbackThread(uint32_t numaIdx) : m_state(std::make_shared<State>()), m_numaIdx{numaIdx}
75 CallbackThread() : m_state(std::make_shared<State>())
82 std::unique_lock<std::mutex> lock{m_state->m_mutex};
83 m_thread.request_stop();
85 m_state->m_cond.notify_one();
88 if(m_thread.joinable())
90 if(std::this_thread::get_id() == m_thread.get_id())
104 template<
typename NullaryFunction>
105 auto submit(NullaryFunction&& nf) -> std::future<void>
107 using DecayedFunction = std::decay_t<NullaryFunction>;
109 std::is_void_v<std::invoke_result_t<DecayedFunction>>,
110 "Submitted function must not have any arguments and return void.");
115 std::make_unique<FunctionHolder<DecayedFunction>>(std::forward<NullaryFunction>(nf)),
116 std::promise<void>{});
117 auto f = tp.second.get_future();
119 std::unique_lock<std::mutex> lock{m_state->m_mutex};
120 m_state->m_tasks.emplace(std::move(tp));
121 if(!m_thread.joinable())
123 m_state->m_cond.notify_one();
131 std::unique_lock<std::mutex> lock{m_state->m_mutex};
132 return m_state->m_tasks.empty();
136 std::jthread m_thread;
138 std::shared_ptr<State> m_state;
139 uint32_t m_numaIdx = onHost::internal::hwloc::allNumaDomains;
141 auto startWorkerThread() ->
void
143 m_thread = std::jthread(
144 [state = m_state, numaIdx = m_numaIdx](std::stop_token st)
146 if(numaIdx != onHost::internal::hwloc::allNumaDomains)
147 onHost::internal::hwloc::setThreadAffinity(numaIdx);
151 std::promise<void> taskPromise;
152 std::exception_ptr eptr;
155 std::unique_ptr<Task> task =
nullptr;
157 std::unique_lock<std::mutex> lock{state->m_mutex};
160 [&state, &st] {
return st.stop_requested() || !state->m_tasks.empty(); });
162 if(st.stop_requested() && state->m_tasks.empty())
165 task = std::move(state->m_tasks.front().first);
166 taskPromise = std::move(state->m_tasks.front().second);
175 eptr = std::current_exception();
178 std::unique_lock<std::mutex> lock{state->m_mutex};
181 state->m_tasks.pop();
188 taskPromise.set_exception(std::move(eptr));
190 taskPromise.set_value();
#define ALPAKA_COMP_CLANG