TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Michael Vandeberg
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/boostorg/capy
9 : //
10 :
11 : #include <boost/capy/ex/thread_pool.hpp>
12 : #include <boost/capy/continuation.hpp>
13 : #include <boost/capy/test/thread_name.hpp>
14 : #include <algorithm>
15 : #include <atomic>
16 : #include <condition_variable>
17 : #include <cstdio>
18 : #include <mutex>
19 : #include <thread>
20 : #include <vector>
21 :
22 : /*
23 : Thread pool implementation using a shared work queue.
24 :
25 : Work items are continuations linked via their intrusive next pointer,
26 : stored in a single queue protected by a mutex. No per-post heap
27 : allocation: the continuation is owned by the caller and linked
28 : directly. Worker threads wait on a condition_variable until work
29 : is available or stop is requested.
30 :
31 : Threads are started lazily on first post() via std::call_once to avoid
32 : spawning threads for pools that are constructed but never used. Each
33 : thread is named with a configurable prefix plus index for debugger
34 : visibility.
35 :
36 : Work tracking: on_work_started/on_work_finished maintain an atomic
37 : outstanding_work_ counter. join() blocks until this counter reaches
38 : zero, then signals workers to stop and joins threads.
39 :
Shutdown paths:
- join(): waits for outstanding work to drain, then stops workers.
- stop(): immediately signals workers to exit; queued work is abandoned.
- Destructor: stop() then join() (abandon + wait for threads).
44 : */
45 :
46 : namespace boost {
47 : namespace capy {
48 :
49 : //------------------------------------------------------------------------------
50 :
class thread_pool::impl
{
    // Intrusive queue of continuations via continuation::next.
    // No per-post allocation: the continuation is owned by the caller.
    continuation* head_ = nullptr;
    continuation* tail_ = nullptr;

    // Append c to the tail of the queue.
    // Precondition: caller holds mutex_ (every call site locks first).
    void push(continuation* c) noexcept
    {
        c->next = nullptr;
        if(tail_)
            tail_->next = c;
        else
            head_ = c;
        tail_ = c;
    }

    // Detach and return the queue head, or nullptr if the queue is empty.
    // Resets tail_ when the last element is removed so push() relinks head_.
    // Precondition: caller holds mutex_ (or has exclusive access, as in
    // drain_abandoned() after all workers have been joined).
    continuation* pop() noexcept
    {
        if(!head_)
            return nullptr;
        continuation* c = head_;
        head_ = head_->next;
        if(!head_)
            tail_ = nullptr;
        return c;
    }

    // True when no work is queued. Precondition: caller holds mutex_.
    bool empty() const noexcept
    {
        return head_ == nullptr;
    }

    std::mutex mutex_;                          // guards queue, stop_, joined_
    std::condition_variable work_cv_;           // signaled when work arrives or stop_ set
    std::condition_variable done_cv_;           // signaled when outstanding work drains
    std::vector<std::thread> threads_;
    std::atomic<std::size_t> outstanding_work_{0};
    bool stop_{false};                          // workers exit when true (under mutex_)
    bool joined_{false};                        // join() already invoked (under mutex_)
    std::size_t num_threads_;
    char thread_name_prefix_[13]{}; // 12 chars max + null terminator
    std::once_flag start_flag_;                 // lazy thread startup (see ensure_started)

public:
    ~impl() = default;

    // Destroy abandoned coroutine frames. Must be called
    // before execution_context::shutdown()/destroy() so
    // that suspended-frame destructors (e.g. delay_awaitable
    // calling timer_service::cancel()) run while services
    // are still valid.
    //
    // Safe without the lock only because workers have exited by the
    // time this runs (called after stop()+join() in ~thread_pool()).
    void
    drain_abandoned() noexcept
    {
        while(auto* c = pop())
        {
            auto h = c->h;
            // Skip null and noop handles: destroying either is not valid.
            if(h && h != std::noop_coroutine())
                h.destroy();
        }
    }

    // num_threads == 0 selects hardware concurrency (minimum 1, since
    // hardware_concurrency() may return 0 when unknown).
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
        : num_threads_(num_threads)
    {
        if(num_threads_ == 0)
            num_threads_ = std::max(
                std::thread::hardware_concurrency(), 1u);

        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
        thread_name_prefix_[n] = '\0';
    }

    // Enqueue a caller-owned continuation and wake one worker.
    // Starts the worker threads on first use.
    void
    post(continuation& c)
    {
        ensure_started();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            push(&c);
        }
        // Notify outside the lock so the woken worker does not
        // immediately block on mutex_.
        work_cv_.notify_one();
    }

    void
    on_work_started() noexcept
    {
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
    }

    // Decrement the work counter; when it reaches zero, wake join()
    // (done_cv_) and, if join() is already waiting (joined_), flip stop_
    // so workers exit. fetch_sub returns the previous value, so == 1
    // means this call took the counter to zero.
    void
    on_work_finished() noexcept
    {
        if(outstanding_work_.fetch_sub(
            1, std::memory_order_acq_rel) == 1)
        {
            // Lock before notifying so join()'s predicate check and
            // wait cannot miss this wakeup.
            std::lock_guard<std::mutex> lock(mutex_);
            if(joined_ && !stop_)
                stop_ = true;
            done_cv_.notify_all();
            work_cv_.notify_all();
        }
    }

    // Wait for outstanding work to drain, then stop and join workers.
    // Idempotent: subsequent calls return immediately via joined_.
    void
    join() noexcept
    {
        {
            std::unique_lock<std::mutex> lock(mutex_);
            if(joined_)
                return;
            joined_ = true;

            if(outstanding_work_.load(
                std::memory_order_acquire) == 0)
            {
                // Nothing outstanding: stop workers right away.
                stop_ = true;
                work_cv_.notify_all();
            }
            else
            {
                // stop_ is set either by on_work_finished() once the
                // counter drains (joined_ is already true), or by stop().
                done_cv_.wait(lock, [this]{
                    return stop_;
                });
            }
        }

        // Threads may be empty if post() was never called (lazy start).
        for(auto& t : threads_)
            if(t.joinable())
                t.join();
    }

    // Immediate shutdown: workers exit at the next wakeup; any queued
    // work is abandoned (reclaimed later by drain_abandoned()).
    void
    stop() noexcept
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        work_cv_.notify_all();
        done_cv_.notify_all();
    }

private:
    // Spawn the worker threads exactly once, on the first post().
    // Avoids creating threads for pools that are never used.
    void
    ensure_started()
    {
        std::call_once(start_flag_, [this]{
            threads_.reserve(num_threads_);
            for(std::size_t i = 0; i < num_threads_; ++i)
                threads_.emplace_back([this, i]{ run(i); });
        });
    }

    // Worker loop: name the thread, then pop-and-resume continuations
    // until stop_ is observed.
    void
    run(std::size_t index)
    {
        // Build name; set_current_thread_name truncates to platform limits.
        // 16 = 12-char prefix + up to 3-digit index + NUL.
        char name[16];
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
        set_current_thread_name(name);

        for(;;)
        {
            continuation* c = nullptr;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                work_cv_.wait(lock, [this]{
                    return !empty() ||
                        stop_;
                });
                // stop_ takes priority over queued work: remaining items
                // are deliberately abandoned (see stop()/drain_abandoned()).
                if(stop_)
                    return;
                c = pop();
            }
            // Resume outside the lock so other workers can dequeue
            // concurrently while this continuation runs.
            if(c)
                c->h.resume();
        }
    }
};
233 :
234 : //------------------------------------------------------------------------------
235 :
thread_pool::
~thread_pool()
{
    // Order is load-bearing:
    // 1) stop():  signal workers to exit; queued work is abandoned.
    // 2) join():  wait for all worker threads to terminate.
    // 3) drain_abandoned(): destroy abandoned coroutine frames while
    //    context services are still valid (must precede shutdown/destroy,
    //    per the note on impl::drain_abandoned).
    // 4) shutdown()/destroy(): tear down the execution context
    //    (presumably inherited from execution_context — confirm in header).
    impl_->stop();
    impl_->join();
    impl_->drain_abandoned();
    shutdown();
    destroy();
    delete impl_; // pairs with the raw new in the constructor (pimpl)
}
246 :
thread_pool::
// num_threads == 0 means "use hardware concurrency" (resolved in impl's
// constructor); thread_name_prefix is truncated to 12 characters.
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
    : impl_(new impl(num_threads, thread_name_prefix)) // deleted in ~thread_pool()
{
    // Install the default frame allocator for coroutine frames.
    // NOTE(review): set_frame_allocator is declared elsewhere — presumably
    // on the execution-context base; confirm semantics against the header.
    this->set_frame_allocator(std::allocator<void>{});
}
253 :
254 : void
255 11 : thread_pool::
256 : join() noexcept
257 : {
258 11 : impl_->join();
259 11 : }
260 :
261 : void
262 2 : thread_pool::
263 : stop() noexcept
264 : {
265 2 : impl_->stop();
266 2 : }
267 :
268 : //------------------------------------------------------------------------------
269 :
270 : thread_pool::executor_type
271 163 : thread_pool::
272 : get_executor() const noexcept
273 : {
274 163 : return executor_type(
275 163 : const_cast<thread_pool&>(*this));
276 : }
277 :
278 : void
279 345 : thread_pool::executor_type::
280 : on_work_started() const noexcept
281 : {
282 345 : pool_->impl_->on_work_started();
283 345 : }
284 :
285 : void
286 345 : thread_pool::executor_type::
287 : on_work_finished() const noexcept
288 : {
289 345 : pool_->impl_->on_work_finished();
290 345 : }
291 :
292 : void
293 827 : thread_pool::executor_type::
294 : post(continuation& c) const
295 : {
296 827 : pool_->impl_->post(c);
297 827 : }
298 :
299 : } // capy
300 : } // boost
|