1  
//
1  
//
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
2  
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3  
// Copyright (c) 2026 Steve Gerbino
3  
// Copyright (c) 2026 Steve Gerbino
4  
//
4  
//
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
5  
// Distributed under the Boost Software License, Version 1.0. (See accompanying
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6  
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7  
//
7  
//
8  
// Official repository: https://github.com/cppalliance/corosio
8  
// Official repository: https://github.com/cppalliance/corosio
9  
//
9  
//
10  

10  

11  
#ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
11  
#ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12  
#define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12  
#define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13  

13  

14  
#include <boost/corosio/timer.hpp>
14  
#include <boost/corosio/timer.hpp>
15  
#include <boost/corosio/io_context.hpp>
15  
#include <boost/corosio/io_context.hpp>
16  
#include <boost/corosio/detail/scheduler_op.hpp>
16  
#include <boost/corosio/detail/scheduler_op.hpp>
17  
#include <boost/corosio/native/native_scheduler.hpp>
17  
#include <boost/corosio/native/native_scheduler.hpp>
18  
#include <boost/corosio/detail/intrusive.hpp>
18  
#include <boost/corosio/detail/intrusive.hpp>
19  
#include <boost/corosio/detail/thread_local_ptr.hpp>
19  
#include <boost/corosio/detail/thread_local_ptr.hpp>
20  
#include <boost/capy/error.hpp>
20  
#include <boost/capy/error.hpp>
21  
#include <boost/capy/ex/execution_context.hpp>
21  
#include <boost/capy/ex/execution_context.hpp>
22  
#include <boost/capy/ex/executor_ref.hpp>
22  
#include <boost/capy/ex/executor_ref.hpp>
23  
#include <system_error>
23  
#include <system_error>
24  

24  

25  
#include <atomic>
25  
#include <atomic>
26  
#include <chrono>
26  
#include <chrono>
27  
#include <coroutine>
27  
#include <coroutine>
28  
#include <cstddef>
28  
#include <cstddef>
29  
#include <limits>
29  
#include <limits>
30  
#include <mutex>
30  
#include <mutex>
31  
#include <optional>
31  
#include <optional>
32  
#include <stop_token>
32  
#include <stop_token>
33  
#include <utility>
33  
#include <utility>
34  
#include <vector>
34  
#include <vector>
35  

35  

36  
namespace boost::corosio::detail {
36  
namespace boost::corosio::detail {
37  

37  

38  
// Forward declaration; timer_service only stores a pointer to the scheduler
// and hands out a reference to it.
struct scheduler;
39  

39  

40  
/*
    Timer Service
    =============

    Data Structures
    ---------------
    waiter_node holds per-waiter state: coroutine handle, executor,
    error output, stop_token, embedded completion_op. Each concurrent
    co_await t.wait() allocates one waiter_node.

    timer_service::implementation holds per-timer state: expiry,
    heap index, and an intrusive_list of waiter_nodes. Multiple
    coroutines can wait on the same timer simultaneously.

    timer_service owns a min-heap of active timers, a free list
    of recycled impls, and a free list of recycled waiter_nodes. The
    heap is ordered by expiry time; the scheduler queries
    nearest_expiry() to set the epoll/timerfd timeout.

    Optimization Strategy
    ---------------------
    1. Deferred heap insertion — expires_after() stores the expiry
       but does not insert into the heap. Insertion happens in wait().
    2. Thread-local impl cache — single-slot per-thread cache.
    3. Embedded completion_op — eliminates heap allocation per fire/cancel.
    4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
    5. might_have_pending_waits_ flag — skips lock when no wait issued.
    6. Thread-local waiter cache — single-slot per-thread cache.

    Concurrency
    -----------
    stop_token callbacks can fire from any thread. The impl_
    pointer on waiter_node is used as a "still in list" marker.
*/
74  

74  

75  
// Forward declarations needed by the timer_service class body; both
// entities are defined further down in this header.
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;

inline void timer_service_invalidate_cache() noexcept;
78  

78  

79  
// timer_service class body — member function definitions are
79  
// timer_service class body — member function definitions are
80  
// out-of-class (after implementation and waiter_node are complete)
80  
// out-of-class (after implementation and waiter_node are complete)
81  
class BOOST_COROSIO_DECL timer_service final
81  
class BOOST_COROSIO_DECL timer_service final
82  
    : public capy::execution_context::service
82  
    : public capy::execution_context::service
83  
    , public io_object::io_service
83  
    , public io_object::io_service
84  
{
84  
{
85  
public:
85  
public:
86  
    using clock_type = std::chrono::steady_clock;
86  
    using clock_type = std::chrono::steady_clock;
87  
    using time_point = clock_type::time_point;
87  
    using time_point = clock_type::time_point;
88  

88  

 
89 +
    /// Type-erased callback for earliest-expiry-changed notifications.
89  
    class callback
90  
    class callback
90  
    {
91  
    {
91  
        void* ctx_         = nullptr;
92  
        void* ctx_         = nullptr;
92  
        void (*fn_)(void*) = nullptr;
93  
        void (*fn_)(void*) = nullptr;
93  

94  

94  
    public:
95  
    public:
 
96 +
        /// Construct an empty callback.
95  
        callback() = default;
97  
        callback() = default;
 
98 +

 
99 +
        /// Construct a callback with the given context and function.
96  
        callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
100  
        callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
97  

101  

 
102 +
        /// Return true if the callback is non-empty.
98  
        explicit operator bool() const noexcept
103  
        explicit operator bool() const noexcept
99  
        {
104  
        {
100  
            return fn_ != nullptr;
105  
            return fn_ != nullptr;
101  
        }
106  
        }
 
107 +

 
108 +
        /// Invoke the callback.
102  
        void operator()() const
109  
        void operator()() const
103  
        {
110  
        {
104  
            if (fn_)
111  
            if (fn_)
105  
                fn_(ctx_);
112  
                fn_(ctx_);
106  
        }
113  
        }
107  
    };
114  
    };
108  

115  

109  
    struct implementation;
116  
    struct implementation;
110  

117  

111  
private:
118  
private:
112  
    struct heap_entry
119  
    struct heap_entry
113  
    {
120  
    {
114  
        time_point time_;
121  
        time_point time_;
115  
        implementation* timer_;
122  
        implementation* timer_;
116  
    };
123  
    };
117  

124  

118  
    scheduler* sched_ = nullptr;
125  
    scheduler* sched_ = nullptr;
119  
    mutable std::mutex mutex_;
126  
    mutable std::mutex mutex_;
120  
    std::vector<heap_entry> heap_;
127  
    std::vector<heap_entry> heap_;
121  
    implementation* free_list_     = nullptr;
128  
    implementation* free_list_     = nullptr;
122  
    waiter_node* waiter_free_list_ = nullptr;
129  
    waiter_node* waiter_free_list_ = nullptr;
123  
    callback on_earliest_changed_;
130  
    callback on_earliest_changed_;
124  
    // Avoids mutex in nearest_expiry() and empty()
131  
    // Avoids mutex in nearest_expiry() and empty()
125  
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
132  
    mutable std::atomic<std::int64_t> cached_nearest_ns_{
126  
        (std::numeric_limits<std::int64_t>::max)()};
133  
        (std::numeric_limits<std::int64_t>::max)()};
127  

134  

128  
public:
135  
public:
 
136 +
    /// Construct the timer service bound to a scheduler.
129  
    inline timer_service(capy::execution_context&, scheduler& sched)
137  
    inline timer_service(capy::execution_context&, scheduler& sched)
130  
        : sched_(&sched)
138  
        : sched_(&sched)
131  
    {
139  
    {
132  
    }
140  
    }
133  

141  

 
142 +
    /// Return the associated scheduler.
134  
    inline scheduler& get_scheduler() noexcept
143  
    inline scheduler& get_scheduler() noexcept
135  
    {
144  
    {
136  
        return *sched_;
145  
        return *sched_;
137  
    }
146  
    }
138  

147  

 
148 +
    /// Destroy the timer service.
139  
    ~timer_service() override = default;
149  
    ~timer_service() override = default;
140  

150  

141  
    timer_service(timer_service const&)            = delete;
151  
    timer_service(timer_service const&)            = delete;
142  
    timer_service& operator=(timer_service const&) = delete;
152  
    timer_service& operator=(timer_service const&) = delete;
143  

153  

 
154 +
    /// Register a callback invoked when the earliest expiry changes.
144  
    inline void set_on_earliest_changed(callback cb)
155  
    inline void set_on_earliest_changed(callback cb)
145  
    {
156  
    {
146  
        on_earliest_changed_ = cb;
157  
        on_earliest_changed_ = cb;
147  
    }
158  
    }
148  

159  

 
160 +
    /// Return true if no timers are in the heap.
149  
    inline bool empty() const noexcept
161  
    inline bool empty() const noexcept
150  
    {
162  
    {
151  
        return cached_nearest_ns_.load(std::memory_order_acquire) ==
163  
        return cached_nearest_ns_.load(std::memory_order_acquire) ==
152  
            (std::numeric_limits<std::int64_t>::max)();
164  
            (std::numeric_limits<std::int64_t>::max)();
153  
    }
165  
    }
154  

166  

 
167 +
    /// Return the nearest timer expiry without acquiring the mutex.
155  
    inline time_point nearest_expiry() const noexcept
168  
    inline time_point nearest_expiry() const noexcept
156  
    {
169  
    {
157  
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
170  
        auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
158  
        return time_point(time_point::duration(ns));
171  
        return time_point(time_point::duration(ns));
159  
    }
172  
    }
160  

173  

 
174 +
    /// Cancel all pending timers and free cached resources.
161  
    inline void shutdown() override;
175  
    inline void shutdown() override;
 
176 +

 
177 +
    /// Construct a new timer implementation.
162  
    inline io_object::implementation* construct() override;
178  
    inline io_object::implementation* construct() override;
 
179 +

 
180 +
    /// Destroy a timer implementation, cancelling pending waiters.
163  
    inline void destroy(io_object::implementation* p) override;
181  
    inline void destroy(io_object::implementation* p) override;
 
182 +

 
183 +
    /// Cancel and recycle a timer implementation.
164  
    inline void destroy_impl(implementation& impl);
184  
    inline void destroy_impl(implementation& impl);
 
185 +

 
186 +
    /// Create or recycle a waiter node.
165  
    inline waiter_node* create_waiter();
187  
    inline waiter_node* create_waiter();
 
188 +

 
189 +
    /// Return a waiter node to the cache or free list.
166  
    inline void destroy_waiter(waiter_node* w);
190  
    inline void destroy_waiter(waiter_node* w);
 
191 +

 
192 +
    /// Update the timer expiry, cancelling existing waiters.
167  
    inline std::size_t update_timer(implementation& impl, time_point new_time);
193  
    inline std::size_t update_timer(implementation& impl, time_point new_time);
 
194 +

 
195 +
    /// Insert a waiter into the timer's waiter list and the heap.
168  
    inline void insert_waiter(implementation& impl, waiter_node* w);
196  
    inline void insert_waiter(implementation& impl, waiter_node* w);
 
197 +

 
198 +
    /// Cancel all waiters on a timer.
169  
    inline std::size_t cancel_timer(implementation& impl);
199  
    inline std::size_t cancel_timer(implementation& impl);
 
200 +

 
201 +
    /// Cancel a single waiter ( stop_token callback path ).
170  
    inline void cancel_waiter(waiter_node* w);
202  
    inline void cancel_waiter(waiter_node* w);
 
203 +

 
204 +
    /// Cancel one waiter on a timer.
171  
    inline std::size_t cancel_one_waiter(implementation& impl);
205  
    inline std::size_t cancel_one_waiter(implementation& impl);
 
206 +

 
207 +
    /// Complete all waiters whose timers have expired.
172  
    inline std::size_t process_expired();
208  
    inline std::size_t process_expired();
173  

209  

174  
private:
210  
private:
175  
    inline void refresh_cached_nearest() noexcept
211  
    inline void refresh_cached_nearest() noexcept
176  
    {
212  
    {
177  
        auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
213  
        auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
178  
                                : heap_[0].time_.time_since_epoch().count();
214  
                                : heap_[0].time_.time_since_epoch().count();
179  
        cached_nearest_ns_.store(ns, std::memory_order_release);
215  
        cached_nearest_ns_.store(ns, std::memory_order_release);
180  
    }
216  
    }
181  

217  

182  
    inline void remove_timer_impl(implementation& impl);
218  
    inline void remove_timer_impl(implementation& impl);
183  
    inline void up_heap(std::size_t index);
219  
    inline void up_heap(std::size_t index);
184  
    inline void down_heap(std::size_t index);
220  
    inline void down_heap(std::size_t index);
185  
    inline void swap_heap(std::size_t i1, std::size_t i2);
221  
    inline void swap_heap(std::size_t i1, std::size_t i2);
186  
};
222  
};
187  

223  

188  
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
224  
struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
189  
    : intrusive_list<waiter_node>::node
225  
    : intrusive_list<waiter_node>::node
190  
{
226  
{
191  
    // Embedded completion op — avoids heap allocation per fire/cancel
227  
    // Embedded completion op — avoids heap allocation per fire/cancel
192  
    struct completion_op final : scheduler_op
228  
    struct completion_op final : scheduler_op
193  
    {
229  
    {
194  
        waiter_node* waiter_ = nullptr;
230  
        waiter_node* waiter_ = nullptr;
195  

231  

196  
        static void do_complete(
232  
        static void do_complete(
197  
            void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
233  
            void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
198  

234  

199  
        completion_op() noexcept : scheduler_op(&do_complete) {}
235  
        completion_op() noexcept : scheduler_op(&do_complete) {}
200  

236  

201  
        void operator()() override;
237  
        void operator()() override;
202  
        void destroy() override;
238  
        void destroy() override;
203  
    };
239  
    };
204  

240  

205  
    // Per-waiter stop_token cancellation
241  
    // Per-waiter stop_token cancellation
206  
    struct canceller
242  
    struct canceller
207  
    {
243  
    {
208  
        waiter_node* waiter_;
244  
        waiter_node* waiter_;
209  
        void operator()() const;
245  
        void operator()() const;
210  
    };
246  
    };
211  

247  

212  
    // nullptr once removed from timer's waiter list (concurrency marker)
248  
    // nullptr once removed from timer's waiter list (concurrency marker)
213  
    timer_service::implementation* impl_ = nullptr;
249  
    timer_service::implementation* impl_ = nullptr;
214  
    timer_service* svc_                  = nullptr;
250  
    timer_service* svc_                  = nullptr;
215  
    std::coroutine_handle<> h_;
251  
    std::coroutine_handle<> h_;
216  
    capy::executor_ref d_;
252  
    capy::executor_ref d_;
217  
    std::error_code* ec_out_ = nullptr;
253  
    std::error_code* ec_out_ = nullptr;
218  
    std::stop_token token_;
254  
    std::stop_token token_;
219  
    std::optional<std::stop_callback<canceller>> stop_cb_;
255  
    std::optional<std::stop_callback<canceller>> stop_cb_;
220  
    completion_op op_;
256  
    completion_op op_;
221  
    std::error_code ec_value_;
257  
    std::error_code ec_value_;
222  
    waiter_node* next_free_ = nullptr;
258  
    waiter_node* next_free_ = nullptr;
223  

259  

224  
    waiter_node() noexcept
260  
    waiter_node() noexcept
225  
    {
261  
    {
226  
        op_.waiter_ = this;
262  
        op_.waiter_ = this;
227  
    }
263  
    }
228  
};
264  
};
229  

265  

230  
struct timer_service::implementation final : timer::implementation
266  
struct timer_service::implementation final : timer::implementation
231  
{
267  
{
232  
    using clock_type = std::chrono::steady_clock;
268  
    using clock_type = std::chrono::steady_clock;
233  
    using time_point = clock_type::time_point;
269  
    using time_point = clock_type::time_point;
234  
    using duration   = clock_type::duration;
270  
    using duration   = clock_type::duration;
235  

271  

236  
    timer_service* svc_ = nullptr;
272  
    timer_service* svc_ = nullptr;
237  
    intrusive_list<waiter_node> waiters_;
273  
    intrusive_list<waiter_node> waiters_;
238  

274  

239  
    // Free list linkage (reused when impl is on free_list)
275  
    // Free list linkage (reused when impl is on free_list)
240  
    implementation* next_free_ = nullptr;
276  
    implementation* next_free_ = nullptr;
241  

277  

242  
    inline explicit implementation(timer_service& svc) noexcept;
278  
    inline explicit implementation(timer_service& svc) noexcept;
243  

279  

244  
    inline std::coroutine_handle<> wait(
280  
    inline std::coroutine_handle<> wait(
245  
        std::coroutine_handle<>,
281  
        std::coroutine_handle<>,
246  
        capy::executor_ref,
282  
        capy::executor_ref,
247  
        std::stop_token,
283  
        std::stop_token,
248  
        std::error_code*) override;
284  
        std::error_code*) override;
249  
};
285  
};
250  

286  

251  
// Thread-local caches avoid hot-path mutex acquisitions:
287  
// Thread-local caches avoid hot-path mutex acquisitions:
252  
// 1. Impl cache — single-slot, validated by comparing svc_
288  
// 1. Impl cache — single-slot, validated by comparing svc_
253  
// 2. Waiter cache — single-slot, no service affinity
289  
// 2. Waiter cache — single-slot, no service affinity
254  
// All caches are cleared by timer_service_invalidate_cache() during shutdown.
290  
// All caches are cleared by timer_service_invalidate_cache() during shutdown.
255  

291  

256  
inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
292  
inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
257  
inline thread_local_ptr<waiter_node> tl_cached_waiter;
293  
inline thread_local_ptr<waiter_node> tl_cached_waiter;
258  

294  

259  
/** Take the cached timer implementation for `svc`, if there is one.

    The single cache slot is always emptied. A cached implementation
    that belongs to a different service is deleted rather than reused.

    @param svc Service requesting an implementation.
    @return The cached implementation owned by `svc`, or nullptr.
*/
inline timer_service::implementation*
try_pop_tl_cache(timer_service* svc) noexcept
{
    auto* cached = tl_cached_impl.get();
    if (!cached)
        return nullptr;

    tl_cached_impl.set(nullptr);
    if (cached->svc_ == svc)
        return cached;

    // Stale impl from a destroyed service
    delete cached;
    return nullptr;
}
273  

309  

274  
inline bool
310  
inline bool
275  
try_push_tl_cache(timer_service::implementation* impl) noexcept
311  
try_push_tl_cache(timer_service::implementation* impl) noexcept
276  
{
312  
{
277  
    if (!tl_cached_impl.get())
313  
    if (!tl_cached_impl.get())
278  
    {
314  
    {
279  
        tl_cached_impl.set(impl);
315  
        tl_cached_impl.set(impl);
280  
        return true;
316  
        return true;
281  
    }
317  
    }
282  
    return false;
318  
    return false;
283  
}
319  
}
284  

320  

285  
/** Take the waiter node held in the thread-local waiter cache.

    @return The cached node (the slot is cleared), or nullptr when the
            slot is empty.
*/
inline waiter_node*
try_pop_waiter_tl_cache() noexcept
{
    auto* cached = tl_cached_waiter.get();
    if (!cached)
        return nullptr;

    tl_cached_waiter.set(nullptr);
    return cached;
}
296  

332  

297  
inline bool
333  
inline bool
298  
try_push_waiter_tl_cache(waiter_node* w) noexcept
334  
try_push_waiter_tl_cache(waiter_node* w) noexcept
299  
{
335  
{
300  
    if (!tl_cached_waiter.get())
336  
    if (!tl_cached_waiter.get())
301  
    {
337  
    {
302  
        tl_cached_waiter.set(w);
338  
        tl_cached_waiter.set(w);
303  
        return true;
339  
        return true;
304  
    }
340  
    }
305  
    return false;
341  
    return false;
306  
}
342  
}
307  

343  

308  
inline void
344  
inline void
309  
timer_service_invalidate_cache() noexcept
345  
timer_service_invalidate_cache() noexcept
310  
{
346  
{
311  
    delete tl_cached_impl.get();
347  
    delete tl_cached_impl.get();
312  
    tl_cached_impl.set(nullptr);
348  
    tl_cached_impl.set(nullptr);
313  

349  

314  
    delete tl_cached_waiter.get();
350  
    delete tl_cached_waiter.get();
315  
    tl_cached_waiter.set(nullptr);
351  
    tl_cached_waiter.set(nullptr);
316  
}
352  
}
317  

353  

318  
// timer_service out-of-class member function definitions
354  
// timer_service out-of-class member function definitions
319  

355  

320  
/// Construct a timer implementation owned by `svc`.
inline timer_service::implementation::implementation(
    timer_service& svc) noexcept
    : svc_(&svc)
{
}
325  

361  

326  
inline void
362  
inline void
327  
timer_service::shutdown()
363  
timer_service::shutdown()
328  
{
364  
{
329  
    timer_service_invalidate_cache();
365  
    timer_service_invalidate_cache();
330  

366  

331  
    // Cancel waiting timers still in the heap.
367  
    // Cancel waiting timers still in the heap.
332  
    // Each waiter called work_started() in implementation::wait().
368  
    // Each waiter called work_started() in implementation::wait().
333  
    // On IOCP the scheduler shutdown loop exits when outstanding_work_
369  
    // On IOCP the scheduler shutdown loop exits when outstanding_work_
334  
    // reaches zero, so we must call work_finished() here to balance it.
370  
    // reaches zero, so we must call work_finished() here to balance it.
335  
    // On other backends this is harmless (their drain loops exit when
371  
    // On other backends this is harmless (their drain loops exit when
336  
    // the queue is empty, not based on outstanding_work_).
372  
    // the queue is empty, not based on outstanding_work_).
337  
    for (auto& entry : heap_)
373  
    for (auto& entry : heap_)
338  
    {
374  
    {
339  
        auto* impl = entry.timer_;
375  
        auto* impl = entry.timer_;
340  
        while (auto* w = impl->waiters_.pop_front())
376  
        while (auto* w = impl->waiters_.pop_front())
341  
        {
377  
        {
342  
            w->stop_cb_.reset();
378  
            w->stop_cb_.reset();
343  
            auto h = std::exchange(w->h_, {});
379  
            auto h = std::exchange(w->h_, {});
344  
            sched_->work_finished();
380  
            sched_->work_finished();
345  
            if (h)
381  
            if (h)
346  
                h.destroy();
382  
                h.destroy();
347  
            delete w;
383  
            delete w;
348  
        }
384  
        }
349  
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
385  
        impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
350  
        delete impl;
386  
        delete impl;
351  
    }
387  
    }
352  
    heap_.clear();
388  
    heap_.clear();
353  
    cached_nearest_ns_.store(
389  
    cached_nearest_ns_.store(
354  
        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
390  
        (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
355  

391  

356  
    // Delete free-listed impls
392  
    // Delete free-listed impls
357  
    while (free_list_)
393  
    while (free_list_)
358  
    {
394  
    {
359  
        auto* next = free_list_->next_free_;
395  
        auto* next = free_list_->next_free_;
360  
        delete free_list_;
396  
        delete free_list_;
361  
        free_list_ = next;
397  
        free_list_ = next;
362  
    }
398  
    }
363  

399  

364  
    // Delete free-listed waiters
400  
    // Delete free-listed waiters
365  
    while (waiter_free_list_)
401  
    while (waiter_free_list_)
366  
    {
402  
    {
367  
        auto* next = waiter_free_list_->next_free_;
403  
        auto* next = waiter_free_list_->next_free_;
368  
        delete waiter_free_list_;
404  
        delete waiter_free_list_;
369  
        waiter_free_list_ = next;
405  
        waiter_free_list_ = next;
370  
    }
406  
    }
371  
}
407  
}
372  

408  

373  
/** Produce a timer implementation, recycling one when possible.

    Sources are tried in order: the thread-local impl cache (no lock),
    the service free list (under `mutex_`), and finally a fresh
    allocation. Recycled objects are re-bound to this service and
    reset to "not in heap, no pending waits".

    @return A ready-to-use implementation owned by this service.
*/
inline io_object::implementation*
timer_service::construct()
{
    implementation* impl = try_pop_tl_cache(this);
    if (!impl)
    {
        std::lock_guard lock(mutex_);
        if (!free_list_)
            return new implementation(*this);

        // Unlink the head of the free list.
        impl             = free_list_;
        free_list_       = impl->next_free_;
        impl->next_free_ = nullptr;
    }

    // Reset state shared by both recycling paths. The object is no longer
    // reachable from the cache or the free list, so no lock is needed.
    impl->svc_        = this;
    impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
    impl->might_have_pending_waits_ = false;
    return impl;
}
401  

437  

402  
inline void
438  
inline void
403  
timer_service::destroy(io_object::implementation* p)
439  
timer_service::destroy(io_object::implementation* p)
404  
{
440  
{
405  
    destroy_impl(static_cast<implementation&>(*p));
441  
    destroy_impl(static_cast<implementation&>(*p));
406  
}
442  
}
407  

443  

408  
inline void
444  
inline void
409  
timer_service::destroy_impl(implementation& impl)
445  
timer_service::destroy_impl(implementation& impl)
410  
{
446  
{
411  
    cancel_timer(impl);
447  
    cancel_timer(impl);
412  

448  

413  
    if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
449  
    if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
414  
    {
450  
    {
415  
        std::lock_guard lock(mutex_);
451  
        std::lock_guard lock(mutex_);
416  
        remove_timer_impl(impl);
452  
        remove_timer_impl(impl);
417  
        refresh_cached_nearest();
453  
        refresh_cached_nearest();
418  
    }
454  
    }
419  

455  

420  
    if (try_push_tl_cache(&impl))
456  
    if (try_push_tl_cache(&impl))
421  
        return;
457  
        return;
422  

458  

423  
    std::lock_guard lock(mutex_);
459  
    std::lock_guard lock(mutex_);
424  
    impl.next_free_ = free_list_;
460  
    impl.next_free_ = free_list_;
425  
    free_list_      = &impl;
461  
    free_list_      = &impl;
426  
}
462  
}
427  

463  

428  
/** Obtain a waiter node, recycling one when possible.

    Sources are tried in order: the thread-local waiter cache (no
    lock), the service free list (under `mutex_`), and finally a
    fresh allocation. Recycled nodes are returned as-is apart from
    clearing their free-list link.

    @return A waiter node owned by the caller.
*/
inline waiter_node*
timer_service::create_waiter()
{
    if (auto* cached = try_pop_waiter_tl_cache())
        return cached;

    std::lock_guard lock(mutex_);
    if (!waiter_free_list_)
        return new waiter_node();

    // Unlink the head of the free list.
    auto* w           = waiter_free_list_;
    waiter_free_list_ = w->next_free_;
    w->next_free_     = nullptr;
    return w;
}
445  

481  

446  
inline void
482  
inline void
447  
timer_service::destroy_waiter(waiter_node* w)
483  
timer_service::destroy_waiter(waiter_node* w)
448  
{
484  
{
449  
    if (try_push_waiter_tl_cache(w))
485  
    if (try_push_waiter_tl_cache(w))
450  
        return;
486  
        return;
451  

487  

452  
    std::lock_guard lock(mutex_);
488  
    std::lock_guard lock(mutex_);
453  
    w->next_free_     = waiter_free_list_;
489  
    w->next_free_     = waiter_free_list_;
454  
    waiter_free_list_ = w;
490  
    waiter_free_list_ = w;
455  
}
491  
}
456  

492  

457  
inline std::size_t
493  
inline std::size_t
458  
timer_service::update_timer(implementation& impl, time_point new_time)
494  
timer_service::update_timer(implementation& impl, time_point new_time)
459  
{
495  
{
460  
    bool in_heap =
496  
    bool in_heap =
461  
        (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
497  
        (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
462  
    if (!in_heap && impl.waiters_.empty())
498  
    if (!in_heap && impl.waiters_.empty())
463  
        return 0;
499  
        return 0;
464  

500  

465  
    bool notify = false;
501  
    bool notify = false;
466  
    intrusive_list<waiter_node> canceled;
502  
    intrusive_list<waiter_node> canceled;
467  

503  

468  
    {
504  
    {
469  
        std::lock_guard lock(mutex_);
505  
        std::lock_guard lock(mutex_);
470  

506  

471  
        while (auto* w = impl.waiters_.pop_front())
507  
        while (auto* w = impl.waiters_.pop_front())
472  
        {
508  
        {
473  
            w->impl_ = nullptr;
509  
            w->impl_ = nullptr;
474  
            canceled.push_back(w);
510  
            canceled.push_back(w);
475  
        }
511  
        }
476  

512  

477  
        if (impl.heap_index_ < heap_.size())
513  
        if (impl.heap_index_ < heap_.size())
478  
        {
514  
        {
479  
            time_point old_time           = heap_[impl.heap_index_].time_;
515  
            time_point old_time           = heap_[impl.heap_index_].time_;
480  
            heap_[impl.heap_index_].time_ = new_time;
516  
            heap_[impl.heap_index_].time_ = new_time;
481  

517  

482  
            if (new_time < old_time)
518  
            if (new_time < old_time)
483  
                up_heap(impl.heap_index_);
519  
                up_heap(impl.heap_index_);
484  
            else
520  
            else
485  
                down_heap(impl.heap_index_);
521  
                down_heap(impl.heap_index_);
486  

522  

487  
            notify = (impl.heap_index_ == 0);
523  
            notify = (impl.heap_index_ == 0);
488  
        }
524  
        }
489  

525  

490  
        refresh_cached_nearest();
526  
        refresh_cached_nearest();
491  
    }
527  
    }
492  

528  

493  
    std::size_t count = 0;
529  
    std::size_t count = 0;
494  
    while (auto* w = canceled.pop_front())
530  
    while (auto* w = canceled.pop_front())
495  
    {
531  
    {
496  
        w->ec_value_ = make_error_code(capy::error::canceled);
532  
        w->ec_value_ = make_error_code(capy::error::canceled);
497  
        sched_->post(&w->op_);
533  
        sched_->post(&w->op_);
498  
        ++count;
534  
        ++count;
499  
    }
535  
    }
500  

536  

501  
    if (notify)
537  
    if (notify)
502  
        on_earliest_changed_();
538  
        on_earliest_changed_();
503  

539  

504  
    return count;
540  
    return count;
505  
}
541  
}
506  

542  

507  
inline void
543  
inline void
508  
timer_service::insert_waiter(implementation& impl, waiter_node* w)
544  
timer_service::insert_waiter(implementation& impl, waiter_node* w)
509  
{
545  
{
510  
    bool notify = false;
546  
    bool notify = false;
511  
    {
547  
    {
512  
        std::lock_guard lock(mutex_);
548  
        std::lock_guard lock(mutex_);
513  
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
549  
        if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
514  
        {
550  
        {
515  
            impl.heap_index_ = heap_.size();
551  
            impl.heap_index_ = heap_.size();
516  
            heap_.push_back({impl.expiry_, &impl});
552  
            heap_.push_back({impl.expiry_, &impl});
517  
            up_heap(heap_.size() - 1);
553  
            up_heap(heap_.size() - 1);
518  
            notify = (impl.heap_index_ == 0);
554  
            notify = (impl.heap_index_ == 0);
519  
            refresh_cached_nearest();
555  
            refresh_cached_nearest();
520  
        }
556  
        }
521  
        impl.waiters_.push_back(w);
557  
        impl.waiters_.push_back(w);
522  
    }
558  
    }
523  
    if (notify)
559  
    if (notify)
524  
        on_earliest_changed_();
560  
        on_earliest_changed_();
525  
}
561  
}
526  

562  

527  
inline std::size_t
563  
inline std::size_t
528  
timer_service::cancel_timer(implementation& impl)
564  
timer_service::cancel_timer(implementation& impl)
529  
{
565  
{
530  
    if (!impl.might_have_pending_waits_)
566  
    if (!impl.might_have_pending_waits_)
531  
        return 0;
567  
        return 0;
532  

568  

533  
    // Not in heap and no waiters — just clear the flag
569  
    // Not in heap and no waiters — just clear the flag
534  
    if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
570  
    if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
535  
        impl.waiters_.empty())
571  
        impl.waiters_.empty())
536  
    {
572  
    {
537  
        impl.might_have_pending_waits_ = false;
573  
        impl.might_have_pending_waits_ = false;
538  
        return 0;
574  
        return 0;
539  
    }
575  
    }
540  

576  

541  
    intrusive_list<waiter_node> canceled;
577  
    intrusive_list<waiter_node> canceled;
542  

578  

543  
    {
579  
    {
544  
        std::lock_guard lock(mutex_);
580  
        std::lock_guard lock(mutex_);
545  
        remove_timer_impl(impl);
581  
        remove_timer_impl(impl);
546  
        while (auto* w = impl.waiters_.pop_front())
582  
        while (auto* w = impl.waiters_.pop_front())
547  
        {
583  
        {
548  
            w->impl_ = nullptr;
584  
            w->impl_ = nullptr;
549  
            canceled.push_back(w);
585  
            canceled.push_back(w);
550  
        }
586  
        }
551  
        refresh_cached_nearest();
587  
        refresh_cached_nearest();
552  
    }
588  
    }
553  

589  

554  
    impl.might_have_pending_waits_ = false;
590  
    impl.might_have_pending_waits_ = false;
555  

591  

556  
    std::size_t count = 0;
592  
    std::size_t count = 0;
557  
    while (auto* w = canceled.pop_front())
593  
    while (auto* w = canceled.pop_front())
558  
    {
594  
    {
559  
        w->ec_value_ = make_error_code(capy::error::canceled);
595  
        w->ec_value_ = make_error_code(capy::error::canceled);
560  
        sched_->post(&w->op_);
596  
        sched_->post(&w->op_);
561  
        ++count;
597  
        ++count;
562  
    }
598  
    }
563  

599  

564  
    return count;
600  
    return count;
565  
}
601  
}
566  

602  

567  
inline void
603  
inline void
568  
timer_service::cancel_waiter(waiter_node* w)
604  
timer_service::cancel_waiter(waiter_node* w)
569  
{
605  
{
570  
    {
606  
    {
571  
        std::lock_guard lock(mutex_);
607  
        std::lock_guard lock(mutex_);
572  
        // Already removed by cancel_timer or process_expired
608  
        // Already removed by cancel_timer or process_expired
573  
        if (!w->impl_)
609  
        if (!w->impl_)
574  
            return;
610  
            return;
575  
        auto* impl = w->impl_;
611  
        auto* impl = w->impl_;
576  
        w->impl_   = nullptr;
612  
        w->impl_   = nullptr;
577  
        impl->waiters_.remove(w);
613  
        impl->waiters_.remove(w);
578  
        if (impl->waiters_.empty())
614  
        if (impl->waiters_.empty())
579  
        {
615  
        {
580  
            remove_timer_impl(*impl);
616  
            remove_timer_impl(*impl);
581  
            impl->might_have_pending_waits_ = false;
617  
            impl->might_have_pending_waits_ = false;
582  
        }
618  
        }
583  
        refresh_cached_nearest();
619  
        refresh_cached_nearest();
584  
    }
620  
    }
585  

621  

586  
    w->ec_value_ = make_error_code(capy::error::canceled);
622  
    w->ec_value_ = make_error_code(capy::error::canceled);
587  
    sched_->post(&w->op_);
623  
    sched_->post(&w->op_);
588  
}
624  
}
589  

625  

590  
inline std::size_t
626  
inline std::size_t
591  
timer_service::cancel_one_waiter(implementation& impl)
627  
timer_service::cancel_one_waiter(implementation& impl)
592  
{
628  
{
593  
    if (!impl.might_have_pending_waits_)
629  
    if (!impl.might_have_pending_waits_)
594  
        return 0;
630  
        return 0;
595  

631  

596  
    waiter_node* w = nullptr;
632  
    waiter_node* w = nullptr;
597  

633  

598  
    {
634  
    {
599  
        std::lock_guard lock(mutex_);
635  
        std::lock_guard lock(mutex_);
600  
        w = impl.waiters_.pop_front();
636  
        w = impl.waiters_.pop_front();
601  
        if (!w)
637  
        if (!w)
602  
            return 0;
638  
            return 0;
603  
        w->impl_ = nullptr;
639  
        w->impl_ = nullptr;
604  
        if (impl.waiters_.empty())
640  
        if (impl.waiters_.empty())
605  
        {
641  
        {
606  
            remove_timer_impl(impl);
642  
            remove_timer_impl(impl);
607  
            impl.might_have_pending_waits_ = false;
643  
            impl.might_have_pending_waits_ = false;
608  
        }
644  
        }
609  
        refresh_cached_nearest();
645  
        refresh_cached_nearest();
610  
    }
646  
    }
611  

647  

612  
    w->ec_value_ = make_error_code(capy::error::canceled);
648  
    w->ec_value_ = make_error_code(capy::error::canceled);
613  
    sched_->post(&w->op_);
649  
    sched_->post(&w->op_);
614  
    return 1;
650  
    return 1;
615  
}
651  
}
616  

652  

617  
inline std::size_t
653  
inline std::size_t
618  
timer_service::process_expired()
654  
timer_service::process_expired()
619  
{
655  
{
620  
    intrusive_list<waiter_node> expired;
656  
    intrusive_list<waiter_node> expired;
621  

657  

622  
    {
658  
    {
623  
        std::lock_guard lock(mutex_);
659  
        std::lock_guard lock(mutex_);
624  
        auto now = clock_type::now();
660  
        auto now = clock_type::now();
625  

661  

626  
        while (!heap_.empty() && heap_[0].time_ <= now)
662  
        while (!heap_.empty() && heap_[0].time_ <= now)
627  
        {
663  
        {
628  
            implementation* t = heap_[0].timer_;
664  
            implementation* t = heap_[0].timer_;
629  
            remove_timer_impl(*t);
665  
            remove_timer_impl(*t);
630  
            while (auto* w = t->waiters_.pop_front())
666  
            while (auto* w = t->waiters_.pop_front())
631  
            {
667  
            {
632  
                w->impl_     = nullptr;
668  
                w->impl_     = nullptr;
633  
                w->ec_value_ = {};
669  
                w->ec_value_ = {};
634  
                expired.push_back(w);
670  
                expired.push_back(w);
635  
            }
671  
            }
636  
            t->might_have_pending_waits_ = false;
672  
            t->might_have_pending_waits_ = false;
637  
        }
673  
        }
638  

674  

639  
        refresh_cached_nearest();
675  
        refresh_cached_nearest();
640  
    }
676  
    }
641  

677  

642  
    std::size_t count = 0;
678  
    std::size_t count = 0;
643  
    while (auto* w = expired.pop_front())
679  
    while (auto* w = expired.pop_front())
644  
    {
680  
    {
645  
        sched_->post(&w->op_);
681  
        sched_->post(&w->op_);
646  
        ++count;
682  
        ++count;
647  
    }
683  
    }
648  

684  

649  
    return count;
685  
    return count;
650  
}
686  
}
651  

687  

652  
inline void
688  
inline void
653  
timer_service::remove_timer_impl(implementation& impl)
689  
timer_service::remove_timer_impl(implementation& impl)
654  
{
690  
{
655  
    std::size_t index = impl.heap_index_;
691  
    std::size_t index = impl.heap_index_;
656  
    if (index >= heap_.size())
692  
    if (index >= heap_.size())
657  
        return; // Not in heap
693  
        return; // Not in heap
658  

694  

659  
    if (index == heap_.size() - 1)
695  
    if (index == heap_.size() - 1)
660  
    {
696  
    {
661  
        // Last element, just pop
697  
        // Last element, just pop
662  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
698  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
663  
        heap_.pop_back();
699  
        heap_.pop_back();
664  
    }
700  
    }
665  
    else
701  
    else
666  
    {
702  
    {
667  
        // Swap with last and reheapify
703  
        // Swap with last and reheapify
668  
        swap_heap(index, heap_.size() - 1);
704  
        swap_heap(index, heap_.size() - 1);
669  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
705  
        impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
670  
        heap_.pop_back();
706  
        heap_.pop_back();
671  

707  

672  
        if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
708  
        if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
673  
            up_heap(index);
709  
            up_heap(index);
674  
        else
710  
        else
675  
            down_heap(index);
711  
            down_heap(index);
676  
    }
712  
    }
677  
}
713  
}
678  

714  

679  
inline void
715  
inline void
680  
timer_service::up_heap(std::size_t index)
716  
timer_service::up_heap(std::size_t index)
681  
{
717  
{
682  
    while (index > 0)
718  
    while (index > 0)
683  
    {
719  
    {
684  
        std::size_t parent = (index - 1) / 2;
720  
        std::size_t parent = (index - 1) / 2;
685  
        if (!(heap_[index].time_ < heap_[parent].time_))
721  
        if (!(heap_[index].time_ < heap_[parent].time_))
686  
            break;
722  
            break;
687  
        swap_heap(index, parent);
723  
        swap_heap(index, parent);
688  
        index = parent;
724  
        index = parent;
689  
    }
725  
    }
690  
}
726  
}
691  

727  

692  
inline void
728  
inline void
693  
timer_service::down_heap(std::size_t index)
729  
timer_service::down_heap(std::size_t index)
694  
{
730  
{
695  
    std::size_t child = index * 2 + 1;
731  
    std::size_t child = index * 2 + 1;
696  
    while (child < heap_.size())
732  
    while (child < heap_.size())
697  
    {
733  
    {
698  
        std::size_t min_child = (child + 1 == heap_.size() ||
734  
        std::size_t min_child = (child + 1 == heap_.size() ||
699  
                                 heap_[child].time_ < heap_[child + 1].time_)
735  
                                 heap_[child].time_ < heap_[child + 1].time_)
700  
            ? child
736  
            ? child
701  
            : child + 1;
737  
            : child + 1;
702  

738  

703  
        if (heap_[index].time_ < heap_[min_child].time_)
739  
        if (heap_[index].time_ < heap_[min_child].time_)
704  
            break;
740  
            break;
705  

741  

706  
        swap_heap(index, min_child);
742  
        swap_heap(index, min_child);
707  
        index = min_child;
743  
        index = min_child;
708  
        child = index * 2 + 1;
744  
        child = index * 2 + 1;
709  
    }
745  
    }
710  
}
746  
}
711  

747  

712  
inline void
748  
inline void
713  
timer_service::swap_heap(std::size_t i1, std::size_t i2)
749  
timer_service::swap_heap(std::size_t i1, std::size_t i2)
714  
{
750  
{
715  
    heap_entry tmp                = heap_[i1];
751  
    heap_entry tmp                = heap_[i1];
716  
    heap_[i1]                     = heap_[i2];
752  
    heap_[i1]                     = heap_[i2];
717  
    heap_[i2]                     = tmp;
753  
    heap_[i2]                     = tmp;
718  
    heap_[i1].timer_->heap_index_ = i1;
754  
    heap_[i1].timer_->heap_index_ = i1;
719  
    heap_[i2].timer_->heap_index_ = i2;
755  
    heap_[i2].timer_->heap_index_ = i2;
720  
}
756  
}
721  

757  

722  
// waiter_node out-of-class member function definitions
723  

759  

724  
inline void
760  
inline void
725  
waiter_node::canceller::operator()() const
761  
waiter_node::canceller::operator()() const
726  
{
762  
{
727  
    waiter_->svc_->cancel_waiter(waiter_);
763  
    waiter_->svc_->cancel_waiter(waiter_);
728  
}
764  
}
729  

765  

730  
inline void
766  
inline void
731  
waiter_node::completion_op::do_complete(
767  
waiter_node::completion_op::do_complete(
732  
    [[maybe_unused]] void* owner,
768  
    [[maybe_unused]] void* owner,
733  
    scheduler_op* base,
769  
    scheduler_op* base,
734  
    std::uint32_t,
770  
    std::uint32_t,
735  
    std::uint32_t)
771  
    std::uint32_t)
736  
{
772  
{
737  
    // owner is always non-null here. The destroy path (owner == nullptr)
773  
    // owner is always non-null here. The destroy path (owner == nullptr)
738  
    // is unreachable because completion_op overrides destroy() directly,
774  
    // is unreachable because completion_op overrides destroy() directly,
739  
    // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
775  
    // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
740  
    BOOST_COROSIO_ASSERT(owner);
776  
    BOOST_COROSIO_ASSERT(owner);
741  
    static_cast<completion_op*>(base)->operator()();
777  
    static_cast<completion_op*>(base)->operator()();
742  
}
778  
}
743  

779  

744  
inline void
780  
inline void
745  
waiter_node::completion_op::operator()()
781  
waiter_node::completion_op::operator()()
746  
{
782  
{
747  
    auto* w = waiter_;
783  
    auto* w = waiter_;
748  
    w->stop_cb_.reset();
784  
    w->stop_cb_.reset();
749  
    if (w->ec_out_)
785  
    if (w->ec_out_)
750  
        *w->ec_out_ = w->ec_value_;
786  
        *w->ec_out_ = w->ec_value_;
751  

787  

752  
    auto h      = w->h_;
788  
    auto h      = w->h_;
753  
    auto d      = w->d_;
789  
    auto d      = w->d_;
754  
    auto* svc   = w->svc_;
790  
    auto* svc   = w->svc_;
755  
    auto& sched = svc->get_scheduler();
791  
    auto& sched = svc->get_scheduler();
756  

792  

757  
    svc->destroy_waiter(w);
793  
    svc->destroy_waiter(w);
758  

794  

759  
    d.post(h);
795  
    d.post(h);
760  
    sched.work_finished();
796  
    sched.work_finished();
761  
}
797  
}
762  

798  

763  
// GCC 14 false-positive: inlining ~optional<stop_callback> through
799  
// GCC 14 false-positive: inlining ~optional<stop_callback> through
764  
// delete loses track that stop_cb_ was already .reset() above.
800  
// delete loses track that stop_cb_ was already .reset() above.
765  
#if defined(__GNUC__) && !defined(__clang__)
801  
#if defined(__GNUC__) && !defined(__clang__)
766  
#pragma GCC diagnostic push
802  
#pragma GCC diagnostic push
767  
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
803  
#pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
768  
#endif
804  
#endif
769  
inline void
805  
inline void
770  
waiter_node::completion_op::destroy()
806  
waiter_node::completion_op::destroy()
771  
{
807  
{
772  
    // Called during scheduler shutdown drain when this completion_op is
808  
    // Called during scheduler shutdown drain when this completion_op is
773  
    // in the scheduler's ready queue (posted by cancel_timer() or
809  
    // in the scheduler's ready queue (posted by cancel_timer() or
774  
    // process_expired()). Balances the work_started() from
810  
    // process_expired()). Balances the work_started() from
775  
    // implementation::wait(). The scheduler drain loop separately
811  
    // implementation::wait(). The scheduler drain loop separately
776  
    // balances the work_started() from post(). On IOCP both decrements
812  
    // balances the work_started() from post(). On IOCP both decrements
777  
    // are required for outstanding_work_ to reach zero; on other
813  
    // are required for outstanding_work_ to reach zero; on other
778  
    // backends this is harmless.
814  
    // backends this is harmless.
779  
    //
815  
    //
780  
    // This override also prevents scheduler_op::destroy() from calling
816  
    // This override also prevents scheduler_op::destroy() from calling
781  
    // do_complete(nullptr, ...). See also: timer_service::shutdown()
817  
    // do_complete(nullptr, ...). See also: timer_service::shutdown()
782  
    // which drains waiters still in the timer heap (the other path).
818  
    // which drains waiters still in the timer heap (the other path).
783  
    auto* w = waiter_;
819  
    auto* w = waiter_;
784  
    w->stop_cb_.reset();
820  
    w->stop_cb_.reset();
785  
    auto h      = std::exchange(w->h_, {});
821  
    auto h      = std::exchange(w->h_, {});
786  
    auto& sched = w->svc_->get_scheduler();
822  
    auto& sched = w->svc_->get_scheduler();
787  
    delete w;
823  
    delete w;
788  
    sched.work_finished();
824  
    sched.work_finished();
789  
    if (h)
825  
    if (h)
790  
        h.destroy();
826  
        h.destroy();
791  
}
827  
}
792  
#if defined(__GNUC__) && !defined(__clang__)
828  
#if defined(__GNUC__) && !defined(__clang__)
793  
#pragma GCC diagnostic pop
829  
#pragma GCC diagnostic pop
794  
#endif
830  
#endif
795  

831  

796  
inline std::coroutine_handle<>
832  
inline std::coroutine_handle<>
797  
timer_service::implementation::wait(
833  
timer_service::implementation::wait(
798  
    std::coroutine_handle<> h,
834  
    std::coroutine_handle<> h,
799  
    capy::executor_ref d,
835  
    capy::executor_ref d,
800  
    std::stop_token token,
836  
    std::stop_token token,
801  
    std::error_code* ec)
837  
    std::error_code* ec)
802  
{
838  
{
803  
    // Already-expired fast path — no waiter_node, no mutex.
839  
    // Already-expired fast path — no waiter_node, no mutex.
804  
    // Post instead of dispatch so the coroutine yields to the
840  
    // Post instead of dispatch so the coroutine yields to the
805  
    // scheduler, allowing other queued work to run.
841  
    // scheduler, allowing other queued work to run.
806  
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
842  
    if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
807  
    {
843  
    {
808  
        if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
844  
        if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
809  
        {
845  
        {
810  
            if (ec)
846  
            if (ec)
811  
                *ec = {};
847  
                *ec = {};
812  
            d.post(h);
848  
            d.post(h);
813  
            return std::noop_coroutine();
849  
            return std::noop_coroutine();
814  
        }
850  
        }
815  
    }
851  
    }
816  

852  

817  
    auto* w    = svc_->create_waiter();
853  
    auto* w    = svc_->create_waiter();
818  
    w->impl_   = this;
854  
    w->impl_   = this;
819  
    w->svc_    = svc_;
855  
    w->svc_    = svc_;
820  
    w->h_      = h;
856  
    w->h_      = h;
821  
    w->d_      = d;
857  
    w->d_      = d;
822  
    w->token_  = std::move(token);
858  
    w->token_  = std::move(token);
823  
    w->ec_out_ = ec;
859  
    w->ec_out_ = ec;
824  

860  

825  
    svc_->insert_waiter(*this, w);
861  
    svc_->insert_waiter(*this, w);
826  
    might_have_pending_waits_ = true;
862  
    might_have_pending_waits_ = true;
827  
    svc_->get_scheduler().work_started();
863  
    svc_->get_scheduler().work_started();
828  

864  

829  
    if (w->token_.stop_possible())
865  
    if (w->token_.stop_possible())
830  
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
866  
        w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
831  

867  

832  
    return std::noop_coroutine();
868  
    return std::noop_coroutine();
833  
}
869  
}
834  

870  

835  
// Free functions
836  

872  

837  
struct timer_service_access
873  
struct timer_service_access
838  
{
874  
{
839  
    static native_scheduler& get_scheduler(io_context& ctx) noexcept
875  
    static native_scheduler& get_scheduler(io_context& ctx) noexcept
840  
    {
876  
    {
841  
        return static_cast<native_scheduler&>(*ctx.sched_);
877  
        return static_cast<native_scheduler&>(*ctx.sched_);
842  
    }
878  
    }
843  
};
879  
};
844  

880  

845  
// Bypass find_service() mutex by reading the scheduler's cached pointer
881  
// Bypass find_service() mutex by reading the scheduler's cached pointer
846  
inline io_object::io_service&
882  
inline io_object::io_service&
847  
timer_service_direct(capy::execution_context& ctx) noexcept
883  
timer_service_direct(capy::execution_context& ctx) noexcept
848  
{
884  
{
849  
    return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
885  
    return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
850  
                .timer_svc_;
886  
                .timer_svc_;
851  
}
887  
}
852  

888  

853  
inline std::size_t
889  
inline std::size_t
854  
timer_service_update_expiry(timer::implementation& base)
890  
timer_service_update_expiry(timer::implementation& base)
855  
{
891  
{
856  
    auto& impl = static_cast<timer_service::implementation&>(base);
892  
    auto& impl = static_cast<timer_service::implementation&>(base);
857  
    return impl.svc_->update_timer(impl, impl.expiry_);
893  
    return impl.svc_->update_timer(impl, impl.expiry_);
858  
}
894  
}
859  

895  

860  
inline std::size_t
896  
inline std::size_t
861  
timer_service_cancel(timer::implementation& base) noexcept
897  
timer_service_cancel(timer::implementation& base) noexcept
862  
{
898  
{
863  
    auto& impl = static_cast<timer_service::implementation&>(base);
899  
    auto& impl = static_cast<timer_service::implementation&>(base);
864  
    return impl.svc_->cancel_timer(impl);
900  
    return impl.svc_->cancel_timer(impl);
865  
}
901  
}
866  

902  

867  
inline std::size_t
903  
inline std::size_t
868  
timer_service_cancel_one(timer::implementation& base) noexcept
904  
timer_service_cancel_one(timer::implementation& base) noexcept
869  
{
905  
{
870  
    auto& impl = static_cast<timer_service::implementation&>(base);
906  
    auto& impl = static_cast<timer_service::implementation&>(base);
871  
    return impl.svc_->cancel_one_waiter(impl);
907  
    return impl.svc_->cancel_one_waiter(impl);
872  
}
908  
}
873  

909  

874  
// Obtain the timer_service for `ctx` by calling
// make_service<timer_service>, forwarding `sched` as its argument.
inline timer_service&
get_timer_service(capy::execution_context& ctx, scheduler& sched)
{
    timer_service& service = ctx.make_service<timer_service>(sched);
    return service;
}
879  

915  

880  
} // namespace boost::corosio::detail
916  
} // namespace boost::corosio::detail
881  

917  

882  
#endif
918  
#endif