//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
10  
#ifndef BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11  
#define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
11  
#define BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
12  

12  

13  
#include <boost/corosio/detail/config.hpp>
13  
#include <boost/corosio/detail/config.hpp>
14  
#include <boost/corosio/detail/intrusive.hpp>
14  
#include <boost/corosio/detail/intrusive.hpp>
15  
#include <boost/capy/ex/execution_context.hpp>
15  
#include <boost/capy/ex/execution_context.hpp>
16  

16  

17  
#include <condition_variable>
17  
#include <condition_variable>
18  
#include <mutex>
18  
#include <mutex>
19  
#include <stdexcept>
19  
#include <stdexcept>
20  
#include <thread>
20  
#include <thread>
21  
#include <vector>
21  
#include <vector>
22  

22  

23  
namespace boost::corosio::detail {
23  
namespace boost::corosio::detail {
24  

24  

25  
/** Base class for thread pool work items.
25  
/** Base class for thread pool work items.
26  

26  

27  
    Derive from this to create work that can be posted to a
27  
    Derive from this to create work that can be posted to a
28  
    @ref thread_pool. Uses static function pointer dispatch,
28  
    @ref thread_pool. Uses static function pointer dispatch,
29  
    consistent with the IOCP `op` pattern.
29  
    consistent with the IOCP `op` pattern.
30  

30  

31  
    @par Example
31  
    @par Example
32  
    @code
32  
    @code
33  
    struct my_work : pool_work_item
33  
    struct my_work : pool_work_item
34  
    {
34  
    {
35  
        int* result;
35  
        int* result;
36  
        static void execute( pool_work_item* w ) noexcept
36  
        static void execute( pool_work_item* w ) noexcept
37  
        {
37  
        {
38  
            auto* self = static_cast<my_work*>( w );
38  
            auto* self = static_cast<my_work*>( w );
39  
            *self->result = 42;
39  
            *self->result = 42;
40  
        }
40  
        }
41  
    };
41  
    };
42  

42  

43  
    my_work w;
43  
    my_work w;
44  
    w.func_ = &my_work::execute;
44  
    w.func_ = &my_work::execute;
45  
    w.result = &r;
45  
    w.result = &r;
46  
    pool.post( &w );
46  
    pool.post( &w );
47  
    @endcode
47  
    @endcode
48  
*/
48  
*/
49  
struct pool_work_item : intrusive_queue<pool_work_item>::node
49  
struct pool_work_item : intrusive_queue<pool_work_item>::node
50  
{
50  
{
51  
    /// Static dispatch function signature.
51  
    /// Static dispatch function signature.
52  
    using func_type = void (*)(pool_work_item*) noexcept;
52  
    using func_type = void (*)(pool_work_item*) noexcept;
53  

53  

54  
    /// Completion handler invoked by the worker thread.
54  
    /// Completion handler invoked by the worker thread.
55  
    func_type func_ = nullptr;
55  
    func_type func_ = nullptr;
56  
};
56  
};
57  

57  

58  
/** Shared thread pool for dispatching blocking operations.
58  
/** Shared thread pool for dispatching blocking operations.
59  

59  

60  
    Provides a fixed pool of reusable worker threads for operations
60  
    Provides a fixed pool of reusable worker threads for operations
61  
    that cannot be integrated with async I/O (e.g. blocking DNS
61  
    that cannot be integrated with async I/O (e.g. blocking DNS
62  
    calls). Registered as an `execution_context::service` so it
62  
    calls). Registered as an `execution_context::service` so it
63  
    is a singleton per io_context.
63  
    is a singleton per io_context.
64  

64  

65  
    Threads are created eagerly in the constructor. The default
65  
    Threads are created eagerly in the constructor. The default
66  
    thread count is 1.
66  
    thread count is 1.
67  

67  

68  
    @par Thread Safety
68  
    @par Thread Safety
69  
    All public member functions are thread-safe.
69  
    All public member functions are thread-safe.
70  

70  

71  
    @par Shutdown
71  
    @par Shutdown
72  
    Sets a shutdown flag, notifies all threads, and joins them.
72  
    Sets a shutdown flag, notifies all threads, and joins them.
73  
    In-flight blocking calls complete naturally before the thread
73  
    In-flight blocking calls complete naturally before the thread
74  
    exits.
74  
    exits.
75  
*/
75  
*/
76  
class thread_pool final
76  
class thread_pool final
77  
    : public capy::execution_context::service
77  
    : public capy::execution_context::service
78  
{
78  
{
79  
    std::mutex mutex_;
79  
    std::mutex mutex_;
80  
    std::condition_variable cv_;
80  
    std::condition_variable cv_;
81  
    intrusive_queue<pool_work_item> work_queue_;
81  
    intrusive_queue<pool_work_item> work_queue_;
82  
    std::vector<std::thread> threads_;
82  
    std::vector<std::thread> threads_;
83  
    bool shutdown_ = false;
83  
    bool shutdown_ = false;
84  

84  

85  
    void worker_loop();
85  
    void worker_loop();
86  

86  

87  
public:
87  
public:
88  
    using key_type = thread_pool;
88  
    using key_type = thread_pool;
89  

89  

90  
    /** Construct the thread pool service.
90  
    /** Construct the thread pool service.
91  

91  

92  
        Eagerly creates all worker threads.
92  
        Eagerly creates all worker threads.
93  

93  

94  
        @par Exception Safety
94  
        @par Exception Safety
95  
        Strong guarantee. If thread creation fails, all
95  
        Strong guarantee. If thread creation fails, all
96  
        already-created threads are shut down and joined
96  
        already-created threads are shut down and joined
97  
        before the exception propagates.
97  
        before the exception propagates.
98  

98  

99  
        @param ctx Reference to the owning execution_context.
99  
        @param ctx Reference to the owning execution_context.
100  
        @param num_threads Number of worker threads. Must be
100  
        @param num_threads Number of worker threads. Must be
101  
               at least 1.
101  
               at least 1.
102  

102  

103  
        @throws std::logic_error If `num_threads` is 0.
103  
        @throws std::logic_error If `num_threads` is 0.
104  
    */
104  
    */
105  
    explicit thread_pool(
105  
    explicit thread_pool(
106  
        capy::execution_context& ctx,
106  
        capy::execution_context& ctx,
107  
        unsigned num_threads = 1)
107  
        unsigned num_threads = 1)
108  
    {
108  
    {
109  
        (void)ctx;
109  
        (void)ctx;
110  
        if (!num_threads)
110  
        if (!num_threads)
111  
            throw std::logic_error(
111  
            throw std::logic_error(
112  
                "thread_pool requires at least 1 thread");
112  
                "thread_pool requires at least 1 thread");
113  
        threads_.reserve(num_threads);
113  
        threads_.reserve(num_threads);
114  
        try
114  
        try
115  
        {
115  
        {
116  
            for (unsigned i = 0; i < num_threads; ++i)
116  
            for (unsigned i = 0; i < num_threads; ++i)
117  
                threads_.emplace_back([this] { worker_loop(); });
117  
                threads_.emplace_back([this] { worker_loop(); });
118  
        }
118  
        }
119  
        catch (...)
119  
        catch (...)
120  
        {
120  
        {
121  
            shutdown();
121  
            shutdown();
122  
            throw;
122  
            throw;
123  
        }
123  
        }
124  
    }
124  
    }
125  

125  

126  
    ~thread_pool() override = default;
126  
    ~thread_pool() override = default;
127  

127  

128  
    thread_pool(thread_pool const&) = delete;
128  
    thread_pool(thread_pool const&) = delete;
129  
    thread_pool& operator=(thread_pool const&) = delete;
129  
    thread_pool& operator=(thread_pool const&) = delete;
130  

130  

131  
    /** Enqueue a work item for execution on the thread pool.
131  
    /** Enqueue a work item for execution on the thread pool.
132  

132  

133  
        Zero-allocation: the caller owns the work item's storage.
133  
        Zero-allocation: the caller owns the work item's storage.
134  

134  

135  
        @param w The work item to execute. Must remain valid until
135  
        @param w The work item to execute. Must remain valid until
136  
                 its `func_` has been called.
136  
                 its `func_` has been called.
137  

137  

138  
        @return `true` if the item was enqueued, `false` if the
138  
        @return `true` if the item was enqueued, `false` if the
139  
                pool has already shut down.
139  
                pool has already shut down.
140  
    */
140  
    */
141  
    bool post(pool_work_item* w) noexcept;
141  
    bool post(pool_work_item* w) noexcept;
142  

142  

143  
    /** Shut down the thread pool.
143  
    /** Shut down the thread pool.
144  

144  

145  
        Signals all threads to exit after draining any
145  
        Signals all threads to exit after draining any
146  
        remaining queued work, then joins them.
146  
        remaining queued work, then joins them.
147  
    */
147  
    */
148  
    void shutdown() override;
148  
    void shutdown() override;
149  
};
149  
};
150  

150  

151  
inline void
151  
inline void
152  
thread_pool::worker_loop()
152  
thread_pool::worker_loop()
153  
{
153  
{
154  
    for (;;)
154  
    for (;;)
155  
    {
155  
    {
156  
        pool_work_item* w;
156  
        pool_work_item* w;
157  
        {
157  
        {
158  
            std::unique_lock<std::mutex> lock(mutex_);
158  
            std::unique_lock<std::mutex> lock(mutex_);
159  
            cv_.wait(lock, [this] {
159  
            cv_.wait(lock, [this] {
160  
                return shutdown_ || !work_queue_.empty();
160  
                return shutdown_ || !work_queue_.empty();
161  
            });
161  
            });
162  

162  

163  
            w = work_queue_.pop();
163  
            w = work_queue_.pop();
164  
            if (!w)
164  
            if (!w)
165  
            {
165  
            {
166  
                if (shutdown_)
166  
                if (shutdown_)
167  
                    return;
167  
                    return;
168  
                continue;
168  
                continue;
169  
            }
169  
            }
170  
        }
170  
        }
171  
        w->func_(w);
171  
        w->func_(w);
172  
    }
172  
    }
173  
}
173  
}
174  

174  

175  
inline bool
175  
inline bool
176  
thread_pool::post(pool_work_item* w) noexcept
176  
thread_pool::post(pool_work_item* w) noexcept
177  
{
177  
{
178  
    {
178  
    {
179  
        std::lock_guard<std::mutex> lock(mutex_);
179  
        std::lock_guard<std::mutex> lock(mutex_);
180  
        if (shutdown_)
180  
        if (shutdown_)
181  
            return false;
181  
            return false;
182  
        work_queue_.push(w);
182  
        work_queue_.push(w);
183  
    }
183  
    }
184  
    cv_.notify_one();
184  
    cv_.notify_one();
185  
    return true;
185  
    return true;
186  
}
186  
}
187  

187  

188  
inline void
188  
inline void
189  
thread_pool::shutdown()
189  
thread_pool::shutdown()
190  
{
190  
{
191  
    {
191  
    {
192  
        std::lock_guard<std::mutex> lock(mutex_);
192  
        std::lock_guard<std::mutex> lock(mutex_);
193  
        shutdown_ = true;
193  
        shutdown_ = true;
194  
    }
194  
    }
195  
    cv_.notify_all();
195  
    cv_.notify_all();
196  

196  

197  
    for (auto& t : threads_)
197  
    for (auto& t : threads_)
198  
    {
198  
    {
199  
        if (t.joinable())
199  
        if (t.joinable())
200  
            t.join();
200  
            t.join();
201  
    }
201  
    }
202  
    threads_.clear();
202  
    threads_.clear();
203  

203  

204  
    {
204  
    {
205  
        std::lock_guard<std::mutex> lock(mutex_);
205  
        std::lock_guard<std::mutex> lock(mutex_);
206  
        while (work_queue_.pop())
206  
        while (work_queue_.pop())
207  
            ;
207  
            ;
208  
    }
208  
    }
209  
}
209  
}
210  

210  

211  
} // namespace boost::corosio::detail
211  
} // namespace boost::corosio::detail
212  

212  

213  
#endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP
213  
#endif // BOOST_COROSIO_DETAIL_THREAD_POOL_HPP