1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
// Copyright (c) 2026 Michael Vandeberg
3  
// Copyright (c) 2026 Michael Vandeberg
4  
//
4  
//
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7  
//
7  
//
8  
// Official repository: https://github.com/boostorg/capy
8  
// Official repository: https://github.com/boostorg/capy
9  
//
9  
//
10  

10  

11  
#include <boost/capy/ex/thread_pool.hpp>
11  
#include <boost/capy/ex/thread_pool.hpp>
12  
#include <boost/capy/continuation.hpp>
12  
#include <boost/capy/continuation.hpp>
13  
#include <boost/capy/test/thread_name.hpp>
13  
#include <boost/capy/test/thread_name.hpp>
14  
#include <algorithm>
14  
#include <algorithm>
15  
#include <atomic>
15  
#include <atomic>
16  
#include <condition_variable>
16  
#include <condition_variable>
17  
#include <cstdio>
17  
#include <cstdio>
18  
#include <mutex>
18  
#include <mutex>
19  
#include <thread>
19  
#include <thread>
20  
#include <vector>
20  
#include <vector>
21  

21  

22  
/*
    Thread pool implementation using a shared work queue.

    Work items are continuations linked via their intrusive next pointer,
    stored in a single queue protected by a mutex. No per-post heap
    allocation: the continuation is owned by the caller and linked
    directly. Worker threads wait on a condition_variable until work
    is available or stop is requested.

    Threads are started lazily on first post() via std::call_once to avoid
    spawning threads for pools that are constructed but never used. Each
    thread is named with a configurable prefix plus index for debugger
    visibility.

    Work tracking: on_work_started/on_work_finished maintain an atomic
    outstanding_work_ counter. join() blocks until this counter reaches
    zero, then signals workers to stop and joins threads.

    Shutdown paths:
    - join(): waits for outstanding work to drain, then stops workers.
    - stop(): immediately signals workers to exit; queued work is abandoned.
    - Destructor: stop() then join() (abandon + wait for threads).
*/
45  

45  

46  
namespace boost {
46  
namespace boost {
47  
namespace capy {
47  
namespace capy {
48  

48  

49  
//------------------------------------------------------------------------------
49  
//------------------------------------------------------------------------------
50  

50  

51  
class thread_pool::impl
51  
class thread_pool::impl
52  
{
52  
{
53  
    // Intrusive queue of continuations via continuation::next.
53  
    // Intrusive queue of continuations via continuation::next.
54  
    // No per-post allocation: the continuation is owned by the caller.
54  
    // No per-post allocation: the continuation is owned by the caller.
55  
    continuation* head_ = nullptr;
55  
    continuation* head_ = nullptr;
56  
    continuation* tail_ = nullptr;
56  
    continuation* tail_ = nullptr;
57  

57  

58  
    void push(continuation* c) noexcept
58  
    void push(continuation* c) noexcept
59  
    {
59  
    {
60  
        c->next = nullptr;
60  
        c->next = nullptr;
61  
        if(tail_)
61  
        if(tail_)
62  
            tail_->next = c;
62  
            tail_->next = c;
63  
        else
63  
        else
64  
            head_ = c;
64  
            head_ = c;
65  
        tail_ = c;
65  
        tail_ = c;
66  
    }
66  
    }
67  

67  

68  
    continuation* pop() noexcept
68  
    continuation* pop() noexcept
69  
    {
69  
    {
70  
        if(!head_)
70  
        if(!head_)
71  
            return nullptr;
71  
            return nullptr;
72  
        continuation* c = head_;
72  
        continuation* c = head_;
73  
        head_ = head_->next;
73  
        head_ = head_->next;
74  
        if(!head_)
74  
        if(!head_)
75  
            tail_ = nullptr;
75  
            tail_ = nullptr;
76  
        return c;
76  
        return c;
77  
    }
77  
    }
78  

78  

79  
    bool empty() const noexcept
79  
    bool empty() const noexcept
80  
    {
80  
    {
81  
        return head_ == nullptr;
81  
        return head_ == nullptr;
82  
    }
82  
    }
83  

83  

84  
    std::mutex mutex_;
84  
    std::mutex mutex_;
85 -
    std::condition_variable cv_;
85 +
    std::condition_variable work_cv_;
 
86 +
    std::condition_variable done_cv_;
86  
    std::vector<std::thread> threads_;
87  
    std::vector<std::thread> threads_;
87  
    std::atomic<std::size_t> outstanding_work_{0};
88  
    std::atomic<std::size_t> outstanding_work_{0};
88  
    bool stop_{false};
89  
    bool stop_{false};
89  
    bool joined_{false};
90  
    bool joined_{false};
90  
    std::size_t num_threads_;
91  
    std::size_t num_threads_;
91  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
92  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
92  
    std::once_flag start_flag_;
93  
    std::once_flag start_flag_;
93  

94  

94  
public:
95  
public:
95  
    ~impl() = default;
96  
    ~impl() = default;
96  

97  

97  
    // Destroy abandoned coroutine frames. Must be called
98  
    // Destroy abandoned coroutine frames. Must be called
98  
    // before execution_context::shutdown()/destroy() so
99  
    // before execution_context::shutdown()/destroy() so
99  
    // that suspended-frame destructors (e.g. delay_awaitable
100  
    // that suspended-frame destructors (e.g. delay_awaitable
100  
    // calling timer_service::cancel()) run while services
101  
    // calling timer_service::cancel()) run while services
101  
    // are still valid.
102  
    // are still valid.
102  
    void
103  
    void
103  
    drain_abandoned() noexcept
104  
    drain_abandoned() noexcept
104  
    {
105  
    {
105  
        while(auto* c = pop())
106  
        while(auto* c = pop())
106  
        {
107  
        {
107  
            auto h = c->h;
108  
            auto h = c->h;
108  
            if(h && h != std::noop_coroutine())
109  
            if(h && h != std::noop_coroutine())
109  
                h.destroy();
110  
                h.destroy();
110  
        }
111  
        }
111  
    }
112  
    }
112  

113  

113  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
114  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
114  
        : num_threads_(num_threads)
115  
        : num_threads_(num_threads)
115  
    {
116  
    {
116  
        if(num_threads_ == 0)
117  
        if(num_threads_ == 0)
117  
            num_threads_ = std::max(
118  
            num_threads_ = std::max(
118  
                std::thread::hardware_concurrency(), 1u);
119  
                std::thread::hardware_concurrency(), 1u);
119  

120  

120  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
121  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
121  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
122  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
122  
        thread_name_prefix_[n] = '\0';
123  
        thread_name_prefix_[n] = '\0';
123  
    }
124  
    }
124  

125  

125  
    void
126  
    void
126  
    post(continuation& c)
127  
    post(continuation& c)
127  
    {
128  
    {
128  
        ensure_started();
129  
        ensure_started();
129  
        {
130  
        {
130  
            std::lock_guard<std::mutex> lock(mutex_);
131  
            std::lock_guard<std::mutex> lock(mutex_);
131  
            push(&c);
132  
            push(&c);
132  
        }
133  
        }
133 -
        cv_.notify_one();
134 +
        work_cv_.notify_one();
134  
    }
135  
    }
135  

136  

136  
    void
137  
    void
137  
    on_work_started() noexcept
138  
    on_work_started() noexcept
138  
    {
139  
    {
139  
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
140  
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
140  
    }
141  
    }
141  

142  

142  
    void
143  
    void
143  
    on_work_finished() noexcept
144  
    on_work_finished() noexcept
144  
    {
145  
    {
145  
        if(outstanding_work_.fetch_sub(
146  
        if(outstanding_work_.fetch_sub(
146  
            1, std::memory_order_acq_rel) == 1)
147  
            1, std::memory_order_acq_rel) == 1)
147  
        {
148  
        {
148  
            std::lock_guard<std::mutex> lock(mutex_);
149  
            std::lock_guard<std::mutex> lock(mutex_);
149  
            if(joined_ && !stop_)
150  
            if(joined_ && !stop_)
150  
                stop_ = true;
151  
                stop_ = true;
151 -
            cv_.notify_all();
152 +
            done_cv_.notify_all();
 
153 +
            work_cv_.notify_all();
152  
        }
154  
        }
153  
    }
155  
    }
154  

156  

155  
    void
157  
    void
156  
    join() noexcept
158  
    join() noexcept
157  
    {
159  
    {
158  
        {
160  
        {
159  
            std::unique_lock<std::mutex> lock(mutex_);
161  
            std::unique_lock<std::mutex> lock(mutex_);
160  
            if(joined_)
162  
            if(joined_)
161  
                return;
163  
                return;
162  
            joined_ = true;
164  
            joined_ = true;
163  

165  

164  
            if(outstanding_work_.load(
166  
            if(outstanding_work_.load(
165  
                std::memory_order_acquire) == 0)
167  
                std::memory_order_acquire) == 0)
166  
            {
168  
            {
167  
                stop_ = true;
169  
                stop_ = true;
168 -
                cv_.notify_all();
170 +
                work_cv_.notify_all();
169  
            }
171  
            }
170  
            else
172  
            else
171  
            {
173  
            {
172 -
                cv_.wait(lock, [this]{
174 +
                done_cv_.wait(lock, [this]{
173  
                    return stop_;
175  
                    return stop_;
174  
                });
176  
                });
175  
            }
177  
            }
176  
        }
178  
        }
177  

179  

178  
        for(auto& t : threads_)
180  
        for(auto& t : threads_)
179  
            if(t.joinable())
181  
            if(t.joinable())
180  
                t.join();
182  
                t.join();
181  
    }
183  
    }
182  

184  

183  
    void
185  
    void
184  
    stop() noexcept
186  
    stop() noexcept
185  
    {
187  
    {
186  
        {
188  
        {
187  
            std::lock_guard<std::mutex> lock(mutex_);
189  
            std::lock_guard<std::mutex> lock(mutex_);
188  
            stop_ = true;
190  
            stop_ = true;
189  
        }
191  
        }
190 -
        cv_.notify_all();
192 +
        work_cv_.notify_all();
 
193 +
        done_cv_.notify_all();
191  
    }
194  
    }
192  

195  

193  
private:
196  
private:
194  
    void
197  
    void
195  
    ensure_started()
198  
    ensure_started()
196  
    {
199  
    {
197  
        std::call_once(start_flag_, [this]{
200  
        std::call_once(start_flag_, [this]{
198  
            threads_.reserve(num_threads_);
201  
            threads_.reserve(num_threads_);
199  
            for(std::size_t i = 0; i < num_threads_; ++i)
202  
            for(std::size_t i = 0; i < num_threads_; ++i)
200  
                threads_.emplace_back([this, i]{ run(i); });
203  
                threads_.emplace_back([this, i]{ run(i); });
201  
        });
204  
        });
202  
    }
205  
    }
203  

206  

204  
    void
207  
    void
205  
    run(std::size_t index)
208  
    run(std::size_t index)
206  
    {
209  
    {
207  
        // Build name; set_current_thread_name truncates to platform limits.
210  
        // Build name; set_current_thread_name truncates to platform limits.
208  
        char name[16];
211  
        char name[16];
209  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
212  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
210  
        set_current_thread_name(name);
213  
        set_current_thread_name(name);
211  

214  

212  
        for(;;)
215  
        for(;;)
213  
        {
216  
        {
214  
            continuation* c = nullptr;
217  
            continuation* c = nullptr;
215  
            {
218  
            {
216  
                std::unique_lock<std::mutex> lock(mutex_);
219  
                std::unique_lock<std::mutex> lock(mutex_);
217 -
                cv_.wait(lock, [this]{
220 +
                work_cv_.wait(lock, [this]{
218  
                    return !empty() ||
221  
                    return !empty() ||
219  
                        stop_;
222  
                        stop_;
220  
                });
223  
                });
221  
                if(stop_)
224  
                if(stop_)
222  
                    return;
225  
                    return;
223  
                c = pop();
226  
                c = pop();
224  
            }
227  
            }
225  
            if(c)
228  
            if(c)
226  
                c->h.resume();
229  
                c->h.resume();
227  
        }
230  
        }
228  
    }
231  
    }
229  
};
232  
};
230  

233  

231  
//------------------------------------------------------------------------------
234  
//------------------------------------------------------------------------------
232  

235  

233  
// Tear down the pool. The order matters: request stop, join, destroy
// the frames still queued, and only then run shutdown() and destroy()
// before freeing the implementation object.
thread_pool::
~thread_pool()
{
    auto& state = *impl_;
    state.stop();
    state.join();
    // Queued frames are destroyed before shutdown()/destroy() run.
    state.drain_abandoned();
    shutdown();
    destroy();
    delete impl_;
}
243  

246  

244  
// Construct the pool. The implementation object is allocated here and
// released by the destructor; std::allocator<void> is installed as the
// frame allocator.
thread_pool::
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
    : impl_(new impl(num_threads, thread_name_prefix))
{
    // NOTE(review): if set_frame_allocator can throw, impl_ would leak
    // because the destructor does not run for a failed constructor.
    // Confirm it is non-throwing.
    this->set_frame_allocator(std::allocator<void>());
}
250  

253  

251  
void
254  
void
252  
thread_pool::
255  
thread_pool::
253  
join() noexcept
256  
join() noexcept
254  
{
257  
{
255  
    impl_->join();
258  
    impl_->join();
256  
}
259  
}
257  

260  

258  
void
261  
void
259  
thread_pool::
262  
thread_pool::
260  
stop() noexcept
263  
stop() noexcept
261  
{
264  
{
262  
    impl_->stop();
265  
    impl_->stop();
263  
}
266  
}
264  

267  

265  
//------------------------------------------------------------------------------
268  
//------------------------------------------------------------------------------
266  

269  

267  
// Return an executor that refers to this pool. The executor is built
// from a non-const reference, so constness is cast away here.
thread_pool::executor_type
thread_pool::
get_executor() const noexcept
{
    auto& self = const_cast<thread_pool&>(*this);
    return executor_type(self);
}
274  

277  

275  
void
278  
void
276  
thread_pool::executor_type::
279  
thread_pool::executor_type::
277  
on_work_started() const noexcept
280  
on_work_started() const noexcept
278  
{
281  
{
279  
    pool_->impl_->on_work_started();
282  
    pool_->impl_->on_work_started();
280  
}
283  
}
281  

284  

282  
void
285  
void
283  
thread_pool::executor_type::
286  
thread_pool::executor_type::
284  
on_work_finished() const noexcept
287  
on_work_finished() const noexcept
285  
{
288  
{
286  
    pool_->impl_->on_work_finished();
289  
    pool_->impl_->on_work_finished();
287  
}
290  
}
288  

291  

289  
void
292  
void
290  
thread_pool::executor_type::
293  
thread_pool::executor_type::
291  
post(continuation& c) const
294  
post(continuation& c) const
292  
{
295  
{
293  
    pool_->impl_->post(c);
296  
    pool_->impl_->post(c);
294  
}
297  
}
295  

298  

296  
} // capy
299  
} // capy
297  
} // boost
300  
} // boost