src/ex/thread_pool.cpp

100.0% Lines (128/128) 100.0% List of functions (25/25)
thread_pool.cpp
f(x) Functions (25)
Function Calls Lines Blocks
boost::capy::thread_pool::impl::push(boost::capy::continuation*) :58 827x 100.0% 100.0% boost::capy::thread_pool::impl::pop() :68 984x 100.0% 100.0% boost::capy::thread_pool::impl::empty() const :79 1058x 100.0% 100.0% boost::capy::thread_pool::impl::~impl() :96 157x 100.0% 100.0% boost::capy::thread_pool::impl::drain_abandoned() :104 157x 100.0% 100.0% boost::capy::thread_pool::impl::impl(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :114 157x 100.0% 72.0% boost::capy::thread_pool::impl::post(boost::capy::continuation&) :127 827x 100.0% 100.0% boost::capy::thread_pool::impl::on_work_started() :138 345x 100.0% 100.0% boost::capy::thread_pool::impl::on_work_finished() :144 345x 100.0% 100.0% boost::capy::thread_pool::impl::join() :158 168x 100.0% 85.0% boost::capy::thread_pool::impl::join()::{lambda()#1}::operator()() const :174 60x 100.0% 100.0% boost::capy::thread_pool::impl::stop() :186 159x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started() :198 827x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const :200 101x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const::{lambda()#1}::operator()() const :203 179x 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long) :208 179x 100.0% 84.0% boost::capy::thread_pool::impl::run(unsigned long)::{lambda()#1}::operator()() const :220 1058x 100.0% 100.0% boost::capy::thread_pool::~thread_pool() :236 157x 100.0% 100.0% boost::capy::thread_pool::thread_pool(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :247 157x 100.0% 55.0% boost::capy::thread_pool::join() :255 11x 100.0% 100.0% boost::capy::thread_pool::stop() :262 2x 100.0% 100.0% boost::capy::thread_pool::get_executor() const :271 163x 100.0% 100.0% boost::capy::thread_pool::executor_type::on_work_started() const :279 345x 100.0% 100.0% boost::capy::thread_pool::executor_type::on_work_finished() const :286 345x 100.0% 100.0% boost::capy::thread_pool::executor_type::post(boost::capy::continuation&) const :293 827x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/continuation.hpp>
13 #include <boost/capy/test/thread_name.hpp>
14 #include <algorithm>
15 #include <atomic>
16 #include <condition_variable>
17 #include <cstdio>
18 #include <mutex>
19 #include <thread>
20 #include <vector>
21
22 /*
23 Thread pool implementation using a shared work queue.
24
25 Work items are continuations linked via their intrusive next pointer,
26 stored in a single queue protected by a mutex. No per-post heap
27 allocation: the continuation is owned by the caller and linked
28 directly. Worker threads wait on a condition_variable until work
29 is available or stop is requested.
30
31 Threads are started lazily on first post() via std::call_once to avoid
32 spawning threads for pools that are constructed but never used. Each
33 thread is named with a configurable prefix plus index for debugger
34 visibility.
35
36 Work tracking: on_work_started/on_work_finished maintain an atomic
37 outstanding_work_ counter. join() blocks until this counter reaches
38 zero, then signals workers to stop and joins threads.
39
40 Two shutdown paths:
41 - join(): waits for outstanding work to drain, then stops workers.
42 - stop(): immediately signals workers to exit; queued work is abandoned.
43 - Destructor: stop() then join() (abandon + wait for threads).
44 */
45
46 namespace boost {
47 namespace capy {
48
49 //------------------------------------------------------------------------------
50
51 class thread_pool::impl
52 {
53 // Intrusive queue of continuations via continuation::next.
54 // No per-post allocation: the continuation is owned by the caller.
55 continuation* head_ = nullptr;
56 continuation* tail_ = nullptr;
57
58 827x void push(continuation* c) noexcept
59 {
60 827x c->next = nullptr;
61 827x if(tail_)
62 598x tail_->next = c;
63 else
64 229x head_ = c;
65 827x tail_ = c;
66 827x }
67
68 984x continuation* pop() noexcept
69 {
70 984x if(!head_)
71 157x return nullptr;
72 827x continuation* c = head_;
73 827x head_ = head_->next;
74 827x if(!head_)
75 229x tail_ = nullptr;
76 827x return c;
77 }
78
79 1058x bool empty() const noexcept
80 {
81 1058x return head_ == nullptr;
82 }
83
84 std::mutex mutex_;
85 std::condition_variable work_cv_;
86 std::condition_variable done_cv_;
87 std::vector<std::thread> threads_;
88 std::atomic<std::size_t> outstanding_work_{0};
89 bool stop_{false};
90 bool joined_{false};
91 std::size_t num_threads_;
92 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
93 std::once_flag start_flag_;
94
95 public:
96 157x ~impl() = default;
97
98 // Destroy abandoned coroutine frames. Must be called
99 // before execution_context::shutdown()/destroy() so
100 // that suspended-frame destructors (e.g. delay_awaitable
101 // calling timer_service::cancel()) run while services
102 // are still valid.
103 void
104 157x drain_abandoned() noexcept
105 {
106 341x while(auto* c = pop())
107 {
108 184x auto h = c->h;
109 184x if(h && h != std::noop_coroutine())
110 132x h.destroy();
111 184x }
112 157x }
113
114 157x impl(std::size_t num_threads, std::string_view thread_name_prefix)
115 157x : num_threads_(num_threads)
116 {
117 157x if(num_threads_ == 0)
118 4x num_threads_ = std::max(
119 2x std::thread::hardware_concurrency(), 1u);
120
121 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
122 157x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
123 157x thread_name_prefix_[n] = '\0';
124 157x }
125
126 void
127 827x post(continuation& c)
128 {
129 827x ensure_started();
130 {
131 827x std::lock_guard<std::mutex> lock(mutex_);
132 827x push(&c);
133 827x }
134 827x work_cv_.notify_one();
135 827x }
136
137 void
138 345x on_work_started() noexcept
139 {
140 345x outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
141 345x }
142
143 void
144 345x on_work_finished() noexcept
145 {
146 345x if(outstanding_work_.fetch_sub(
147 345x 1, std::memory_order_acq_rel) == 1)
148 {
149 85x std::lock_guard<std::mutex> lock(mutex_);
150 85x if(joined_ && !stop_)
151 4x stop_ = true;
152 85x done_cv_.notify_all();
153 85x work_cv_.notify_all();
154 85x }
155 345x }
156
157 void
158 168x join() noexcept
159 {
160 {
161 168x std::unique_lock<std::mutex> lock(mutex_);
162 168x if(joined_)
163 11x return;
164 157x joined_ = true;
165
166 157x if(outstanding_work_.load(
167 157x std::memory_order_acquire) == 0)
168 {
169 102x stop_ = true;
170 102x work_cv_.notify_all();
171 }
172 else
173 {
174 55x done_cv_.wait(lock, [this]{
175 60x return stop_;
176 });
177 }
178 168x }
179
180 336x for(auto& t : threads_)
181 179x if(t.joinable())
182 179x t.join();
183 }
184
185 void
186 159x stop() noexcept
187 {
188 {
189 159x std::lock_guard<std::mutex> lock(mutex_);
190 159x stop_ = true;
191 159x }
192 159x work_cv_.notify_all();
193 159x done_cv_.notify_all();
194 159x }
195
196 private:
197 void
198 827x ensure_started()
199 {
200 827x std::call_once(start_flag_, [this]{
201 101x threads_.reserve(num_threads_);
202 280x for(std::size_t i = 0; i < num_threads_; ++i)
203 358x threads_.emplace_back([this, i]{ run(i); });
204 101x });
205 827x }
206
207 void
208 179x run(std::size_t index)
209 {
210 // Build name; set_current_thread_name truncates to platform limits.
211 char name[16];
212 179x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
213 179x set_current_thread_name(name);
214
215 for(;;)
216 {
217 822x continuation* c = nullptr;
218 {
219 822x std::unique_lock<std::mutex> lock(mutex_);
220 822x work_cv_.wait(lock, [this]{
221 1383x return !empty() ||
222 1383x stop_;
223 });
224 822x if(stop_)
225 358x return;
226 643x c = pop();
227 822x }
228 643x if(c)
229 643x c->h.resume();
230 643x }
231 }
232 };
233
234 //------------------------------------------------------------------------------
235
236 157x thread_pool::
237 ~thread_pool()
238 {
239 157x impl_->stop();
240 157x impl_->join();
241 157x impl_->drain_abandoned();
242 157x shutdown();
243 157x destroy();
244 157x delete impl_;
245 157x }
246
247 157x thread_pool::
248 157x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
249 157x : impl_(new impl(num_threads, thread_name_prefix))
250 {
251 157x this->set_frame_allocator(std::allocator<void>{});
252 157x }
253
254 void
255 11x thread_pool::
256 join() noexcept
257 {
258 11x impl_->join();
259 11x }
260
261 void
262 2x thread_pool::
263 stop() noexcept
264 {
265 2x impl_->stop();
266 2x }
267
268 //------------------------------------------------------------------------------
269
270 thread_pool::executor_type
271 163x thread_pool::
272 get_executor() const noexcept
273 {
274 163x return executor_type(
275 163x const_cast<thread_pool&>(*this));
276 }
277
278 void
279 345x thread_pool::executor_type::
280 on_work_started() const noexcept
281 {
282 345x pool_->impl_->on_work_started();
283 345x }
284
285 void
286 345x thread_pool::executor_type::
287 on_work_finished() const noexcept
288 {
289 345x pool_->impl_->on_work_finished();
290 345x }
291
292 void
293 827x thread_pool::executor_type::
294 post(continuation& c) const
295 {
296 827x pool_->impl_->post(c);
297 827x }
298
299 } // capy
300 } // boost
301