1 -
//
 
2 -
// Copyright (c) 2026 Steve Gerbino
 
3 -
//
 
4 -
// Distributed under the Boost Software License, Version 1.0. (See accompanying
 
5 -
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
 
6 -
//
 
7 -
// Official repository: https://github.com/cppalliance/corosio
 
8 -
//
 
9 -

 
10 -
#ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
 
11 -
#define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
 
12 -

 
13 -
#include <boost/corosio/detail/config.hpp>
 
14 -
#include <boost/corosio/detail/intrusive.hpp>
 
15 -
#include <boost/capy/ex/execution_context.hpp>
 
16 -

 
17 -
#include <condition_variable>
 
18 -
#include <mutex>
 
19 -
#include <stdexcept>
 
20 -
#include <thread>
 
21 -
#include <vector>
 
22 -

 
23 -
namespace boost::corosio::detail {
 
24 -

 
25 -
/** Base class for thread pool work items.
 
26 -

 
27 -
    Derive from this to create work that can be posted to a
 
28 -
    @ref thread_pool. Uses static function pointer dispatch,
 
29 -
    consistent with the IOCP `op` pattern.
 
30 -

 
31 -
    @par Example
 
32 -
    @code
 
33 -
    struct my_work : pool_work_item
 
34 -
    {
 
35 -
        int* result;
 
36 -
        static void execute( pool_work_item* w ) noexcept
 
37 -
        {
 
38 -
            auto* self = static_cast<my_work*>( w );
 
39 -
            *self->result = 42;
 
40 -
        }
 
41 -
    };
 
42 -

 
43 -
    my_work w;
 
44 -
    w.func_ = &my_work::execute;
 
45 -
    w.result = &r;
 
46 -
    pool.post( &w );
 
47 -
    @endcode
 
48 -
*/
 
49 -
struct pool_work_item : intrusive_queue<pool_work_item>::node
 
50 -
{
 
51 -
    /// Static dispatch function signature.
 
52 -
    using func_type = void (*)(pool_work_item*) noexcept;
 
53 -

 
54 -
    /// Completion handler invoked by the worker thread.
 
55 -
    func_type func_ = nullptr;
 
56 -
};
 
57 -

 
58 -
/** Shared thread pool for dispatching blocking operations.
 
59 -

 
60 -
    Provides a fixed pool of reusable worker threads for operations
 
61 -
    that cannot be integrated with async I/O (e.g. blocking DNS
 
62 -
    calls). Registered as an `execution_context::service` so it
 
63 -
    is a singleton per io_context.
 
64 -

 
65 -
    Threads are created eagerly in the constructor. The default
 
66 -
    thread count is 1.
 
67 -

 
68 -
    @par Thread Safety
 
69 -
    All public member functions are thread-safe.
 
70 -

 
71 -
    @par Shutdown
 
72 -
    Sets a shutdown flag, notifies all threads, and joins them.
 
73 -
    In-flight blocking calls complete naturally before the thread
 
74 -
    exits.
 
75 -
*/
 
76 -
class thread_pool final
 
77 -
    : public capy::execution_context::service
 
78 -
{
 
79 -
    std::mutex mutex_;
 
80 -
    std::condition_variable cv_;
 
81 -
    intrusive_queue<pool_work_item> work_queue_;
 
82 -
    std::vector<std::thread> threads_;
 
83 -
    bool shutdown_ = false;
 
84 -

 
85 -
    void worker_loop();
 
86 -

 
87 -
public:
 
88 -
    using key_type = thread_pool;
 
89 -

 
90 -
    /** Construct the thread pool service.
 
91 -

 
92 -
        Eagerly creates all worker threads.
 
93 -

 
94 -
        @par Exception Safety
 
95 -
        Strong guarantee. If thread creation fails, all
 
96 -
        already-created threads are shut down and joined
 
97 -
        before the exception propagates.
 
98 -

 
99 -
        @param ctx Reference to the owning execution_context.
 
100 -
        @param num_threads Number of worker threads. Must be
 
101 -
               at least 1.
 
102 -

 
103 -
        @throws std::logic_error If `num_threads` is 0.
 
104 -
    */
 
105 -
    explicit thread_pool(
 
106 -
        capy::execution_context& ctx,
 
107 -
        unsigned num_threads = 1)
 
108 -
    {
 
109 -
        (void)ctx;
 
110 -
        if (!num_threads)
 
111 -
            throw std::logic_error(
 
112 -
                "thread_pool requires at least 1 thread");
 
113 -
        threads_.reserve(num_threads);
 
114 -
        try
 
115 -
        {
 
116 -
            for (unsigned i = 0; i < num_threads; ++i)
 
117 -
                threads_.emplace_back([this] { worker_loop(); });
 
118 -
        }
 
119 -
        catch (...)
 
120 -
        {
 
121 -
            shutdown();
 
122 -
            throw;
 
123 -
        }
 
124 -
    }
 
125 -

 
126 -
    ~thread_pool() override = default;
 
127 -

 
128 -
    thread_pool(thread_pool const&) = delete;
 
129 -
    thread_pool& operator=(thread_pool const&) = delete;
 
130 -

 
131 -
    /** Enqueue a work item for execution on the thread pool.
 
132 -

 
133 -
        Zero-allocation: the caller owns the work item's storage.
 
134 -

 
135 -
        @param w The work item to execute. Must remain valid until
 
136 -
                 its `func_` has been called.
 
137 -

 
138 -
        @return `true` if the item was enqueued, `false` if the
 
139 -
                pool has already shut down.
 
140 -
    */
 
141 -
    bool post(pool_work_item* w) noexcept;
 
142 -

 
143 -
    /** Shut down the thread pool.
 
144 -

 
145 -
        Signals all threads to exit after draining any
 
146 -
        remaining queued work, then joins them.
 
147 -
    */
 
148 -
    void shutdown() override;
 
149 -
};
 
150 -

 
151 -
inline void
 
152 -
thread_pool::worker_loop()
 
153 -
{
 
154 -
    for (;;)
 
155 -
    {
 
156 -
        pool_work_item* w;
 
157 -
        {
 
158 -
            std::unique_lock<std::mutex> lock(mutex_);
 
159 -
            cv_.wait(lock, [this] {
 
160 -
                return shutdown_ || !work_queue_.empty();
 
161 -
            });
 
162 -

 
163 -
            w = work_queue_.pop();
 
164 -
            if (!w)
 
165 -
            {
 
166 -
                if (shutdown_)
 
167 -
                    return;
 
168 -
                continue;
 
169 -
            }
 
170 -
        }
 
171 -
        w->func_(w);
 
172 -
    }
 
173 -
}
 
174 -

 
175 -
inline bool
 
176 -
thread_pool::post(pool_work_item* w) noexcept
 
177 -
{
 
178 -
    {
 
179 -
        std::lock_guard<std::mutex> lock(mutex_);
 
180 -
        if (shutdown_)
 
181 -
            return false;
 
182 -
        work_queue_.push(w);
 
183 -
    }
 
184 -
    cv_.notify_one();
 
185 -
    return true;
 
186 -
}
 
187 -

 
188 -
inline void
 
189 -
thread_pool::shutdown()
 
190 -
{
 
191 -
    {
 
192 -
        std::lock_guard<std::mutex> lock(mutex_);
 
193 -
        shutdown_ = true;
 
194 -
    }
 
195 -
    cv_.notify_all();
 
196 -

 
197 -
    for (auto& t : threads_)
 
198 -
    {
 
199 -
        if (t.joinable())
 
200 -
            t.join();
 
201 -
    }
 
202 -
    threads_.clear();
 
203 -

 
204 -
    {
 
205 -
        std::lock_guard<std::mutex> lock(mutex_);
 
206 -
        while (work_queue_.pop())
 
207 -
            ;
 
208 -
    }
 
209 -
}
 
210 -

 
211 -
} // namespace boost::corosio::detail
 
212 -

 
213 -
#endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP