TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/corosio
9 : //
10 :
11 : #ifndef BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
12 : #define BOOST_COROSIO_DETAIL_TIMER_SERVICE_HPP
13 :
14 : #include <boost/corosio/timer.hpp>
15 : #include <boost/corosio/io_context.hpp>
16 : #include <boost/corosio/detail/scheduler_op.hpp>
17 : #include <boost/corosio/native/native_scheduler.hpp>
18 : #include <boost/corosio/detail/intrusive.hpp>
19 : #include <boost/corosio/detail/thread_local_ptr.hpp>
20 : #include <boost/capy/error.hpp>
21 : #include <boost/capy/ex/execution_context.hpp>
22 : #include <boost/capy/ex/executor_ref.hpp>
23 : #include <system_error>
24 :
25 : #include <atomic>
26 : #include <chrono>
27 : #include <coroutine>
28 : #include <cstddef>
29 : #include <limits>
30 : #include <mutex>
31 : #include <optional>
32 : #include <stop_token>
33 : #include <utility>
34 : #include <vector>
35 :
36 : namespace boost::corosio::detail {
37 :
38 : struct scheduler;
39 :
40 : /*
41 : Timer Service
42 : =============
43 :
44 : Data Structures
45 : ---------------
46 : waiter_node holds per-waiter state: coroutine handle, executor,
47 : error output, stop_token, embedded completion_op. Each concurrent
48 : co_await t.wait() allocates one waiter_node.
49 :
50 : timer_service::implementation holds per-timer state: expiry,
51 : heap index, and an intrusive_list of waiter_nodes. Multiple
52 : coroutines can wait on the same timer simultaneously.
53 :
54 : timer_service owns a min-heap of active timers, a free list
55 : of recycled impls, and a free list of recycled waiter_nodes. The
56 : heap is ordered by expiry time; the scheduler queries
57 : nearest_expiry() to set the epoll/timerfd timeout.
58 :
59 : Optimization Strategy
60 : ---------------------
61 : 1. Deferred heap insertion — expires_after() stores the expiry
62 : but does not insert into the heap. Insertion happens in wait().
63 : 2. Thread-local impl cache — single-slot per-thread cache.
64 : 3. Embedded completion_op — eliminates heap allocation per fire/cancel.
65 : 4. Cached nearest expiry — atomic avoids mutex in nearest_expiry().
66 : 5. might_have_pending_waits_ flag — skips lock when no wait issued.
67 : 6. Thread-local waiter cache — single-slot per-thread cache.
68 :
69 : Concurrency
70 : -----------
71 : stop_token callbacks can fire from any thread. The impl_
72 : pointer on waiter_node is used as a "still in list" marker.
73 : */
74 :
75 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node;
76 :
77 : inline void timer_service_invalidate_cache() noexcept;
78 :
79 : // timer_service class body — member function definitions are
80 : // out-of-class (after implementation and waiter_node are complete)
81 : class BOOST_COROSIO_DECL timer_service final
82 : : public capy::execution_context::service
83 : , public io_object::io_service
84 : {
85 : public:
86 : using clock_type = std::chrono::steady_clock;
87 : using time_point = clock_type::time_point;
88 :
89 : /// Type-erased callback for earliest-expiry-changed notifications.
90 : class callback
91 : {
92 : void* ctx_ = nullptr;
93 : void (*fn_)(void*) = nullptr;
94 :
95 : public:
96 : /// Construct an empty callback.
97 HIT 407 : callback() = default;
98 :
99 : /// Construct a callback with the given context and function.
100 407 : callback(void* ctx, void (*fn)(void*)) noexcept : ctx_(ctx), fn_(fn) {}
101 :
102 : /// Return true if the callback is non-empty.
103 : explicit operator bool() const noexcept
104 : {
105 : return fn_ != nullptr;
106 : }
107 :
108 : /// Invoke the callback.
109 7272 : void operator()() const
110 : {
111 7272 : if (fn_)
112 7272 : fn_(ctx_);
113 7272 : }
114 : };
115 :
116 : struct implementation;
117 :
118 : private:
119 : struct heap_entry
120 : {
121 : time_point time_;
122 : implementation* timer_;
123 : };
124 :
125 : scheduler* sched_ = nullptr;
126 : mutable std::mutex mutex_;
127 : std::vector<heap_entry> heap_;
128 : implementation* free_list_ = nullptr;
129 : waiter_node* waiter_free_list_ = nullptr;
130 : callback on_earliest_changed_;
131 : // Avoids mutex in nearest_expiry() and empty()
132 : mutable std::atomic<std::int64_t> cached_nearest_ns_{
133 : (std::numeric_limits<std::int64_t>::max)()};
134 :
135 : public:
136 : /// Construct the timer service bound to a scheduler.
137 407 : inline timer_service(capy::execution_context&, scheduler& sched)
138 407 : : sched_(&sched)
139 : {
140 407 : }
141 :
142 : /// Return the associated scheduler.
143 14630 : inline scheduler& get_scheduler() noexcept
144 : {
145 14630 : return *sched_;
146 : }
147 :
148 : /// Destroy the timer service.
149 814 : ~timer_service() override = default;
150 :
151 : timer_service(timer_service const&) = delete;
152 : timer_service& operator=(timer_service const&) = delete;
153 :
154 : /// Register a callback invoked when the earliest expiry changes.
155 407 : inline void set_on_earliest_changed(callback cb)
156 : {
157 407 : on_earliest_changed_ = cb;
158 407 : }
159 :
160 : /// Return true if no timers are in the heap.
161 : inline bool empty() const noexcept
162 : {
163 : return cached_nearest_ns_.load(std::memory_order_acquire) ==
164 : (std::numeric_limits<std::int64_t>::max)();
165 : }
166 :
167 : /// Return the nearest timer expiry without acquiring the mutex.
168 17129 : inline time_point nearest_expiry() const noexcept
169 : {
170 17129 : auto ns = cached_nearest_ns_.load(std::memory_order_acquire);
171 17129 : return time_point(time_point::duration(ns));
172 : }
173 :
174 : /// Cancel all pending timers and free cached resources.
175 : inline void shutdown() override;
176 :
177 : /// Construct a new timer implementation.
178 : inline io_object::implementation* construct() override;
179 :
180 : /// Destroy a timer implementation, cancelling pending waiters.
181 : inline void destroy(io_object::implementation* p) override;
182 :
183 : /// Cancel and recycle a timer implementation.
184 : inline void destroy_impl(implementation& impl);
185 :
186 : /// Create or recycle a waiter node.
187 : inline waiter_node* create_waiter();
188 :
189 : /// Return a waiter node to the cache or free list.
190 : inline void destroy_waiter(waiter_node* w);
191 :
192 : /// Update the timer expiry, cancelling existing waiters.
193 : inline std::size_t update_timer(implementation& impl, time_point new_time);
194 :
195 : /// Insert a waiter into the timer's waiter list and the heap.
196 : inline void insert_waiter(implementation& impl, waiter_node* w);
197 :
198 : /// Cancel all waiters on a timer.
199 : inline std::size_t cancel_timer(implementation& impl);
200 :
201 : /// Cancel a single waiter ( stop_token callback path ).
202 : inline void cancel_waiter(waiter_node* w);
203 :
204 : /// Cancel one waiter on a timer.
205 : inline std::size_t cancel_one_waiter(implementation& impl);
206 :
207 : /// Complete all waiters whose timers have expired.
208 : inline std::size_t process_expired();
209 :
210 : private:
211 137508 : inline void refresh_cached_nearest() noexcept
212 : {
213 137508 : auto ns = heap_.empty() ? (std::numeric_limits<std::int64_t>::max)()
214 137032 : : heap_[0].time_.time_since_epoch().count();
215 137508 : cached_nearest_ns_.store(ns, std::memory_order_release);
216 137508 : }
217 :
218 : inline void remove_timer_impl(implementation& impl);
219 : inline void up_heap(std::size_t index);
220 : inline void down_heap(std::size_t index);
221 : inline void swap_heap(std::size_t i1, std::size_t i2);
222 : };
223 :
224 : struct BOOST_COROSIO_SYMBOL_VISIBLE waiter_node
225 : : intrusive_list<waiter_node>::node
226 : {
227 : // Embedded completion op — avoids heap allocation per fire/cancel
228 : struct completion_op final : scheduler_op
229 : {
230 : waiter_node* waiter_ = nullptr;
231 :
232 : static void do_complete(
233 : void* owner, scheduler_op* base, std::uint32_t, std::uint32_t);
234 :
235 194 : completion_op() noexcept : scheduler_op(&do_complete) {}
236 :
237 : void operator()() override;
238 : void destroy() override;
239 : };
240 :
241 : // Per-waiter stop_token cancellation
242 : struct canceller
243 : {
244 : waiter_node* waiter_;
245 : void operator()() const;
246 : };
247 :
248 : // nullptr once removed from timer's waiter list (concurrency marker)
249 : timer_service::implementation* impl_ = nullptr;
250 : timer_service* svc_ = nullptr;
251 : std::coroutine_handle<> h_;
252 : capy::executor_ref d_;
253 : std::error_code* ec_out_ = nullptr;
254 : std::stop_token token_;
255 : std::optional<std::stop_callback<canceller>> stop_cb_;
256 : completion_op op_;
257 : std::error_code ec_value_;
258 : waiter_node* next_free_ = nullptr;
259 :
260 194 : waiter_node() noexcept
261 194 : {
262 194 : op_.waiter_ = this;
263 194 : }
264 : };
265 :
266 : struct timer_service::implementation final : timer::implementation
267 : {
268 : using clock_type = std::chrono::steady_clock;
269 : using time_point = clock_type::time_point;
270 : using duration = clock_type::duration;
271 :
272 : timer_service* svc_ = nullptr;
273 : intrusive_list<waiter_node> waiters_;
274 :
275 : // Free list linkage (reused when impl is on free_list)
276 : implementation* next_free_ = nullptr;
277 :
278 : inline explicit implementation(timer_service& svc) noexcept;
279 :
280 : inline std::coroutine_handle<> wait(
281 : std::coroutine_handle<>,
282 : capy::executor_ref,
283 : std::stop_token,
284 : std::error_code*) override;
285 : };
286 :
287 : // Thread-local single-slot caches avoid hot-path mutex acquisitions:
288 : // 1. Impl cache — checked against svc_ on pop; a mismatch is deleted
289 : // 2. Waiter cache — no service affinity, reused by any service
290 : // shutdown() calls timer_service_invalidate_cache(), which empties the calling thread's slots.
291 :
292 : inline thread_local_ptr<timer_service::implementation> tl_cached_impl;
293 : inline thread_local_ptr<waiter_node> tl_cached_waiter;
294 :
295 : inline timer_service::implementation*
296 7601 : try_pop_tl_cache(timer_service* svc) noexcept
297 : {
298 7601 : auto* impl = tl_cached_impl.get();
299 7601 : if (impl)
300 : {
301 7374 : tl_cached_impl.set(nullptr);
302 7374 : if (impl->svc_ == svc)
303 7374 : return impl;
304 : // Stale impl from a destroyed service
305 MIS 0 : delete impl;
306 : }
307 HIT 227 : return nullptr;
308 : }
309 :
310 : inline bool
311 7599 : try_push_tl_cache(timer_service::implementation* impl) noexcept
312 : {
313 7599 : if (!tl_cached_impl.get())
314 : {
315 7525 : tl_cached_impl.set(impl);
316 7525 : return true;
317 : }
318 74 : return false;
319 : }
320 :
321 : inline waiter_node*
322 7316 : try_pop_waiter_tl_cache() noexcept
323 : {
324 7316 : auto* w = tl_cached_waiter.get();
325 7316 : if (w)
326 : {
327 7120 : tl_cached_waiter.set(nullptr);
328 7120 : return w;
329 : }
330 196 : return nullptr;
331 : }
332 :
333 : inline bool
334 7306 : try_push_waiter_tl_cache(waiter_node* w) noexcept
335 : {
336 7306 : if (!tl_cached_waiter.get())
337 : {
338 7226 : tl_cached_waiter.set(w);
339 7226 : return true;
340 : }
341 80 : return false;
342 : }
343 :
344 : inline void
345 407 : timer_service_invalidate_cache() noexcept
346 : {
347 407 : delete tl_cached_impl.get();
348 407 : tl_cached_impl.set(nullptr);
349 :
350 407 : delete tl_cached_waiter.get();
351 407 : tl_cached_waiter.set(nullptr);
352 407 : }
353 :
354 : // timer_service out-of-class member function definitions
355 :
356 227 : inline timer_service::implementation::implementation(
357 227 : timer_service& svc) noexcept
358 227 : : svc_(&svc)
359 : {
360 227 : }
361 :
362 : inline void
363 407 : timer_service::shutdown()
364 : {
365 407 : timer_service_invalidate_cache();
366 :
367 : // Cancel waiting timers still in the heap.
368 : // Each waiter called work_started() in implementation::wait().
369 : // On IOCP the scheduler shutdown loop exits when outstanding_work_
370 : // reaches zero, so we must call work_finished() here to balance it.
371 : // On other backends this is harmless (their drain loops exit when
372 : // the queue is empty, not based on outstanding_work_).
373 409 : for (auto& entry : heap_)
374 : {
375 2 : auto* impl = entry.timer_;
376 4 : while (auto* w = impl->waiters_.pop_front())
377 : {
378 2 : w->stop_cb_.reset();
379 2 : auto h = std::exchange(w->h_, {});
380 2 : sched_->work_finished();
381 2 : if (h)
382 2 : h.destroy();
383 2 : delete w;
384 2 : }
385 2 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
386 2 : delete impl;
387 : }
388 407 : heap_.clear();
389 407 : cached_nearest_ns_.store(
390 : (std::numeric_limits<std::int64_t>::max)(), std::memory_order_release);
391 :
392 : // Delete free-listed impls
393 481 : while (free_list_)
394 : {
395 74 : auto* next = free_list_->next_free_;
396 74 : delete free_list_;
397 74 : free_list_ = next;
398 : }
399 :
400 : // Delete free-listed waiters
401 485 : while (waiter_free_list_)
402 : {
403 78 : auto* next = waiter_free_list_->next_free_;
404 78 : delete waiter_free_list_;
405 78 : waiter_free_list_ = next;
406 : }
407 407 : }
408 :
409 : inline io_object::implementation*
410 7601 : timer_service::construct()
411 : {
412 7601 : implementation* impl = try_pop_tl_cache(this);
413 7601 : if (impl)
414 : {
415 7374 : impl->svc_ = this;
416 7374 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
417 7374 : impl->might_have_pending_waits_ = false;
418 7374 : return impl;
419 : }
420 :
421 227 : std::lock_guard lock(mutex_);
422 227 : if (free_list_)
423 : {
424 MIS 0 : impl = free_list_;
425 0 : free_list_ = impl->next_free_;
426 0 : impl->next_free_ = nullptr;
427 0 : impl->svc_ = this;
428 0 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
429 0 : impl->might_have_pending_waits_ = false;
430 : }
431 : else
432 : {
433 HIT 227 : impl = new implementation(*this);
434 : }
435 227 : return impl;
436 227 : }
437 :
438 : inline void
439 7599 : timer_service::destroy(io_object::implementation* p)
440 : {
441 7599 : destroy_impl(static_cast<implementation&>(*p));
442 7599 : }
443 :
444 : inline void
445 7599 : timer_service::destroy_impl(implementation& impl)
446 : {
447 7599 : cancel_timer(impl);
448 :
449 7599 : if (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)())
450 : {
451 MIS 0 : std::lock_guard lock(mutex_);
452 0 : remove_timer_impl(impl);
453 0 : refresh_cached_nearest();
454 0 : }
455 :
456 HIT 7599 : if (try_push_tl_cache(&impl))
457 7525 : return;
458 :
459 74 : std::lock_guard lock(mutex_);
460 74 : impl.next_free_ = free_list_;
461 74 : free_list_ = &impl;
462 74 : }
463 :
464 : inline waiter_node*
465 7316 : timer_service::create_waiter()
466 : {
467 7316 : if (auto* w = try_pop_waiter_tl_cache())
468 7120 : return w;
469 :
470 196 : std::lock_guard lock(mutex_);
471 196 : if (waiter_free_list_)
472 : {
473 2 : auto* w = waiter_free_list_;
474 2 : waiter_free_list_ = w->next_free_;
475 2 : w->next_free_ = nullptr;
476 2 : return w;
477 : }
478 :
479 194 : return new waiter_node();
480 196 : }
481 :
482 : inline void
483 7306 : timer_service::destroy_waiter(waiter_node* w)
484 : {
485 7306 : if (try_push_waiter_tl_cache(w))
486 7226 : return;
487 :
488 80 : std::lock_guard lock(mutex_);
489 80 : w->next_free_ = waiter_free_list_;
490 80 : waiter_free_list_ = w;
491 80 : }
492 :
493 : inline std::size_t
494 6 : timer_service::update_timer(implementation& impl, time_point new_time)
495 : {
496 : bool in_heap =
497 6 : (impl.heap_index_ != (std::numeric_limits<std::size_t>::max)());
498 6 : if (!in_heap && impl.waiters_.empty())
499 MIS 0 : return 0;
500 :
501 HIT 6 : bool notify = false;
502 6 : intrusive_list<waiter_node> canceled;
503 :
504 : {
505 6 : std::lock_guard lock(mutex_);
506 :
507 16 : while (auto* w = impl.waiters_.pop_front())
508 : {
509 10 : w->impl_ = nullptr;
510 10 : canceled.push_back(w);
511 10 : }
512 :
513 6 : if (impl.heap_index_ < heap_.size())
514 : {
515 6 : time_point old_time = heap_[impl.heap_index_].time_;
516 6 : heap_[impl.heap_index_].time_ = new_time;
517 :
518 6 : if (new_time < old_time)
519 6 : up_heap(impl.heap_index_);
520 : else
521 MIS 0 : down_heap(impl.heap_index_);
522 :
523 HIT 6 : notify = (impl.heap_index_ == 0);
524 : }
525 :
526 6 : refresh_cached_nearest();
527 6 : }
528 :
529 6 : std::size_t count = 0;
530 16 : while (auto* w = canceled.pop_front())
531 : {
532 10 : w->ec_value_ = make_error_code(capy::error::canceled);
533 10 : sched_->post(&w->op_);
534 10 : ++count;
535 10 : }
536 :
537 6 : if (notify)
538 6 : on_earliest_changed_();
539 :
540 6 : return count;
541 : }
542 :
543 : inline void
544 7316 : timer_service::insert_waiter(implementation& impl, waiter_node* w)
545 : {
546 7316 : bool notify = false;
547 : {
548 7316 : std::lock_guard lock(mutex_);
549 7316 : if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)())
550 : {
551 7294 : impl.heap_index_ = heap_.size();
552 7294 : heap_.push_back({impl.expiry_, &impl});
553 7294 : up_heap(heap_.size() - 1);
554 7294 : notify = (impl.heap_index_ == 0);
555 7294 : refresh_cached_nearest();
556 : }
557 7316 : impl.waiters_.push_back(w);
558 7316 : }
559 7316 : if (notify)
560 7266 : on_earliest_changed_();
561 7316 : }
562 :
563 : inline std::size_t
564 7607 : timer_service::cancel_timer(implementation& impl)
565 : {
566 7607 : if (!impl.might_have_pending_waits_)
567 7583 : return 0;
568 :
569 : // Not in heap and no waiters — just clear the flag
570 24 : if (impl.heap_index_ == (std::numeric_limits<std::size_t>::max)() &&
571 MIS 0 : impl.waiters_.empty())
572 : {
573 0 : impl.might_have_pending_waits_ = false;
574 0 : return 0;
575 : }
576 :
577 HIT 24 : intrusive_list<waiter_node> canceled;
578 :
579 : {
580 24 : std::lock_guard lock(mutex_);
581 24 : remove_timer_impl(impl);
582 52 : while (auto* w = impl.waiters_.pop_front())
583 : {
584 28 : w->impl_ = nullptr;
585 28 : canceled.push_back(w);
586 28 : }
587 24 : refresh_cached_nearest();
588 24 : }
589 :
590 24 : impl.might_have_pending_waits_ = false;
591 :
592 24 : std::size_t count = 0;
593 52 : while (auto* w = canceled.pop_front())
594 : {
595 28 : w->ec_value_ = make_error_code(capy::error::canceled);
596 28 : sched_->post(&w->op_);
597 28 : ++count;
598 28 : }
599 :
600 24 : return count;
601 : }
602 :
603 : inline void
604 30 : timer_service::cancel_waiter(waiter_node* w)
605 : {
606 : {
607 30 : std::lock_guard lock(mutex_);
608 : // Already removed by cancel_timer or process_expired
609 30 : if (!w->impl_)
610 MIS 0 : return;
611 HIT 30 : auto* impl = w->impl_;
612 30 : w->impl_ = nullptr;
613 30 : impl->waiters_.remove(w);
614 30 : if (impl->waiters_.empty())
615 : {
616 28 : remove_timer_impl(*impl);
617 28 : impl->might_have_pending_waits_ = false;
618 : }
619 30 : refresh_cached_nearest();
620 30 : }
621 :
622 30 : w->ec_value_ = make_error_code(capy::error::canceled);
623 30 : sched_->post(&w->op_);
624 : }
625 :
626 : inline std::size_t
627 2 : timer_service::cancel_one_waiter(implementation& impl)
628 : {
629 2 : if (!impl.might_have_pending_waits_)
630 MIS 0 : return 0;
631 :
632 HIT 2 : waiter_node* w = nullptr;
633 :
634 : {
635 2 : std::lock_guard lock(mutex_);
636 2 : w = impl.waiters_.pop_front();
637 2 : if (!w)
638 MIS 0 : return 0;
639 HIT 2 : w->impl_ = nullptr;
640 2 : if (impl.waiters_.empty())
641 : {
642 MIS 0 : remove_timer_impl(impl);
643 0 : impl.might_have_pending_waits_ = false;
644 : }
645 HIT 2 : refresh_cached_nearest();
646 2 : }
647 :
648 2 : w->ec_value_ = make_error_code(capy::error::canceled);
649 2 : sched_->post(&w->op_);
650 2 : return 1;
651 : }
652 :
653 : inline std::size_t
654 130152 : timer_service::process_expired()
655 : {
656 130152 : intrusive_list<waiter_node> expired;
657 :
658 : {
659 130152 : std::lock_guard lock(mutex_);
660 130152 : auto now = clock_type::now();
661 :
662 137392 : while (!heap_.empty() && heap_[0].time_ <= now)
663 : {
664 7240 : implementation* t = heap_[0].timer_;
665 7240 : remove_timer_impl(*t);
666 14484 : while (auto* w = t->waiters_.pop_front())
667 : {
668 7244 : w->impl_ = nullptr;
669 7244 : w->ec_value_ = {};
670 7244 : expired.push_back(w);
671 7244 : }
672 7240 : t->might_have_pending_waits_ = false;
673 : }
674 :
675 130152 : refresh_cached_nearest();
676 130152 : }
677 :
678 130152 : std::size_t count = 0;
679 137396 : while (auto* w = expired.pop_front())
680 : {
681 7244 : sched_->post(&w->op_);
682 7244 : ++count;
683 7244 : }
684 :
685 130152 : return count;
686 : }
687 :
688 : inline void
689 7292 : timer_service::remove_timer_impl(implementation& impl)
690 : {
691 7292 : std::size_t index = impl.heap_index_;
692 7292 : if (index >= heap_.size())
693 MIS 0 : return; // Not in heap
694 :
695 HIT 7292 : if (index == heap_.size() - 1)
696 : {
697 : // Last element, just pop
698 135 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
699 135 : heap_.pop_back();
700 : }
701 : else
702 : {
703 : // Swap with last and reheapify
704 7157 : swap_heap(index, heap_.size() - 1);
705 7157 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
706 7157 : heap_.pop_back();
707 :
708 7157 : if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
709 MIS 0 : up_heap(index);
710 : else
711 HIT 7157 : down_heap(index);
712 : }
713 : }
714 :
715 : inline void
716 7300 : timer_service::up_heap(std::size_t index)
717 : {
718 14435 : while (index > 0)
719 : {
720 7163 : std::size_t parent = (index - 1) / 2;
721 7163 : if (!(heap_[index].time_ < heap_[parent].time_))
722 28 : break;
723 7135 : swap_heap(index, parent);
724 7135 : index = parent;
725 : }
726 7300 : }
727 :
728 : inline void
729 7157 : timer_service::down_heap(std::size_t index)
730 : {
731 7157 : std::size_t child = index * 2 + 1;
732 7157 : while (child < heap_.size())
733 : {
734 6 : std::size_t min_child = (child + 1 == heap_.size() ||
735 MIS 0 : heap_[child].time_ < heap_[child + 1].time_)
736 HIT 6 : ? child
737 6 : : child + 1;
738 :
739 6 : if (heap_[index].time_ < heap_[min_child].time_)
740 6 : break;
741 :
742 MIS 0 : swap_heap(index, min_child);
743 0 : index = min_child;
744 0 : child = index * 2 + 1;
745 : }
746 HIT 7157 : }
747 :
748 : inline void
749 14292 : timer_service::swap_heap(std::size_t i1, std::size_t i2)
750 : {
751 14292 : heap_entry tmp = heap_[i1];
752 14292 : heap_[i1] = heap_[i2];
753 14292 : heap_[i2] = tmp;
754 14292 : heap_[i1].timer_->heap_index_ = i1;
755 14292 : heap_[i2].timer_->heap_index_ = i2;
756 14292 : }
757 :
758 : // waiter_node out-of-class member function definitions
759 :
760 : inline void
761 30 : waiter_node::canceller::operator()() const
762 : {
763 30 : waiter_->svc_->cancel_waiter(waiter_);
764 30 : }
765 :
766 : inline void
767 MIS 0 : waiter_node::completion_op::do_complete(
768 : [[maybe_unused]] void* owner,
769 : scheduler_op* base,
770 : std::uint32_t,
771 : std::uint32_t)
772 : {
773 : // owner is always non-null here. The destroy path (owner == nullptr)
774 : // is unreachable because completion_op overrides destroy() directly,
775 : // bypassing scheduler_op::destroy() which would call func_(nullptr, ...).
776 0 : BOOST_COROSIO_ASSERT(owner);
777 0 : static_cast<completion_op*>(base)->operator()();
778 0 : }
779 :
780 : inline void
781 HIT 7306 : waiter_node::completion_op::operator()()
782 : {
783 7306 : auto* w = waiter_;
784 7306 : w->stop_cb_.reset();
785 7306 : if (w->ec_out_)
786 7306 : *w->ec_out_ = w->ec_value_;
787 :
788 7306 : auto h = w->h_;
789 7306 : auto d = w->d_;
790 7306 : auto* svc = w->svc_;
791 7306 : auto& sched = svc->get_scheduler();
792 :
793 7306 : svc->destroy_waiter(w);
794 :
795 7306 : d.post(h);
796 7306 : sched.work_finished();
797 7306 : }
798 :
799 : // GCC 14 false-positive: inlining ~optional<stop_callback> through
800 : // delete loses track that stop_cb_ was already .reset() above.
801 : #if defined(__GNUC__) && !defined(__clang__)
802 : #pragma GCC diagnostic push
803 : #pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
804 : #endif
805 : inline void
806 8 : waiter_node::completion_op::destroy()
807 : {
808 : // Called during scheduler shutdown drain when this completion_op is
809 : // in the scheduler's ready queue (posted by cancel_timer() or
810 : // process_expired()). Balances the work_started() from
811 : // implementation::wait(). The scheduler drain loop separately
812 : // balances the work_started() from post(). On IOCP both decrements
813 : // are required for outstanding_work_ to reach zero; on other
814 : // backends this is harmless.
815 : //
816 : // This override also prevents scheduler_op::destroy() from calling
817 : // do_complete(nullptr, ...). See also: timer_service::shutdown()
818 : // which drains waiters still in the timer heap (the other path).
819 8 : auto* w = waiter_;
820 8 : w->stop_cb_.reset();
821 8 : auto h = std::exchange(w->h_, {});
822 8 : auto& sched = w->svc_->get_scheduler();
823 8 : delete w;
824 8 : sched.work_finished();
825 8 : if (h)
826 8 : h.destroy();
827 8 : }
828 : #if defined(__GNUC__) && !defined(__clang__)
829 : #pragma GCC diagnostic pop
830 : #endif
831 :
832 : inline std::coroutine_handle<>
833 7319 : timer_service::implementation::wait(
834 : std::coroutine_handle<> h,
835 : capy::executor_ref d,
836 : std::stop_token token,
837 : std::error_code* ec)
838 : {
839 : // Already-expired fast path — no waiter_node, no mutex.
840 : // Post instead of dispatch so the coroutine yields to the
841 : // scheduler, allowing other queued work to run.
842 7319 : if (heap_index_ == (std::numeric_limits<std::size_t>::max)())
843 : {
844 7297 : if (expiry_ == (time_point::min)() || expiry_ <= clock_type::now())
845 : {
846 3 : if (ec)
847 3 : *ec = {};
848 3 : d.post(h);
849 3 : return std::noop_coroutine();
850 : }
851 : }
852 :
853 7316 : auto* w = svc_->create_waiter();
854 7316 : w->impl_ = this;
855 7316 : w->svc_ = svc_;
856 7316 : w->h_ = h;
857 7316 : w->d_ = d;
858 7316 : w->token_ = std::move(token);
859 7316 : w->ec_out_ = ec;
860 :
861 7316 : svc_->insert_waiter(*this, w);
862 7316 : might_have_pending_waits_ = true;
863 7316 : svc_->get_scheduler().work_started();
864 :
865 7316 : if (w->token_.stop_possible())
866 48 : w->stop_cb_.emplace(w->token_, waiter_node::canceller{w});
867 :
868 7316 : return std::noop_coroutine();
869 : }
870 :
871 : // Free functions
872 :
873 : struct timer_service_access
874 : {
875 7601 : static native_scheduler& get_scheduler(io_context& ctx) noexcept
876 : {
877 7601 : return static_cast<native_scheduler&>(*ctx.sched_);
878 : }
879 : };
880 :
881 : // Bypass find_service() mutex by reading the scheduler's cached pointer
882 : inline io_object::io_service&
883 7601 : timer_service_direct(capy::execution_context& ctx) noexcept
884 : {
885 7601 : return *timer_service_access::get_scheduler(static_cast<io_context&>(ctx))
886 7601 : .timer_svc_;
887 : }
888 :
889 : inline std::size_t
890 6 : timer_service_update_expiry(timer::implementation& base)
891 : {
892 6 : auto& impl = static_cast<timer_service::implementation&>(base);
893 6 : return impl.svc_->update_timer(impl, impl.expiry_);
894 : }
895 :
896 : inline std::size_t
897 8 : timer_service_cancel(timer::implementation& base) noexcept
898 : {
899 8 : auto& impl = static_cast<timer_service::implementation&>(base);
900 8 : return impl.svc_->cancel_timer(impl);
901 : }
902 :
903 : inline std::size_t
904 2 : timer_service_cancel_one(timer::implementation& base) noexcept
905 : {
906 2 : auto& impl = static_cast<timer_service::implementation&>(base);
907 2 : return impl.svc_->cancel_one_waiter(impl);
908 : }
909 :
910 : inline timer_service&
911 407 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
912 : {
913 407 : return ctx.make_service<timer_service>(sched);
914 : }
915 :
916 : } // namespace boost::corosio::detail
917 :
918 : #endif
|