LCOV - code coverage report
Current view: top level - corosio/detail - timer_service.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 91.5 % 365 334 31
Test Date: 2026-03-05 21:00:38 Functions: 97.8 % 45 44 1

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : // Copyright (c) 2026 Steve Gerbino
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/corosio
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      12                 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
      13                 : 
      14                 : #include <boost/corosio/timer.hpp>
      15                 : #include <boost/corosio/io_context.hpp>
      16                 : #include <boost/corosio/detail/scheduler_op.hpp>
      17                 : #include <boost/corosio/native/native_scheduler.hpp>
      18                 : #include <boost/corosio/detail/intrusive.hpp>
      19                 : #include <boost/corosio/detail/thread_local_ptr.hpp>
      20                 : #include <boost/capy/error.hpp>
      21                 : #include <boost/capy/ex/execution_context.hpp>
      22                 : #include <boost/capy/ex/executor_ref.hpp>
      23                 : #include <system_error>
      24                 : 
      25                 : #include <atomic>
      26                 : #include <chrono>
      27                 : #include <coroutine>
      28                 : #include <cstddef>
      29                 : #include <limits>
      30                 : #include <mutex>
      31                 : #include <optional>
      32                 : #include <stop_token>
      33                 : #include <utility>
      34                 : #include <vector>
      35                 : 
      36                 : namespace boost::corosio::detail {
      37                 : 
      38                 : struct scheduler;
      39                 : 
      40                 : /*
      41                 :     Timer Service
      42                 :     =============
      43                 : 
      44                 :     Data Structures
      45                 :     ---------------
      46                 :     waiter_node holds per-waiter state: coroutine handle, executor,
      47                 :     error output, stop_token, embedded completion_op. Each concurrent
      48                 :     co_await t.wait() allocates one waiter_node.
      49                 : 
      50                 :     timer_service::implementation holds per-timer state: expiry,
      51                 :     heap index, and an intrusive_list of waiter_nodes. Multiple
      52                 :     coroutines can wait on the same timer simultaneously.
      53                 : 
      54                 :     timer_service owns a min-heap of active timers, a free list
      55                 :     of recycled impls, and a free list of recycled waiter_nodes. The
      56                 :     heap is ordered by expiry time; the scheduler queries
      57                 :     nearest_expiry() to set the epoll/timerfd timeout.
      58                 : 
      59                 :     Optimization Strategy
      60                 :     ---------------------
      61                 :     1. Deferred heap insertion — expires_after() stores the expiry
      62                 :        but does not insert into the heap. Insertion happens in wait().
      63                 :     2. Thread-local impl cache — single-slot per-thread cache.
      64                 :     3. Embedded completion_op — eliminates heap allocation per fire/cancel.
      65                 :     4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
      66                 :     5. might_have_pending_waits_ flag — skips lock when no wait issued.
      67                 :     6. Thread-local waiter cache — single-slot per-thread cache.
      68                 : 
      69                 :     Concurrency
      70                 :     -----------
      71                 :     stop_token callbacks can fire from any thread. The impl_
      72                 :     pointer on waiter_node is used as a "still in list" marker.
      73                 : */
      74                 : 
      75                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
      76                 : 
      77                 : inline void timer_service_invalidate_cache() noexcept;
      78                 : 
      79                 : // timer_service owns the timer heap plus the impl and waiter free lists.
      80                 : // Member functions are defined out-of-class (after implementation and waiter_node are complete).
      81                 : class BOOST_COROSIO_DECL timer_service final
      82                 :     : public capy::execution_context::service
      83                 :     , public io_object::io_service
      84                 : {
      85                 : public:
      86                 :     using clock_type = std::chrono::steady_clock;
      87                 :     using time_point = clock_type::time_point;
      88                 : 
      89                 :     /// Type-erased callback (context pointer + function pointer) for earliest-expiry-changed notifications.
      90                 :     class callback
      91                 :     {
      92                 :         void* ctx_         = nullptr; // passed to fn_ on every invocation
      93                 :         void (*fn_)(void*) = nullptr; // nullptr marks an empty callback
      94                 : 
      95                 :     public:
      96                 :         /// Construct an empty callback.
      97 HIT         407 :         callback() = default;
      98                 : 
      99                 :         /// Construct a callback that calls fn(ctx) when invoked.
     100             407 :         callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
     101                 : 
     102                 :         /// Return true if the callback is non-empty (fn_ is set).
     103                 :         explicit operator bool() const noexcept
     104                 :         {
     105                 :             return fn_ != nullptr;
     106                 :         }
     107                 : 
     108                 :         /// Invoke the callback; does nothing when the callback is empty.
     109            7272 :         void operator()() const
     110                 :         {
     111            7272 :             if (fn_)
     112            7272 :                 fn_(ctx_);
     113            7272 :         }
     114                 :     };
     115                 : 
     116                 :     struct implementation;
     117                 : 
     118                 : private:
     119                 :     struct heap_entry // element type of heap_
     120                 :     {
     121                 :         time_point time_; // expiry stored for the timer; heap_[0].time_ is read as the nearest expiry
     122                 :         implementation* timer_; // timer this entry belongs to
     123                 :     };
     124                 : 
     125                 :     scheduler* sched_ = nullptr; // set by the constructor; used for post() and work_finished()
     126                 :     mutable std::mutex mutex_; // locked around heap_, waiter-list and free-list updates (shutdown() does not lock)
     127                 :     std::vector<heap_entry> heap_; // timers inserted by insert_waiter()
     128                 :     implementation* free_list_     = nullptr; // recycled impls, linked through next_free_
     129                 :     waiter_node* waiter_free_list_ = nullptr; // recycled waiters, linked through next_free_
     130                 :     callback on_earliest_changed_; // invoked, outside the lock, when a timer lands at heap index 0
     131                 :     // Avoids mutex in nearest_expiry() and empty(); int64 max is the "heap is empty" sentinel
     132                 :     mutable std::atomic<std::int64_t> cached_nearest_ns_{
     133                 :         (std::numeric_limits<std::int64_t>::max)()};
     134                 : 
     135                 : public:
     136                 :     /// Construct the timer service bound to a scheduler (the execution_context argument is unused).
     137             407 :     inline timer_service(capy::execution_context&, scheduler& sched)
     138             407 :         : sched_(&sched)
     139                 :     {
     140             407 :     }
     141                 : 
     142                 :     /// Return the associated scheduler.
     143           14630 :     inline scheduler& get_scheduler() noexcept
     144                 :     {
     145           14630 :         return *sched_;
     146                 :     }
     147                 : 
     148                 :     /// Destroy the timer service.
     149             814 :     ~timer_service() override = default;
     150                 : 
     151                 :     timer_service(timer_service const&)            = delete;
     152                 :     timer_service& operator=(timer_service const&) = delete;
     153                 : 
     154                 :     /// Register a callback invoked when the earliest expiry changes (replaces any previous callback).
     155             407 :     inline void set_on_earliest_changed(callback cb)
     156                 :     {
     157             407 :         on_earliest_changed_ = cb;
     158             407 :     }
     159                 : 
     160                 :     /// Return true if no timers are in the heap (the cached expiry equals the int64 max sentinel).
     161                 :     inline bool empty() const noexcept
     162                 :     {
     163                 :         return cached_nearest_ns_.load(std::memory_order_acquire) ==
     164                 :             (std::numeric_limits<std::int64_t>::max)();
     165                 :     }
     166                 : 
     167                 :     /// Return the nearest timer expiry without acquiring the mutex; built from the int64 max sentinel when the heap is empty.
     168           17129 :     inline time_point nearest_expiry() const noexcept
     169                 :     {
     170           17129 :         auto ns = cached_nearest_ns_.load(std::memory_order_acquire); // tick count in time_point::duration units
     171           17129 :         return time_point(time_point::duration(ns));
     172                 :     }
     173                 : 
     174                 :     /// Cancel all pending timers and free cached resources.
     175                 :     inline void shutdown() override;
     176                 : 
     177                 :     /// Construct a new timer implementation, reusing a cached or free-listed one when available.
     178                 :     inline io_object::implementation* construct() override;
     179                 : 
     180                 :     /// Destroy a timer implementation, cancelling pending waiters (forwards to destroy_impl).
     181                 :     inline void destroy(io_object::implementation* p) override;
     182                 : 
     183                 :     /// Cancel and recycle a timer implementation (thread-local cache, else the free list).
     184                 :     inline void destroy_impl(implementation& impl);
     185                 : 
     186                 :     /// Create or recycle a waiter node.
     187                 :     inline waiter_node* create_waiter();
     188                 : 
     189                 :     /// Return a waiter node to the cache or free list.
     190                 :     inline void destroy_waiter(waiter_node* w);
     191                 : 
     192                 :     /// Update the timer expiry, cancelling existing waiters; returns the number of waiters cancelled.
     193                 :     inline std::size_t update_timer(implementation& impl, time_point new_time);
     194                 : 
     195                 :     /// Insert a waiter into the timer's waiter list, adding the timer to the heap if it is not already there.
     196                 :     inline void insert_waiter(implementation& impl, waiter_node* w);
     197                 : 
     198                 :     /// Cancel all waiters on a timer; returns the number of waiters cancelled.
     199                 :     inline std::size_t cancel_timer(implementation& impl);
     200                 : 
     201                 :     /// Cancel a single waiter (stop_token callback path).
     202                 :     inline void cancel_waiter(waiter_node* w);
     203                 : 
     204                 :     /// Cancel one waiter on a timer.
     205                 :     inline std::size_t cancel_one_waiter(implementation& impl);
     206                 : 
     207                 :     /// Complete all waiters whose timers have expired.
     208                 :     inline std::size_t process_expired();
     209                 : 
     210                 : private: // heap bookkeeping
     211          137508 :     inline void refresh_cached_nearest() noexcept // publishes heap_[0]'s expiry, or the sentinel when heap_ is empty
     212                 :     {
     213          137508 :         auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
     214          137032 :                                 : heap_[0].time_.time_since_epoch().count();
     215          137508 :         cached_nearest_ns_.store(ns, std::memory_order_release);
     216          137508 :     }
     217                 : 
     218                 :     inline void remove_timer_impl(implementation& impl);
     219                 :     inline void up_heap(std::size_t index);
     220                 :     inline void down_heap(std::size_t index);
     221                 :     inline void swap_heap(std::size_t i1, std::size_t i2);
     222                 : };
     223                 : // Per-wait state: a node that is linked into implementation::waiters_ while its wait is pending.
     224                 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
     225                 :     : intrusive_list<waiter_node>::node
     226                 : {
     227                 :     // Embedded completion op — avoids heap allocation per fire/cancel
     228                 :     struct completion_op final : scheduler_op
     229                 :     {
     230                 :         waiter_node* waiter_ = nullptr; // enclosing node; assigned in waiter_node's constructor
     231                 : 
     232                 :         static void do_complete(
     233                 :             void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
     234                 : 
     235             194 :         completion_op() noexcept : scheduler_op(&do_complete) {}
     236                 : 
     237                 :         void operator()() override;
     238                 :         void destroy() override;
     239                 :     };
     240                 : 
     241                 :     // Per-waiter stop_token cancellation
     242                 :     struct canceller
     243                 :     {
     244                 :         waiter_node* waiter_; // NOTE(review): presumably handed to timer_service::cancel_waiter — confirm
     245                 :         void operator()() const;
     246                 :     };
     247                 : 
     248                 :     // nullptr once removed from timer's waiter list (concurrency marker)
     249                 :     timer_service::implementation* impl_ = nullptr;
     250                 :     timer_service* svc_                  = nullptr;
     251                 :     std::coroutine_handle<> h_; // waiting coroutine; shutdown() destroys it
     252                 :     capy::executor_ref d_;
     253                 :     std::error_code* ec_out_ = nullptr;
     254                 :     std::stop_token token_;
     255                 :     std::optional<std::stop_callback<canceller>> stop_cb_; // reset by shutdown() before the node is deleted
     256                 :     completion_op op_; // posted to the scheduler by the cancel paths
     257                 :     std::error_code ec_value_; // set to capy::error::canceled before op_ is posted on cancel
     258                 :     waiter_node* next_free_ = nullptr; // link used while the node is on waiter_free_list_
     259                 : 
     260             194 :     waiter_node() noexcept
     261             194 :     {
     262             194 :         op_.waiter_ = this; // lets the embedded op find its node
     263             194 :     }
     264                 : };
     265                 : // Per-timer state. heap_index_, expiry_ and might_have_pending_waits_ are not declared here (presumably inherited from timer::implementation).
     266                 : struct timer_service::implementation final : timer::implementation
     267                 : {
     268                 :     using clock_type = std::chrono::steady_clock;
     269                 :     using time_point = clock_type::time_point;
     270                 :     using duration   = clock_type::duration;
     271                 : 
     272                 :     timer_service* svc_ = nullptr; // service this impl is bound to; compared in try_pop_tl_cache()
     273                 :     intrusive_list<waiter_node> waiters_; // waits currently pending on this timer
     274                 : 
     275                 :     // Free list linkage (reused when impl is on free_list)
     276                 :     implementation* next_free_ = nullptr;
     277                 : 
     278                 :     inline explicit implementation(timer_service& svc) noexcept;
     279                 : 
     280                 :     inline std::coroutine_handle<> wait(
     281                 :         std::coroutine_handle<>,
     282                 :         capy::executor_ref,
     283                 :         std::stop_token,
     284                 :         std::error_code*) override;
     285                 : };
     286                 : 
     287                 : // Thread-local caches avoid hot-path mutex acquisitions:
     288                 : // 1. Impl cache — single-slot, validated by comparing svc_
     289                 : // 2. Waiter cache — single-slot, no service affinity
     290                 : // shutdown() calls timer_service_invalidate_cache(), which deletes and clears both slots via get()/set().
     291                 : // NOTE(review): presumably only the calling thread's slots are cleared — confirm against thread_local_ptr.
     292                 : inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
     293                 : inline thread_local_ptr<waiter_node> tl_cached_waiter;
     294                 : // Take the cached impl if it is bound to svc; a cached impl bound to another service is deleted. Returns nullptr otherwise.
     295                 : inline timer_service::implementation*
     296            7601 : try_pop_tl_cache(timer_service* svc) noexcept
     297                 : {
     298            7601 :     auto* impl = tl_cached_impl.get();
     299            7601 :     if (impl)
     300                 :     {
     301            7374 :         tl_cached_impl.set(nullptr); // the slot is emptied whether or not impl is usable
     302            7374 :         if (impl->svc_ == svc)
     303            7374 :             return impl;
     304                 :         // svc_ mismatch: stale impl from a destroyed service
     305 MIS           0 :         delete impl;
     306                 :     }
     307 HIT         227 :     return nullptr;
     308                 : }
     309                 : // Store impl in the impl cache slot if the slot is empty; returns false, leaving impl with the caller, when it is occupied.
     310                 : inline bool
     311            7599 : try_push_tl_cache(timer_service::implementation* impl) noexcept
     312                 : {
     313            7599 :     if (!tl_cached_impl.get())
     314                 :     {
     315            7525 :         tl_cached_impl.set(impl);
     316            7525 :         return true;
     317                 :     }
     318              74 :     return false;
     319                 : }
     320                 : // Take the cached waiter_node and leave the slot empty; returns nullptr if nothing was cached.
     321                 : inline waiter_node*
     322            7316 : try_pop_waiter_tl_cache() noexcept
     323                 : {
     324            7316 :     auto* w = tl_cached_waiter.get();
     325            7316 :     if (w)
     326                 :     {
     327            7120 :         tl_cached_waiter.set(nullptr);
     328            7120 :         return w;
     329                 :     }
     330             196 :     return nullptr;
     331                 : }
     332                 : // Store w in the waiter cache slot if the slot is empty; returns false, leaving w with the caller, when it is occupied.
     333                 : inline bool
     334            7306 : try_push_waiter_tl_cache(waiter_node* w) noexcept
     335                 : {
     336            7306 :     if (!tl_cached_waiter.get())
     337                 :     {
     338            7226 :         tl_cached_waiter.set(w);
     339            7226 :         return true;
     340                 :     }
     341              80 :     return false;
     342                 : }
     343                 : // Delete whatever impl and waiter the cache slots hold and reset both slots to nullptr.
     344                 : inline void
     345             407 : timer_service_invalidate_cache() noexcept
     346                 : {
     347             407 :     delete tl_cached_impl.get(); // deleting nullptr is a no-op, so an empty slot is fine
     348             407 :     tl_cached_impl.set(nullptr);
     349                 : 
     350             407 :     delete tl_cached_waiter.get();
     351             407 :     tl_cached_waiter.set(nullptr);
     352             407 : }
     353                 : 
     354                 : // timer_service out-of-class member function definitions
     355                 : // Bind a new impl to svc; every other member keeps its default initializer.
     356             227 : inline timer_service::implementation::implementation(
     357             227 :     timer_service& svc) noexcept
     358             227 :     : svc_(&svc)
     359                 : {
     360             227 : }
     361                 : // Tear down all timer state: destroy waiting coroutines and delete heap, cached and free-listed objects.
     362                 : inline void
     363             407 : timer_service::shutdown()
     364                 : {
     365             407 :     timer_service_invalidate_cache(); // drop the cached impl and waiter first
     366                 : 
     367                 :     // Cancel waiting timers still in the heap. Each waiter called
     368                 :     // work_started() in implementation::wait(). On IOCP the scheduler
     369                 :     // shutdown loop exits when outstanding_work_ reaches zero, so
     370                 :     // work_finished() must be called here to balance it. On other backends
     371                 :     // this is harmless (their drain loops exit when the queue is empty).
     372                 :     // NOTE(review): runs without mutex_; confirm h.destroy() cannot re-enter destroy_impl() and modify heap_ mid-loop.
     373             409 :     for (auto& entry : heap_)
     374                 :     {
     375               2 :         auto* impl = entry.timer_;
     376               4 :         while (auto* w = impl->waiters_.pop_front())
     377                 :         {
     378               2 :             w->stop_cb_.reset(); // destroy the stop callback before the node goes away
     379               2 :             auto h = std::exchange(w->h_, {}); // take the handle, leaving w->h_ empty
     380               2 :             sched_->work_finished();
     381               2 :             if (h)
     382               2 :                 h.destroy(); // the waiting coroutine is destroyed, not resumed
     383               2 :             delete w;
     384               2 :         }
     385               2 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     386               2 :         delete impl;
     387                 :     }
     388             407 :     heap_.clear();
     389             407 :     cached_nearest_ns_.store( // publish the "heap is empty" sentinel
     390                 :         (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
     391                 : 
     392                 :     // Delete free-listed impls
     393             481 :     while (free_list_)
     394                 :     {
     395              74 :         auto* next = free_list_->next_free_;
     396              74 :         delete free_list_;
     397              74 :         free_list_ = next;
     398                 :     }
     399                 : 
     400                 :     // Delete free-listed waiters
     401             485 :     while (waiter_free_list_)
     402                 :     {
     403              78 :         auto* next = waiter_free_list_->next_free_;
     404              78 :         delete waiter_free_list_;
     405              78 :         waiter_free_list_ = next;
     406                 :     }
     407             407 : }
     408                 : // Return an impl for a new timer: thread-local cache first, then the free list under mutex_, else a new allocation.
     409                 : inline io_object::implementation*
     410            7601 : timer_service::construct()
     411                 : {
     412            7601 :     implementation* impl = try_pop_tl_cache(this);
     413            7601 :     if (impl)
     414                 :     {
     415            7374 :         impl->svc_        = this;
     416            7374 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)(); // max marks "not in heap"
     417            7374 :         impl->might_have_pending_waits_ = false;
     418            7374 :         return impl;
     419                 :     }
     420                 : 
     421             227 :     std::lock_guard lock(mutex_); // guards free_list_; also held across the allocation below
     422             227 :     if (free_list_)
     423                 :     {
     424 MIS           0 :         impl              = free_list_; // pop the head and reset it like a cached impl
     425               0 :         free_list_        = impl->next_free_;
     426               0 :         impl->next_free_  = nullptr;
     427               0 :         impl->svc_        = this;
     428               0 :         impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     429               0 :         impl->might_have_pending_waits_ = false;
     430                 :     }
     431                 :     else
     432                 :     {
     433 HIT         227 :         impl = new implementation(*this);
     434                 :     }
     435             227 :     return impl;
     436             227 : }
     437                 : // io_service override: cast p to the concrete implementation type and hand it to destroy_impl().
     438                 : inline void
     439            7599 : timer_service::destroy(io_object::implementation* p)
     440                 : {
     441            7599 :     destroy_impl(static_cast<implementation&>(*p));
     442            7599 : }
     443                 : // Cancel impl's waiters, take it out of the heap if still there, then recycle it (thread-local cache, else free list).
     444                 : inline void
     445            7599 : timer_service::destroy_impl(implementation& impl)
     446                 : {
     447            7599 :     cancel_timer(impl);
     448                 : 
     449            7599 :     if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)()) // impl is still in the heap after cancel_timer()
     450                 :     {
     451 MIS           0 :         std::lock_guard lock(mutex_);
     452               0 :         remove_timer_impl(impl);
     453               0 :         refresh_cached_nearest();
     454               0 :     }
     455                 : 
     456 HIT        7599 :     if (try_push_tl_cache(&impl)) // cached without taking the mutex
     457            7525 :         return;
     458                 : 
     459              74 :     std::lock_guard lock(mutex_); // cache slot occupied: push onto the shared free list
     460              74 :     impl.next_free_ = free_list_;
     461              74 :     free_list_      = &impl;
     462              74 : }
     463                 : // Obtain a waiter_node: thread-local cache, then the free list under mutex_, then new. Recycled nodes are returned without resetting their fields.
     464                 : inline waiter_node*
     465            7316 : timer_service::create_waiter()
     466                 : {
     467            7316 :     if (auto* w = try_pop_waiter_tl_cache())
     468            7120 :         return w;
     469                 : 
     470             196 :     std::lock_guard lock(mutex_); // guards waiter_free_list_; still held during the allocation below
     471             196 :     if (waiter_free_list_)
     472                 :     {
     473               2 :         auto* w           = waiter_free_list_;
     474               2 :         waiter_free_list_ = w->next_free_;
     475               2 :         w->next_free_     = nullptr;
     476               2 :         return w;
     477                 :     }
     478                 : 
     479             194 :     return new waiter_node();
     480             196 : }
     481                 : // Recycle w: into the thread-local cache slot if it is empty, otherwise onto the free list under mutex_.
     482                 : inline void
     483            7306 : timer_service::destroy_waiter(waiter_node* w)
     484                 : {
     485            7306 :     if (try_push_waiter_tl_cache(w))
     486            7226 :         return;
     487                 : 
     488              80 :     std::lock_guard lock(mutex_);
     489              80 :     w->next_free_     = waiter_free_list_;
     490              80 :     waiter_free_list_ = w;
     491              80 : }
     492                 : // Re-key impl's heap entry to new_time and cancel its current waiters; returns the number of waiters posted as canceled.
     493                 : inline std::size_t
     494               6 : timer_service::update_timer(implementation& impl, time_point new_time)
     495                 : {
     496                 :     bool in_heap = // read before mutex_ is taken
     497               6 :         (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
     498               6 :     if (!in_heap && impl.waiters_.empty())
     499 MIS           0 :         return 0; // not in the heap and nobody waiting: nothing to update (impl.expiry_ is not written by this function)
     500                 : 
     501 HIT           6 :     bool notify = false;
     502               6 :     intrusive_list<waiter_node> canceled;
     503                 : 
     504                 :     {
     505               6 :         std::lock_guard lock(mutex_);
     506                 :         // Detach the waiters under the lock; their completions are posted after it is released.
     507              16 :         while (auto* w = impl.waiters_.pop_front())
     508                 :         {
     509              10 :             w->impl_ = nullptr; // marks the waiter as removed so cancel_waiter() skips it
     510              10 :             canceled.push_back(w);
     511              10 :         }
     512                 : 
     513               6 :         if (impl.heap_index_ < heap_.size()) // impl has a heap entry: change its key in place
     514                 :         {
     515               6 :             time_point old_time           = heap_[impl.heap_index_].time_;
     516               6 :             heap_[impl.heap_index_].time_ = new_time;
     517                 :             // Restore heap order in the direction the key moved.
     518               6 :             if (new_time < old_time)
     519               6 :                 up_heap(impl.heap_index_);
     520                 :             else
     521 MIS           0 :                 down_heap(impl.heap_index_);
     522                 : 
     523 HIT           6 :             notify = (impl.heap_index_ == 0); // impl is now the earliest timer
     524                 :         }
     525                 : 
     526               6 :         refresh_cached_nearest();
     527               6 :     }
     528                 : 
     529               6 :     std::size_t count = 0;
     530              16 :     while (auto* w = canceled.pop_front())
     531                 :     {
     532              10 :         w->ec_value_ = make_error_code(capy::error::canceled);
     533              10 :         sched_->post(&w->op_); // the waiter's embedded op carries the completion
     534              10 :         ++count;
     535              10 :     }
     536                 : 
     537               6 :     if (notify)
     538               6 :         on_earliest_changed_(); // invoked after the lock has been released
     539                 : 
     540               6 :     return count;
     541                 : }
     542                 : // Append w to impl's waiter list, first adding impl to the heap if it is not there; notifies if impl became the earliest timer.
     543                 : inline void
     544            7316 : timer_service::insert_waiter(implementation& impl, waiter_node* w)
     545                 : {
     546            7316 :     bool notify = false;
     547                 :     {
     548            7316 :         std::lock_guard lock(mutex_);
     549            7316 :         if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)()) // not in the heap yet
     550                 :         {
     551            7294 :             impl.heap_index_ = heap_.size(); // index the new entry is pushed at
     552            7294 :             heap_.push_back({impl.expiry_, &impl});
     553            7294 :             up_heap(heap_.size() - 1);
     554            7294 :             notify = (impl.heap_index_ == 0);
     555            7294 :             refresh_cached_nearest();
     556                 :         }
     557            7316 :         impl.waiters_.push_back(w); // w->impl_ is not assigned here (presumably set by the caller — confirm)
     558            7316 :     }
     559            7316 :     if (notify)
     560            7266 :         on_earliest_changed_(); // invoked after the lock has been released
     561            7316 : }
     562                 : // Cancel every waiter on impl and remove impl from the heap; returns the number of waiters posted as canceled.
     563                 : inline std::size_t
     564            7607 : timer_service::cancel_timer(implementation& impl)
     565                 : {
     566            7607 :     if (!impl.might_have_pending_waits_) // fast path: return without taking the mutex
     567            7583 :         return 0;
     568                 : 
     569                 :     // Not in heap and no waiters — just clear the flag
     570              24 :     if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
     571 MIS           0 :         impl.waiters_.empty())
     572                 :     {
     573               0 :         impl.might_have_pending_waits_ = false;
     574               0 :         return 0;
     575                 :     }
     576                 : 
     577 HIT          24 :     intrusive_list<waiter_node> canceled;
     578                 :     // Detach the waiters under the lock; their completions are posted after it is released.
     579                 :     {
     580              24 :         std::lock_guard lock(mutex_);
     581              24 :         remove_timer_impl(impl);
     582              52 :         while (auto* w = impl.waiters_.pop_front())
     583                 :         {
     584              28 :             w->impl_ = nullptr; // marks the waiter as removed so cancel_waiter() skips it
     585              28 :             canceled.push_back(w);
     586              28 :         }
     587              24 :         refresh_cached_nearest();
     588              24 :     }
     589                 : 
     590              24 :     impl.might_have_pending_waits_ = false;
     591                 : 
     592              24 :     std::size_t count = 0;
     593              52 :     while (auto* w = canceled.pop_front())
     594                 :     {
     595              28 :         w->ec_value_ = make_error_code(capy::error::canceled);
     596              28 :         sched_->post(&w->op_); // the waiter's embedded op carries the completion
     597              28 :         ++count;
     598              28 :     }
     599                 : 
     600              24 :     return count;
     601                 : }
     602                 : 
     603                 : inline void
     604              30 : timer_service::cancel_waiter(waiter_node* w)
     605                 : {
     606                 :     {
     607              30 :         std::lock_guard lock(mutex_);
     608                 :         // Already removed by cancel_timer or process_expired
     609              30 :         if (!w->impl_)
     610 MIS           0 :             return;
     611 HIT          30 :         auto* impl = w->impl_;
     612              30 :         w->impl_   = nullptr;
     613              30 :         impl->waiters_.remove(w);
     614              30 :         if (impl->waiters_.empty())
     615                 :         {
     616              28 :             remove_timer_impl(*impl);
     617              28 :             impl->might_have_pending_waits_ = false;
     618                 :         }
     619              30 :         refresh_cached_nearest();
     620              30 :     }
     621                 : 
     622              30 :     w->ec_value_ = make_error_code(capy::error::canceled);
     623              30 :     sched_->post(&w->op_);
     624                 : }
     625                 : 
     626                 : inline std::size_t
     627               2 : timer_service::cancel_one_waiter(implementation& impl)
     628                 : {
     629               2 :     if (!impl.might_have_pending_waits_)
     630 MIS           0 :         return 0;
     631                 : 
     632 HIT           2 :     waiter_node* w = nullptr;
     633                 : 
     634                 :     {
     635               2 :         std::lock_guard lock(mutex_);
     636               2 :         w = impl.waiters_.pop_front();
     637               2 :         if (!w)
     638 MIS           0 :             return 0;
     639 HIT           2 :         w->impl_ = nullptr;
     640               2 :         if (impl.waiters_.empty())
     641                 :         {
     642 MIS           0 :             remove_timer_impl(impl);
     643               0 :             impl.might_have_pending_waits_ = false;
     644                 :         }
     645 HIT           2 :         refresh_cached_nearest();
     646               2 :     }
     647                 : 
     648               2 :     w->ec_value_ = make_error_code(capy::error::canceled);
     649               2 :     sched_->post(&w->op_);
     650               2 :     return 1;
     651                 : }
     652                 : 
     653                 : inline std::size_t
     654          130152 : timer_service::process_expired()
     655                 : {
     656          130152 :     intrusive_list<waiter_node> expired;
     657                 : 
     658                 :     {
     659          130152 :         std::lock_guard lock(mutex_);
     660          130152 :         auto now = clock_type::now();
     661                 : 
     662          137392 :         while (!heap_.empty() && heap_[0].time_ <= now)
     663                 :         {
     664            7240 :             implementation* t = heap_[0].timer_;
     665            7240 :             remove_timer_impl(*t);
     666           14484 :             while (auto* w = t->waiters_.pop_front())
     667                 :             {
     668            7244 :                 w->impl_     = nullptr;
     669            7244 :                 w->ec_value_ = {};
     670            7244 :                 expired.push_back(w);
     671            7244 :             }
     672            7240 :             t->might_have_pending_waits_ = false;
     673                 :         }
     674                 : 
     675          130152 :         refresh_cached_nearest();
     676          130152 :     }
     677                 : 
     678          130152 :     std::size_t count = 0;
     679          137396 :     while (auto* w = expired.pop_front())
     680                 :     {
     681            7244 :         sched_->post(&w->op_);
     682            7244 :         ++count;
     683            7244 :     }
     684                 : 
     685          130152 :     return count;
     686                 : }
     687                 : 
     688                 : inline void
     689            7292 : timer_service::remove_timer_impl(implementation& impl)
     690                 : {
     691            7292 :     std::size_t index = impl.heap_index_;
     692            7292 :     if (index >= heap_.size())
     693 MIS           0 :         return; // Not in heap
     694                 : 
     695 HIT        7292 :     if (index == heap_.size() - 1)
     696                 :     {
     697                 :         // Last element, just pop
     698             135 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     699             135 :         heap_.pop_back();
     700                 :     }
     701                 :     else
     702                 :     {
     703                 :         // Swap with last and reheapify
     704            7157 :         swap_heap(index, heap_.size() - 1);
     705            7157 :         impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     706            7157 :         heap_.pop_back();
     707                 : 
     708            7157 :         if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
     709 MIS           0 :             up_heap(index);
     710                 :         else
     711 HIT        7157 :             down_heap(index);
     712                 :     }
     713                 : }
     714                 : 
     715                 : inline void
     716            7300 : timer_service::up_heap(std::size_t index)
     717                 : {
     718           14435 :     while (index > 0)
     719                 :     {
     720            7163 :         std::size_t parent = (index - 1) / 2;
     721            7163 :         if (!(heap_[index].time_ < heap_[parent].time_))
     722              28 :             break;
     723            7135 :         swap_heap(index, parent);
     724            7135 :         index = parent;
     725                 :     }
     726            7300 : }
     727                 : 
     728                 : inline void
     729            7157 : timer_service::down_heap(std::size_t index)
     730                 : {
     731            7157 :     std::size_t child = index * 2 + 1;
     732            7157 :     while (child < heap_.size())
     733                 :     {
     734               6 :         std::size_t min_child = (child + 1 == heap_.size() ||
     735 MIS           0 :                                  heap_[child].time_ < heap_[child + 1].time_)
     736 HIT           6 :             ? child
     737               6 :             : child + 1;
     738                 : 
     739               6 :         if (heap_[index].time_ < heap_[min_child].time_)
     740               6 :             break;
     741                 : 
     742 MIS           0 :         swap_heap(index, min_child);
     743               0 :         index = min_child;
     744               0 :         child = index * 2 + 1;
     745                 :     }
     746 HIT        7157 : }
     747                 : 
     748                 : inline void
     749           14292 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
     750                 : {
     751           14292 :     heap_entry tmp                = heap_[i1];
     752           14292 :     heap_[i1]                     = heap_[i2];
     753           14292 :     heap_[i2]                     = tmp;
     754           14292 :     heap_[i1].timer_->heap_index_ = i1;
     755           14292 :     heap_[i2].timer_->heap_index_ = i2;
     756           14292 : }
     757                 : 
     758                 : // waiter_node out-of-class member function definitions
     759                 : 
     760                 : inline void
     761              30 : waiter_node::canceller::operator()() const
     762                 : {
     763              30 :     waiter_->svc_->cancel_waiter(waiter_);
     764              30 : }
     765                 : 
     766                 : inline void
     767 MIS           0 : waiter_node::completion_op::do_complete(
     768                 :     [[maybe_unused]] void* owner,
     769                 :     scheduler_op* base,
     770                 :     std::uint32_t,
     771                 :     std::uint32_t)
     772                 : {
     773                 :     // owner is always non-null here. The destroy path (owner == nullptr)
     774                 :     // is unreachable because completion_op overrides destroy() directly,
     775                 :     // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
     776               0 :     BOOST_COROSIO_ASSERT(owner);
     777               0 :     static_cast<completion_op*>(base)->operator()();
     778               0 : }
     779                 : 
     780                 : inline void
     781 HIT        7306 : waiter_node::completion_op::operator()()
     782                 : {
     783            7306 :     auto* w = waiter_;
     784            7306 :     w->stop_cb_.reset();
     785            7306 :     if (w->ec_out_)
     786            7306 :         *w->ec_out_ = w->ec_value_;
     787                 : 
     788            7306 :     auto h      = w->h_;
     789            7306 :     auto d      = w->d_;
     790            7306 :     auto* svc   = w->svc_;
     791            7306 :     auto& sched = svc->get_scheduler();
     792                 : 
     793            7306 :     svc->destroy_waiter(w);
     794                 : 
     795            7306 :     d.post(h);
     796            7306 :     sched.work_finished();
     797            7306 : }
     798                 : 
     799                 : // GCC 14 false-positive: inlining ~optional<stop_callback> through
     800                 : // delete loses track that stop_cb_ was already .reset() above.
     801                 : #if defined(__GNUC__) && !defined(__clang__)
     802                 : #pragma GCC diagnostic push
     803                 : #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
     804                 : #endif
     805                 : inline void
     806               8 : waiter_node::completion_op::destroy()
     807                 : {
     808                 :     // Called during scheduler shutdown drain when this completion_op is
     809                 :     // in the scheduler's ready queue (posted by cancel_timer() or
     810                 :     // process_expired()). Balances the work_started() from
     811                 :     // implementation::wait(). The scheduler drain loop separately
     812                 :     // balances the work_started() from post(). On IOCP both decrements
     813                 :     // are required for outstanding_work_ to reach zero; on other
     814                 :     // backends this is harmless.
     815                 :     //
     816                 :     // This override also prevents scheduler_op::destroy() from calling
     817                 :     // do_complete(nullptr, ...). See also: timer_service::shutdown()
     818                 :     // which drains waiters still in the timer heap (the other path).
     819               8 :     auto* w = waiter_;
     820               8 :     w->stop_cb_.reset();
     821               8 :     auto h      = std::exchange(w->h_, {});
     822               8 :     auto& sched = w->svc_->get_scheduler();
     823               8 :     delete w;
     824               8 :     sched.work_finished();
     825               8 :     if (h)
     826               8 :         h.destroy();
     827               8 : }
     828                 : #if defined(__GNUC__) && !defined(__clang__)
     829                 : #pragma GCC diagnostic pop
     830                 : #endif
     831                 : 
     832                 : inline std::coroutine_handle<>
     833            7319 : timer_service::implementation::wait(
     834                 :     std::coroutine_handle<> h,
     835                 :     capy::executor_ref d,
     836                 :     std::stop_token token,
     837                 :     std::error_code* ec)
     838                 : {
     839                 :     // Already-expired fast path — no waiter_node, no mutex.
     840                 :     // Post instead of dispatch so the coroutine yields to the
     841                 :     // scheduler, allowing other queued work to run.
     842            7319 :     if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
     843                 :     {
     844            7297 :         if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
     845                 :         {
     846               3 :             if (ec)
     847               3 :                 *ec = {};
     848               3 :             d.post(h);
     849               3 :             return std::noop_coroutine();
     850                 :         }
     851                 :     }
     852                 : 
     853            7316 :     auto* w    = svc_->create_waiter();
     854            7316 :     w->impl_   = this;
     855            7316 :     w->svc_    = svc_;
     856            7316 :     w->h_      = h;
     857            7316 :     w->d_      = d;
     858            7316 :     w->token_  = std::move(token);
     859            7316 :     w->ec_out_ = ec;
     860                 : 
     861            7316 :     svc_->insert_waiter(*this, w);
     862            7316 :     might_have_pending_waits_ = true;
     863            7316 :     svc_->get_scheduler().work_started();
     864                 : 
     865            7316 :     if (w->token_.stop_possible())
     866              48 :         w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
     867                 : 
     868            7316 :     return std::noop_coroutine();
     869                 : }
     870                 : 
     871                 : // Free functions
     872                 : 
     873                 : struct timer_service_access
     874                 : {
     875            7601 :     static native_scheduler& get_scheduler(io_context& ctx) noexcept
     876                 :     {
     877            7601 :         return static_cast<native_scheduler&>(*ctx.sched_);
     878                 :     }
     879                 : };
     880                 : 
     881                 : // Bypass find_service() mutex by reading the scheduler's cached pointer
     882                 : inline io_object::io_service&
     883            7601 : timer_service_direct(capy::execution_context& ctx) noexcept
     884                 : {
     885            7601 :     return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
     886            7601 :                 .timer_svc_;
     887                 : }
     888                 : 
     889                 : inline std::size_t
     890               6 : timer_service_update_expiry(timer::implementation& base)
     891                 : {
     892               6 :     auto& impl = static_cast<timer_service::implementation&>(base);
     893               6 :     return impl.svc_->update_timer(impl, impl.expiry_);
     894                 : }
     895                 : 
     896                 : inline std::size_t
     897               8 : timer_service_cancel(timer::implementation& base) noexcept
     898                 : {
     899               8 :     auto& impl = static_cast<timer_service::implementation&>(base);
     900               8 :     return impl.svc_->cancel_timer(impl);
     901                 : }
     902                 : 
     903                 : inline std::size_t
     904               2 : timer_service_cancel_one(timer::implementation& base) noexcept
     905                 : {
     906               2 :     auto& impl = static_cast<timer_service::implementation&>(base);
     907               2 :     return impl.svc_->cancel_one_waiter(impl);
     908                 : }
     909                 : 
     910                 : inline timer_service&
     911             407 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
     912                 : {
     913             407 :     return ctx.make_service<timer_service>(sched);
     914                 : }
     915                 : 
     916                 : } // namespace boost::corosio::detail
     917                 : 
     918                 : #endif
        

Generated by: LCOV version 2.3