include/boost/corosio/tcp_server.hpp

78.7% Lines (107/136) 91.2% Functions (31/34)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_TCP_SERVER_HPP
11 #define BOOST_COROSIO_TCP_SERVER_HPP
12
13 #include <boost/corosio/detail/config.hpp>
14 #include <boost/corosio/detail/except.hpp>
15 #include <boost/corosio/tcp_acceptor.hpp>
16 #include <boost/corosio/tcp_socket.hpp>
17 #include <boost/corosio/io_context.hpp>
18 #include <boost/corosio/endpoint.hpp>
19 #include <boost/capy/task.hpp>
20 #include <boost/capy/concept/execution_context.hpp>
21 #include <boost/capy/concept/io_awaitable.hpp>
22 #include <boost/capy/concept/executor.hpp>
23 #include <boost/capy/ex/any_executor.hpp>
24 #include <boost/capy/ex/frame_allocator.hpp>
25 #include <boost/capy/ex/io_env.hpp>
26 #include <boost/capy/ex/run_async.hpp>
27
28 #include <coroutine>
29 #include <memory>
30 #include <ranges>
31 #include <vector>
32
33 namespace boost::corosio {
34
35 #ifdef _MSC_VER
36 #pragma warning(push)
37 #pragma warning(disable : 4251) // class needs to have dll-interface
38 #endif
39
40 /** TCP server with pooled workers.
41
42 This class manages a pool of reusable worker objects that handle
43 incoming connections. When a connection arrives, an idle worker
44 is dispatched to handle it. After the connection completes, the
45 worker returns to the pool for reuse, avoiding allocation overhead
46 per connection.
47
48 Workers are set via @ref set_workers as a forward range of
49 pointer-like objects (e.g., `unique_ptr<worker_base>`). The server
50 takes ownership of the container via type erasure.
51
52 @par Thread Safety
53 Distinct objects: Safe.
54 Shared objects: Unsafe.
55
56 @par Lifecycle
57 The server operates in three states:
58
59 - **Stopped**: Initial state, or after @ref join completes.
60 - **Running**: After @ref start, actively accepting connections.
61 - **Stopping**: After @ref stop, draining active work.
62
63 State transitions:
64 @code
65 [Stopped] --start()--> [Running] --stop()--> [Stopping] --join()--> [Stopped]
66 @endcode
67
68 @par Running the Server
69 @code
70 io_context ioc;
71 tcp_server srv(ioc, ioc.get_executor());
72 srv.set_workers(make_workers(ioc, 100));
73 srv.bind(endpoint{address_v4::any(), 8080});
74 srv.start();
75 ioc.run(); // Blocks until all work completes
76 @endcode
77
78 @par Graceful Shutdown
79 To shut down gracefully, call @ref stop then drain the io_context:
80 @code
81 // From a signal handler or timer callback:
82 srv.stop();
83
84 // ioc.run() returns after pending work drains.
85 // Then from the thread that called ioc.run():
86 srv.join(); // Wait for accept loops to finish
87 @endcode
88
89 @par Restart After Stop
90 The server can be restarted after a complete shutdown cycle.
91 You must drain the io_context and call @ref join before restarting:
92 @code
93 srv.start();
94 ioc.run_for( 10s ); // Run for a while
95 srv.stop(); // Signal shutdown
96 ioc.run(); // REQUIRED: drain pending completions
97 srv.join(); // REQUIRED: wait for accept loops
98
99 // Now safe to restart
100 srv.start();
101 ioc.run();
102 @endcode
103
104 @par WARNING: What NOT to Do
105 - Do NOT call @ref join from inside a worker coroutine (deadlock).
106 - Do NOT call @ref join from a thread running `ioc.run()` (deadlock).
107 - Do NOT call @ref start without completing @ref join after @ref stop.
108 - Do NOT call `ioc.stop()` for graceful shutdown; use @ref stop instead.
109
110 @par Example
111 @code
112 class my_worker : public tcp_server::worker_base
113 {
114 corosio::tcp_socket sock_;
115 capy::any_executor ex_;
116 public:
117 my_worker(io_context& ctx)
118 : sock_(ctx)
119 , ex_(ctx.get_executor())
120 {
121 }
122
123 corosio::tcp_socket& socket() override { return sock_; }
124
125 void run(launcher launch) override
126 {
127 launch(ex_, [](corosio::tcp_socket* sock) -> capy::task<>
128 {
129 // handle connection using sock
130 co_return;
131 }(&sock_));
132 }
133 };
134
135 auto make_workers(io_context& ctx, int n)
136 {
137 std::vector<std::unique_ptr<tcp_server::worker_base>> v;
138 v.reserve(n);
139 for(int i = 0; i < n; ++i)
140 v.push_back(std::make_unique<my_worker>(ctx));
141 return v;
142 }
143
144 io_context ioc;
145 tcp_server srv(ioc, ioc.get_executor());
146 srv.set_workers(make_workers(ioc, 100));
147 @endcode
148
149 @see worker_base, set_workers, launcher
150 */
151 class BOOST_COROSIO_DECL tcp_server
152 {
153 public:
154 class worker_base; ///< Abstract base for connection handlers.
155 class launcher; ///< Move-only handle to launch worker coroutines.
156
157 private:
    // Node for the intrusive stack of do_accept coroutines that are
    // suspended waiting for an idle worker (see pop_awaitable). Nodes
    // live in the suspended coroutine's awaitable frame.
    struct waiter
    {
        waiter* next;              // next suspended waiter (pushed/popped at front)
        std::coroutine_handle<> h; // coroutine to resume once a worker frees up
        worker_base* w;            // worker handed over by push_awaitable/push_sync
    };

    // Private implementation (pimpl); defined out of line.
    struct impl;

    // Create the implementation object bound to the given execution context.
    static impl* make_impl(capy::execution_context& ctx);

    impl* impl_;
    capy::any_executor ex_;     // executor on which all list mutation runs
    waiter* waiters_ = nullptr; // accept loops suspended waiting for a worker
    worker_base* idle_head_ = nullptr; // Forward list: available workers
    worker_base* active_head_ =
        nullptr; // Doubly linked: workers handling connections
    worker_base* active_tail_ = nullptr; // Tail for O(1) push_back
    std::size_t active_accepts_ = 0; // Number of active do_accept coroutines
    std::shared_ptr<void> storage_; // Owns the worker container (type-erased)
    bool running_ = false;
179
180 // Idle list (forward/singly linked) - push front, pop front
181 45x void idle_push(worker_base* w) noexcept
182 {
183 45x w->next_ = idle_head_;
184 45x idle_head_ = w;
185 45x }
186
187 9x worker_base* idle_pop() noexcept
188 {
189 9x auto* w = idle_head_;
190 9x if (w)
191 9x idle_head_ = w->next_;
192 9x return w;
193 }
194
195 9x bool idle_empty() const noexcept
196 {
197 9x return idle_head_ == nullptr;
198 }
199
200 // Active list (doubly linked) - push back, remove anywhere
201 3x void active_push(worker_base* w) noexcept
202 {
203 3x w->next_ = nullptr;
204 3x w->prev_ = active_tail_;
205 3x if (active_tail_)
206 active_tail_->next_ = w;
207 else
208 3x active_head_ = w;
209 3x active_tail_ = w;
210 3x }
211
212 9x void active_remove(worker_base* w) noexcept
213 {
214 // Skip if not in active list (e.g., after failed accept)
215 9x if (w != active_head_ && w->prev_ == nullptr)
216 6x return;
217 3x if (w->prev_)
218 w->prev_->next_ = w->next_;
219 else
220 3x active_head_ = w->next_;
221 3x if (w->next_)
222 w->next_->prev_ = w->prev_;
223 else
224 3x active_tail_ = w->prev_;
225 3x w->prev_ = nullptr; // Mark as not in active list
226 }
227
    // Self-owning coroutine used to launch one worker task.
    //
    // The promise stores the executor by value so it outlives child tasks,
    // plus the io_env that is injected into every IoAwaitable awaited in
    // the coroutine body. The coroutine starts lazily (initial_suspend
    // suspends) so its handle can be posted to an executor; once running,
    // the frame frees itself on completion (final_suspend never suspends).
    template<capy::Executor Ex>
    struct launch_wrapper
    {
        struct promise_type
        {
            Ex ex; // Executor stored directly in frame (outlives child tasks)
            capy::io_env env_;

            // For regular coroutines: first arg is executor, second is stop token
            template<class E, class S, class... Args>
                requires capy::Executor<std::decay_t<E>>
            promise_type(E e, S s, Args&&...)
                : ex(std::move(e))
                , env_{
                      capy::executor_ref(ex), std::move(s),
                      capy::get_current_frame_allocator()}
            {
            }

            // For lambda coroutines: first arg is closure, second is executor, third is stop token
            template<class Closure, class E, class S, class... Args>
                requires(!capy::Executor<std::decay_t<Closure>> &&
                         capy::Executor<std::decay_t<E>>)
            promise_type(Closure&&, E e, S s, Args&&...)
                : ex(std::move(e))
                , env_{
                      capy::executor_ref(ex), std::move(s),
                      capy::get_current_frame_allocator()}
            {
            }

            launch_wrapper get_return_object() noexcept
            {
                return {
                    std::coroutine_handle<promise_type>::from_promise(*this)};
            }
            // Lazy start: the launcher posts the handle explicitly.
            std::suspend_always initial_suspend() noexcept
            {
                return {};
            }
            // Never suspend at the end: the frame is destroyed automatically
            // when the coroutine completes.
            std::suspend_never final_suspend() noexcept
            {
                return {};
            }
            void return_void() noexcept {}
            // Worker tasks must not leak exceptions; there is no caller to
            // rethrow into.
            void unhandled_exception()
            {
                std::terminate();
            }

            // Inject io_env for IoAwaitable: wrap the awaitable so that
            // await_suspend receives the environment pointer as well.
            template<capy::IoAwaitable Awaitable>
            auto await_transform(Awaitable&& a)
            {
                using AwaitableT = std::decay_t<Awaitable>;
                struct adapter
                {
                    AwaitableT aw;
                    capy::io_env const* env;

                    bool await_ready()
                    {
                        return aw.await_ready();
                    }
                    decltype(auto) await_resume()
                    {
                        return aw.await_resume();
                    }

                    auto await_suspend(std::coroutine_handle<promise_type> h)
                    {
                        return aw.await_suspend(h, env);
                    }
                };
                return adapter{std::forward<Awaitable>(a), &env_};
            }
        };

        std::coroutine_handle<promise_type> h;

        launch_wrapper(std::coroutine_handle<promise_type> handle) noexcept
            : h(handle)
        {
        }

        // Destroy the (never-started) frame only if the handle was not
        // released for posting (see launcher::operator()).
        ~launch_wrapper()
        {
            if (h)
                h.destroy();
        }

        launch_wrapper(launch_wrapper&& o) noexcept
            : h(std::exchange(o.h, nullptr))
        {
        }

        launch_wrapper(launch_wrapper const&) = delete;
        launch_wrapper& operator=(launch_wrapper const&) = delete;
        launch_wrapper& operator=(launch_wrapper&&) = delete;
    };
328
    // Named functor to avoid incomplete lambda type in coroutine promise.
    // Invoking operator() creates the launch_wrapper coroutine that runs the
    // user task and then returns the worker to the pool.
    template<class Executor>
    struct launch_coro
    {
        // The leading Executor / stop_token parameters are consumed by
        // launch_wrapper::promise_type's constructor; the coroutine body
        // only uses self, t and wp.
        launch_wrapper<Executor> operator()(
            Executor,
            std::stop_token,
            tcp_server* self,
            capy::task<void> t,
            worker_base* wp)
        {
            // Executor and stop token stored in promise via constructor
            co_await std::move(t);
            co_await self->push(*wp); // worker goes back to idle list
        }
    };
345
    // Awaitable that returns a worker to the pool. It first transfers the
    // awaiting coroutine onto the server's executor so that the
    // (unsynchronized) intrusive lists are only mutated there.
    class push_awaitable
    {
        tcp_server& self_;
        worker_base& w_; // the worker being returned

    public:
        push_awaitable(tcp_server& self, worker_base& w) noexcept
            : self_(self)
            , w_(w)
        {
        }

        // Always suspend: the list update must happen on the server executor.
        bool await_ready() const noexcept
        {
            return false;
        }

        std::coroutine_handle<>
        await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
        {
            // Symmetric transfer to server's executor
            return self_.ex_.dispatch(h);
        }

        void await_resume() noexcept
        {
            // Running on server executor - safe to modify lists
            // Remove from active (if present), then wake waiter or add to idle
            self_.active_remove(&w_);
            if (self_.waiters_)
            {
                // Hand the worker directly to a suspended do_accept and
                // schedule its resumption; it never touches the idle list.
                auto* wait = self_.waiters_;
                self_.waiters_ = wait->next;
                wait->w = &w_;
                self_.ex_.post(wait->h);
            }
            else
            {
                self_.idle_push(&w_);
            }
        }
    };
388
    // Awaitable that yields an idle worker, suspending the caller
    // (do_accept) until one is returned via push()/push_sync().
    class pop_awaitable
    {
        tcp_server& self_;
        waiter wait_; // node linked into tcp_server::waiters_ while suspended

    public:
        pop_awaitable(tcp_server& self) noexcept : self_(self), wait_{} {}

        // Skip suspension entirely when an idle worker is available now.
        bool await_ready() const noexcept
        {
            return !self_.idle_empty();
        }

        bool
        await_suspend(std::coroutine_handle<> h, capy::io_env const*) noexcept
        {
            // Running on server executor (do_accept runs there)
            wait_.h = h;
            wait_.w = nullptr; // filled in by push_awaitable/push_sync
            wait_.next = self_.waiters_;
            self_.waiters_ = &wait_;
            return true;
        }

        worker_base& await_resume() noexcept
        {
            // Running on server executor
            if (wait_.w)
                return *wait_.w; // Woken by push_awaitable
            return *self_.idle_pop(); // await_ready path: idle list non-empty
        }
    };
421
422 9x push_awaitable push(worker_base& w)
423 {
424 9x return push_awaitable{*this, w};
425 }
426
427 // Synchronous version for destructor/guard paths
428 // Must be called from server executor context
429 void push_sync(worker_base& w) noexcept
430 {
431 active_remove(&w);
432 if (waiters_)
433 {
434 auto* wait = waiters_;
435 waiters_ = wait->next;
436 wait->w = &w;
437 ex_.post(wait->h);
438 }
439 else
440 {
441 idle_push(&w);
442 }
443 }
444
445 9x pop_awaitable pop()
446 {
447 9x return pop_awaitable{*this};
448 }
449
450 capy::task<void> do_accept(tcp_acceptor& acc);
451
452 public:
    /** Abstract base class for connection handlers.

        Derive from this class to implement custom connection handling.
        Each worker owns a socket and is reused across multiple
        connections to avoid per-connection allocation.

        @see tcp_server, launcher
    */
    class BOOST_COROSIO_DECL worker_base
    {
        // Ordered largest to smallest for optimal packing
        std::stop_source stop_; // ~16 bytes - reset per connection by launcher
        worker_base* next_ = nullptr; // 8 bytes - used by idle and active lists
        worker_base* prev_ = nullptr; // 8 bytes - used only by active list

        // tcp_server manipulates stop_ and the intrusive list hooks directly.
        friend class tcp_server;

    public:
        /// Construct a worker.
        worker_base();

        /// Destroy the worker.
        virtual ~worker_base();

        /** Handle an accepted connection.

            Called when this worker is dispatched to handle a new
            connection. The implementation must invoke the launcher
            exactly once to start the handling coroutine.

            @param launch Handle to launch the connection coroutine.
        */
        virtual void run(launcher launch) = 0;

        /// Return the socket used for connections.
        virtual corosio::tcp_socket& socket() = 0;
    };
490
    /** Move-only handle to launch a worker coroutine.

        Passed to @ref worker_base::run to start the connection-handling
        coroutine. The launcher ensures the worker returns to the idle
        pool when the coroutine completes or if launching fails.

        The launcher must be invoked exactly once via `operator()`.
        If destroyed without invoking, the worker is returned to the
        idle pool automatically.

        @see worker_base::run
    */
    class BOOST_COROSIO_DECL launcher
    {
        tcp_server* srv_; // never null
        worker_base* w_;  // null once invoked (or moved-from)

        friend class tcp_server;

        launcher(tcp_server& srv, worker_base& w) noexcept : srv_(&srv), w_(&w)
        {
        }

    public:
        /// Return the worker to the pool if not launched.
        ~launcher()
        {
            if (w_)
                srv_->push_sync(*w_);
        }

        launcher(launcher&& o) noexcept
            : srv_(o.srv_)
            , w_(std::exchange(o.w_, nullptr))
        {
        }
        launcher(launcher const&) = delete;
        launcher& operator=(launcher const&) = delete;
        launcher& operator=(launcher&&) = delete;

        /** Launch the connection-handling coroutine.

            Starts the given coroutine on the specified executor. When
            the coroutine completes, the worker is automatically returned
            to the idle pool.

            @param ex The executor to run the coroutine on.
            @param task The coroutine to execute.

            @throws std::logic_error If this launcher was already invoked.
        */
        template<class Executor>
        void operator()(Executor const& ex, capy::task<void> task)
        {
            if (!w_)
                detail::throw_logic_error(); // launcher already invoked

            auto* w = std::exchange(w_, nullptr);

            // Worker is being dispatched - add to active list
            srv_->active_push(w);

            // Return worker to pool if coroutine setup throws
            struct guard_t
            {
                tcp_server* srv;
                worker_base* w;
                ~guard_t()
                {
                    if (w)
                        srv->push_sync(*w);
                }
            } guard{srv_, w};

            // Reset worker's stop source for this connection
            w->stop_ = {};
            auto st = w->stop_.get_token();

            auto wrapper =
                launch_coro<Executor>{}(ex, st, srv_, std::move(task), w);

            // Executor and stop token stored in promise via constructor.
            // The handle must leave the wrapper before posting, otherwise
            // the wrapper's destructor could destroy a frame the executor
            // is already running.
            ex.post(std::exchange(wrapper.h, nullptr)); // Release before post
            guard.w = nullptr; // Success - dismiss guard
        }
    };
577
    /** Construct a TCP server.

        @tparam Ctx Execution context type satisfying ExecutionContext.
        @tparam Ex Executor type satisfying Executor.

        @param ctx The execution context for socket operations.
        @param ex The executor for dispatching coroutines.

        @par Example
        @code
        tcp_server srv(ctx, ctx.get_executor());
        srv.set_workers(make_workers(ctx, 100));
        srv.bind(endpoint{...});
        srv.start();
        @endcode
    */
    template<capy::ExecutionContext Ctx, capy::Executor Ex>
    tcp_server(Ctx& ctx, Ex ex) : impl_(make_impl(ctx))
        , ex_(std::move(ex))
    {
        // NOTE(review): if constructing ex_ throws, impl_ is not released
        // here; presumably make_impl ties impl's lifetime to ctx --
        // confirm against the out-of-line definition.
    }
599
600 public:
601 /// Destroy the server, stopping all accept loops.
602 ~tcp_server();
603
604 tcp_server(tcp_server const&) = delete;
605 tcp_server& operator=(tcp_server const&) = delete;
606
607 /** Move construct from another server.
608
609 @param o The source server. After the move, @p o is
610 in a valid but unspecified state.
611 */
612 tcp_server(tcp_server&& o) noexcept;
613
614 /** Move assign from another server.
615
616 @param o The source server. After the move, @p o is
617 in a valid but unspecified state.
618
619 @return `*this`.
620 */
621 tcp_server& operator=(tcp_server&& o) noexcept;
622
623 /** Bind to a local endpoint.
624
625 Creates an acceptor listening on the specified endpoint.
626 Multiple endpoints can be bound by calling this method
627 multiple times before @ref start.
628
629 @param ep The local endpoint to bind to.
630
631 @return The error code if binding fails.
632 */
633 std::error_code bind(endpoint ep);
634
635 /** Set the worker pool.
636
637 Replaces any existing workers with the given range. Any
638 previous workers are released and the idle/active lists
639 are cleared before populating with new workers.
640
641 @tparam Range Forward range of pointer-like objects to worker_base.
642
643 @param workers Range of workers to manage. Each element must
644 support `std::to_address()` yielding `worker_base*`.
645
646 @par Example
647 @code
648 std::vector<std::unique_ptr<my_worker>> workers;
649 for(int i = 0; i < 100; ++i)
650 workers.push_back(std::make_unique<my_worker>(ctx));
651 srv.set_workers(std::move(workers));
652 @endcode
653 */
654 template<std::ranges::forward_range Range>
655 requires std::convertible_to<
656 decltype(std::to_address(
657 std::declval<std::ranges::range_value_t<Range>&>())),
658 worker_base*>
659 9x void set_workers(Range&& workers)
660 {
661 // Clear existing state
662 9x storage_.reset();
663 9x idle_head_ = nullptr;
664 9x active_head_ = nullptr;
665 9x active_tail_ = nullptr;
666
667 // Take ownership and populate idle list
668 using StorageType = std::decay_t<Range>;
669 9x auto* p = new StorageType(std::forward<Range>(workers));
670 9x storage_ = std::shared_ptr<void>(
671 9x p, [](void* ptr) { delete static_cast<StorageType*>(ptr); });
672 45x for (auto&& elem : *static_cast<StorageType*>(p))
673 36x idle_push(std::to_address(elem));
674 9x }
675
676 /** Start accepting connections.
677
678 Launches accept loops for all bound endpoints. Incoming
679 connections are dispatched to idle workers from the pool.
680
681 Calling `start()` on an already-running server has no effect.
682
683 @par Preconditions
684 - At least one endpoint bound via @ref bind.
    - Workers provided via @ref set_workers.
686 - If restarting, @ref join must have completed first.
687
688 @par Effects
689 Creates one accept coroutine per bound endpoint. Each coroutine
690 runs on the server's executor, waiting for connections and
691 dispatching them to idle workers.
692
693 @par Restart Sequence
694 To restart after stopping, complete the full shutdown cycle:
695 @code
696 srv.start();
697 ioc.run_for( 1s );
698 srv.stop(); // 1. Signal shutdown
699 ioc.run(); // 2. Drain remaining completions
700 srv.join(); // 3. Wait for accept loops
701
702 // Now safe to restart
703 srv.start();
704 ioc.run();
705 @endcode
706
707 @par Thread Safety
708 Not thread safe.
709
710 @throws std::logic_error If a previous session has not been
711 joined (accept loops still active).
712 */
713 void start();
714
715 /** Stop accepting connections.
716
717 Signals all listening ports to stop accepting new connections
718 and requests cancellation of active workers via their stop tokens.
719
720 This function returns immediately; it does not wait for workers
721 to finish. Pending I/O operations complete asynchronously.
722
723 Calling `stop()` on a non-running server has no effect.
724
725 @par Effects
726 - Closes all acceptors (pending accepts complete with error).
727 - Requests stop on each active worker's stop token.
728 - Workers observing their stop token should exit promptly.
729
730 @par Postconditions
731 No new connections will be accepted. Active workers continue
732 until they observe their stop token or complete naturally.
733
734 @par What Happens Next
735 After calling `stop()`:
736 1. Let `ioc.run()` return (drains pending completions).
737 2. Call @ref join to wait for accept loops to finish.
738 3. Only then is it safe to restart or destroy the server.
739
740 @par Thread Safety
741 Not thread safe.
742
743 @see join, start
744 */
745 void stop();
746
747 /** Block until all accept loops complete.
748
749 Blocks the calling thread until all accept coroutines launched
750 by @ref start have finished executing. This synchronizes the
751 shutdown sequence, ensuring the server is fully stopped before
752 restarting or destroying it.
753
754 @par Preconditions
755 @ref stop has been called and `ioc.run()` has returned.
756
757 @par Postconditions
758 All accept loops have completed. The server is in the stopped
759 state and may be restarted via @ref start.
760
761 @par Example (Correct Usage)
762 @code
763 // main thread
764 srv.start();
765 ioc.run(); // Blocks until work completes
766 srv.join(); // Safe: called after ioc.run() returns
767 @endcode
768
769 @par WARNING: Deadlock Scenarios
770 Calling `join()` from the wrong context causes deadlock:
771
772 @code
773 // WRONG: calling join() from inside a worker coroutine
774 void run( launcher launch ) override
775 {
776 launch( ex, [this]() -> capy::task<>
777 {
778 srv_.join(); // DEADLOCK: blocks the executor
779 co_return;
780 }());
781 }
782
783 // WRONG: calling join() while ioc.run() is still active
784 std::thread t( [&]{ ioc.run(); } );
785 srv.stop();
786 srv.join(); // DEADLOCK: ioc.run() still running in thread t
787 @endcode
788
789 @par Thread Safety
790 May be called from any thread, but will deadlock if called
791 from within the io_context event loop or from a worker coroutine.
792
793 @see stop, start
794 */
795 void join();
796
797 private:
798 capy::task<> do_stop();
799 };
800
801 #ifdef _MSC_VER
802 #pragma warning(pop)
803 #endif
804
805 } // namespace boost::corosio
806
807 #endif
808