LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/libs/capy/src/ex - thread_pool.cpp (source / functions) Coverage Total Hit
Test: coverage_remapped.info Lines: 100.0 % 128 128
Test Date: 2026-03-23 19:47:49 Functions: 100.0 % 25 25

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Michael Vandeberg
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/boostorg/capy
       9                 : //
      10                 : 
      11                 : #include <boost/capy/ex/thread_pool.hpp>
      12                 : #include <boost/capy/continuation.hpp>
      13                 : #include <boost/capy/test/thread_name.hpp>
      14                 : #include <algorithm>
      15                 : #include <atomic>
      16                 : #include <condition_variable>
      17                 : #include <cstdio>
      18                 : #include <mutex>
      19                 : #include <thread>
      20                 : #include <vector>
      21                 : 
      22                 : /*
      23                 :     Thread pool implementation using a shared work queue.
      24                 : 
      25                 :     Work items are continuations linked via their intrusive next pointer,
      26                 :     stored in a single queue protected by a mutex. No per-post heap
      27                 :     allocation: the continuation is owned by the caller and linked
      28                 :     directly. Worker threads wait on a condition_variable until work
      29                 :     is available or stop is requested.
      30                 : 
      31                 :     Threads are started lazily on first post() via std::call_once to avoid
      32                 :     spawning threads for pools that are constructed but never used. Each
      33                 :     thread is named with a configurable prefix plus index for debugger
      34                 :     visibility.
      35                 : 
      36                 :     Work tracking: on_work_started/on_work_finished maintain an atomic
      37                 :     outstanding_work_ counter. join() blocks until this counter reaches
      38                 :     zero, then signals workers to stop and joins threads.
      39                 : 
      40                 :     Shutdown paths:
      41                 :     - join(): waits for outstanding work to drain, then stops workers.
      42                 :     - stop(): immediately signals workers to exit; queued work is abandoned.
      43                 :     - Destructor: stop(), then join(), then destroy abandoned coroutine frames.
      44                 : */
      45                 : 
      46                 : namespace boost {
      47                 : namespace capy {
      48                 : 
      49                 : //------------------------------------------------------------------------------
      50                 : 
      51                 : class thread_pool::impl
      52                 : {
      53                 :     // Intrusive queue of continuations via continuation::next.
      54                 :     // No per-post allocation: the continuation is owned by the caller.
      55                 :     continuation* head_ = nullptr;
      56                 :     continuation* tail_ = nullptr;
      57                 : 
      58 HIT         827 :     void push(continuation* c) noexcept
      59                 :     {
      60             827 :         c->next = nullptr;
      61             827 :         if(tail_)
      62             598 :             tail_->next = c;
      63                 :         else
      64             229 :             head_ = c;
      65             827 :         tail_ = c;
      66             827 :     }
      67                 : 
      68             984 :     continuation* pop() noexcept
      69                 :     {
      70             984 :         if(!head_)
      71             157 :             return nullptr;
      72             827 :         continuation* c = head_;
      73             827 :         head_ = head_->next;
      74             827 :         if(!head_)
      75             229 :             tail_ = nullptr;
      76             827 :         return c;
      77                 :     }
      78                 : 
      79            1058 :     bool empty() const noexcept
      80                 :     {
      81            1058 :         return head_ == nullptr;
      82                 :     }
      83                 : 
      84                 :     std::mutex mutex_;
      85                 :     std::condition_variable work_cv_;
      86                 :     std::condition_variable done_cv_;
      87                 :     std::vector<std::thread> threads_;
      88                 :     std::atomic<std::size_t> outstanding_work_{0};
      89                 :     bool stop_{false};
      90                 :     bool joined_{false};
      91                 :     std::size_t num_threads_;
      92                 :     char thread_name_prefix_[13]{};  // 12 chars max + null terminator
      93                 :     std::once_flag start_flag_;
      94                 : 
      95                 : public:
      96             157 :     ~impl() = default;
      97                 : 
      98                 :     // Destroy abandoned coroutine frames. Must be called
      99                 :     // before execution_context::shutdown()/destroy() so
     100                 :     // that suspended-frame destructors (e.g. delay_awaitable
     101                 :     // calling timer_service::cancel()) run while services
     102                 :     // are still valid.
     103                 :     void
     104             157 :     drain_abandoned() noexcept
     105                 :     {
     106             341 :         while(auto* c = pop())
     107                 :         {
     108             184 :             auto h = c->h;
     109             184 :             if(h && h != std::noop_coroutine())
     110             132 :                 h.destroy();
     111             184 :         }
     112             157 :     }
     113                 : 
     114             157 :     impl(std::size_t num_threads, std::string_view thread_name_prefix)
     115             157 :         : num_threads_(num_threads)
     116                 :     {
     117             157 :         if(num_threads_ == 0)
     118               4 :             num_threads_ = std::max(
     119               2 :                 std::thread::hardware_concurrency(), 1u);
     120                 : 
     121                 :         // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
     122             157 :         auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
     123             157 :         thread_name_prefix_[n] = '\0';
     124             157 :     }
     125                 : 
     126                 :     void
     127             827 :     post(continuation& c)
     128                 :     {
     129             827 :         ensure_started();
     130                 :         {
     131             827 :             std::lock_guard<std::mutex> lock(mutex_);
     132             827 :             push(&c);
     133             827 :         }
     134             827 :         work_cv_.notify_one();
     135             827 :     }
     136                 : 
     137                 :     void
     138             345 :     on_work_started() noexcept
     139                 :     {
     140             345 :         outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
     141             345 :     }
     142                 : 
     143                 :     void
     144             345 :     on_work_finished() noexcept
     145                 :     {
     146             345 :         if(outstanding_work_.fetch_sub(
     147             345 :             1, std::memory_order_acq_rel) == 1)
     148                 :         {
     149              85 :             std::lock_guard<std::mutex> lock(mutex_);
     150              85 :             if(joined_ && !stop_)
     151               4 :                 stop_ = true;
     152              85 :             done_cv_.notify_all();
     153              85 :             work_cv_.notify_all();
     154              85 :         }
     155             345 :     }
     156                 : 
     157                 :     void
     158             168 :     join() noexcept
     159                 :     {
     160                 :         {
     161             168 :             std::unique_lock<std::mutex> lock(mutex_);
     162             168 :             if(joined_)
     163              11 :                 return;
     164             157 :             joined_ = true;
     165                 : 
     166             157 :             if(outstanding_work_.load(
     167             157 :                 std::memory_order_acquire) == 0)
     168                 :             {
     169             102 :                 stop_ = true;
     170             102 :                 work_cv_.notify_all();
     171                 :             }
     172                 :             else
     173                 :             {
     174              55 :                 done_cv_.wait(lock, [this]{
     175              60 :                     return stop_;
     176                 :                 });
     177                 :             }
     178             168 :         }
     179                 : 
     180             336 :         for(auto& t : threads_)
     181             179 :             if(t.joinable())
     182             179 :                 t.join();
     183                 :     }
     184                 : 
     185                 :     void
     186             159 :     stop() noexcept
     187                 :     {
     188                 :         {
     189             159 :             std::lock_guard<std::mutex> lock(mutex_);
     190             159 :             stop_ = true;
     191             159 :         }
     192             159 :         work_cv_.notify_all();
     193             159 :         done_cv_.notify_all();
     194             159 :     }
     195                 : 
     196                 : private:
     197                 :     void
     198             827 :     ensure_started()
     199                 :     {
     200             827 :         std::call_once(start_flag_, [this]{
     201             101 :             threads_.reserve(num_threads_);
     202             280 :             for(std::size_t i = 0; i < num_threads_; ++i)
     203             358 :                 threads_.emplace_back([this, i]{ run(i); });
     204             101 :         });
     205             827 :     }
     206                 : 
     207                 :     void
     208             179 :     run(std::size_t index)
     209                 :     {
     210                 :         // Build name; set_current_thread_name truncates to platform limits.
     211                 :         char name[16];
     212             179 :         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
     213             179 :         set_current_thread_name(name);
     214                 : 
     215                 :         for(;;)
     216                 :         {
     217             822 :             continuation* c = nullptr;
     218                 :             {
     219             822 :                 std::unique_lock<std::mutex> lock(mutex_);
     220             822 :                 work_cv_.wait(lock, [this]{
     221            1383 :                     return !empty() ||
     222            1383 :                         stop_;
     223                 :                 });
     224             822 :                 if(stop_)
     225             358 :                     return;
     226             643 :                 c = pop();
     227             822 :             }
     228             643 :             if(c)
     229             643 :                 c->h.resume();
     230             643 :         }
     231                 :     }
     232                 : };
     233                 : 
     234                 : //------------------------------------------------------------------------------
     235                 : 
     236             157 : thread_pool::
     237                 : ~thread_pool()
     238                 : {
     239             157 :     impl_->stop();
     240             157 :     impl_->join();
     241             157 :     impl_->drain_abandoned();
     242             157 :     shutdown();
     243             157 :     destroy();
     244             157 :     delete impl_;
     245             157 : }
     246                 : 
     247             157 : thread_pool::
     248             157 : thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
     249             157 :     : impl_(new impl(num_threads, thread_name_prefix))
     250                 : {
     251             157 :     this->set_frame_allocator(std::allocator<void>{});
     252             157 : }
     253                 : 
     254                 : void
     255              11 : thread_pool::
     256                 : join() noexcept
     257                 : {
     258              11 :     impl_->join();
     259              11 : }
     260                 : 
     261                 : void
     262               2 : thread_pool::
     263                 : stop() noexcept
     264                 : {
     265               2 :     impl_->stop();
     266               2 : }
     267                 : 
     268                 : //------------------------------------------------------------------------------
     269                 : 
     270                 : thread_pool::executor_type
     271             163 : thread_pool::
     272                 : get_executor() const noexcept
     273                 : {
     274             163 :     return executor_type(
     275             163 :         const_cast<thread_pool&>(*this));
     276                 : }
     277                 : 
     278                 : void
     279             345 : thread_pool::executor_type::
     280                 : on_work_started() const noexcept
     281                 : {
     282             345 :     pool_->impl_->on_work_started();
     283             345 : }
     284                 : 
     285                 : void
     286             345 : thread_pool::executor_type::
     287                 : on_work_finished() const noexcept
     288                 : {
     289             345 :     pool_->impl_->on_work_finished();
     290             345 : }
     291                 : 
     292                 : void
     293             827 : thread_pool::executor_type::
     294                 : post(continuation& c) const
     295                 : {
     296             827 :     pool_->impl_->post(c);
     297             827 : }
     298                 : 
     299                 : } // capy
     300                 : } // boost
        

Generated by: LCOV version 2.3