include/boost/corosio/native/detail/select/select_socket_service.hpp

75.7% Lines (262/346) 93.1% Functions (27/29)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
11 #define BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_SELECT
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/capy/ex/execution_context.hpp>
19 #include <boost/corosio/detail/socket_service.hpp>
20
21 #include <boost/corosio/native/detail/select/select_socket.hpp>
22 #include <boost/corosio/native/detail/select/select_scheduler.hpp>
23
24 #include <boost/corosio/native/detail/endpoint_convert.hpp>
25 #include <boost/corosio/detail/dispatch_coro.hpp>
26 #include <boost/corosio/native/detail/make_err.hpp>
27
28 #include <boost/corosio/detail/except.hpp>
29
30 #include <boost/capy/buffers.hpp>
31
32 #include <errno.h>
33 #include <fcntl.h>
34 #include <netinet/in.h>
35 #include <netinet/tcp.h>
36 #include <sys/socket.h>
37 #include <unistd.h>
38
39 #include <memory>
40 #include <mutex>
41 #include <unordered_map>
42
43 /*
44 select Socket Implementation
45 ============================
46
47 This mirrors the epoll_sockets design for behavioral consistency.
48 Each I/O operation follows the same pattern:
49 1. Try the syscall immediately (non-blocking socket)
50 2. If it succeeds or fails with a real error, post to completion queue
51 3. If EAGAIN/EWOULDBLOCK, register with select scheduler and wait
52
53 Cancellation
54 ------------
55 See op.hpp for the completion/cancellation race handling via the
56 `registered` atomic. cancel() must complete pending operations (post
57 them with cancelled flag) so coroutines waiting on them can resume.
58 close_socket() calls cancel() first to ensure this.
59
60 Impl Lifetime with shared_ptr
61 -----------------------------
62 Socket impls use enable_shared_from_this. The service owns impls via
63 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
64 removal. When a user calls close(), we call cancel() which posts pending
65 ops to the scheduler.
66
67 CRITICAL: The posted ops must keep the impl alive until they complete.
68 Otherwise the scheduler would process a freed op (use-after-free). The
69 cancel() method captures shared_from_this() into op.impl_ptr before
70 posting. When the op completes, impl_ptr is cleared, allowing the impl
71 to be destroyed if no other references exist.
72
73 Service Ownership
74 -----------------
75 select_socket_service owns all socket impls. destroy() removes the
76 shared_ptr from the map, but the impl may survive if ops still hold
77 impl_ptr refs. shutdown() closes all sockets and clears the map; any
78 in-flight ops will complete and release their refs.
79 */
80
81 namespace boost::corosio::detail {
82
/** State for select socket service.

    Shared between the service and its socket impls: the scheduler the
    service posts to, a mutex guarding the two containers, an intrusive
    list used for fast iteration at shutdown, and a map of owning
    shared_ptrs keyed by raw pointer for O(1) lookup and removal.
*/
class select_socket_state
{
public:
    explicit select_socket_state(select_scheduler& sched) noexcept
        : sched_(sched)
    {
    }

    select_scheduler& sched_;                   // reactor ops are posted to
    std::mutex mutex_;                          // guards socket_list_ / socket_ptrs_
    intrusive_list<select_socket> socket_list_; // all live socket impls
    std::unordered_map<select_socket*, std::shared_ptr<select_socket>>
        socket_ptrs_;                           // owning refs, keyed by raw ptr
};
98
/** select socket service implementation.

    Inherits from socket_service to enable runtime polymorphism.
    Uses key_type = socket_service for service lookup.
*/
class BOOST_COROSIO_DECL select_socket_service final : public socket_service
{
public:
    explicit select_socket_service(capy::execution_context& ctx);
    ~select_socket_service() override;

    select_socket_service(select_socket_service const&) = delete;
    select_socket_service& operator=(select_socket_service const&) = delete;

    /// Close every remaining socket; queued ops drain via the scheduler.
    void shutdown() override;

    /// Create a new socket impl owned by this service.
    io_object::implementation* construct() override;
    /// Close the impl and drop the service's owning reference.
    void destroy(io_object::implementation*) override;
    /// Close the socket held by the handle without destroying the impl.
    void close(io_object::handle&) override;
    /// Create and configure a new descriptor for the given impl.
    std::error_code open_socket(
        tcp_socket::implementation& impl,
        int family,
        int type,
        int protocol) override;

    /// The select reactor this service registers fds with.
    select_scheduler& scheduler() const noexcept
    {
        return state_->sched_;
    }
    void post(select_op* op);     // queue a completed op on the scheduler
    void work_started() noexcept; // outstanding-work accounting (paired)
    void work_finished() noexcept;

private:
    std::unique_ptr<select_socket_state> state_;
};
135
// Backward compatibility alias for the previous `select_sockets` name.
using select_sockets = select_socket_service;
138
inline void
select_op::canceller::operator()() const noexcept
{
    // Invoked by the stop_token callback: delegate to the op's cancel(),
    // which routes to the owning socket when one is attached.
    op->cancel();
}
144
145 inline void
146 select_connect_op::cancel() noexcept
147 {
148 if (socket_impl_)
149 socket_impl_->cancel_single_op(*this);
150 else
151 request_cancel();
152 }
153
154 inline void
155 99x select_read_op::cancel() noexcept
156 {
157 99x if (socket_impl_)
158 99x socket_impl_->cancel_single_op(*this);
159 else
160 request_cancel();
161 99x }
162
163 inline void
164 select_write_op::cancel() noexcept
165 {
166 if (socket_impl_)
167 socket_impl_->cancel_single_op(*this);
168 else
169 request_cancel();
170 }
171
inline void
select_connect_op::operator()()
{
    // Completion handler, run by the scheduler after the op was posted.
    // Release the stop callback first so no cancel can fire mid-completion.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        endpoint local_ep;
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_ep = from_sockaddr(local_storage);
        static_cast<select_socket*>(socket_impl_)
            ->set_endpoints(local_ep, target_endpoint);
    }

    // Report the outcome; cancellation takes precedence over errno.
    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    capy::executor_ref saved_ex(ex);
    std::coroutine_handle<> saved_h(h);
    impl_ptr.reset(); // may destroy the impl (and this op) — use saved copies
    dispatch_coro(saved_ex, saved_h).resume();
}
212
// Bind the impl to its owning service; the descriptor is assigned
// later by select_socket_service::open_socket().
inline select_socket::select_socket(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
217
inline std::coroutine_handle<>
select_socket::connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    // One outstanding connect per socket: conn_ is a member op.
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);   // arm the stop_token cancellation callback

    sockaddr_storage storage{};
    socklen_t addrlen =
        detail::to_sockaddr(ep, detail::socket_family(fd_), storage);
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&storage), addrlen);

    if (result == 0)
    {
        // Sync success — cache endpoints immediately
        sockaddr_storage local_storage{};
        socklen_t local_len = sizeof(local_storage);
        if (::getsockname(
                fd_, reinterpret_cast<sockaddr*>(&local_storage), &local_len) ==
            0)
            local_endpoint_ = detail::from_sockaddr(local_storage);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        // Keep the impl alive until the scheduler drains the posted op.
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        // Async path: balanced by work_finished() when the op completes
        // or is cancelled.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                // We won the claim race: deregister, post the cancelled
                // op ourselves, and release the work started above.
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Immediate failure: errno is neither 0 nor EINPROGRESS.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
309
310 inline std::coroutine_handle<>
311 112697x select_socket::read_some(
312 std::coroutine_handle<> h,
313 capy::executor_ref ex,
314 buffer_param param,
315 std::stop_token token,
316 std::error_code* ec,
317 std::size_t* bytes_out)
318 {
319 112697x auto& op = rd_;
320 112697x op.reset();
321 112697x op.h = h;
322 112697x op.ex = ex;
323 112697x op.ec_out = ec;
324 112697x op.bytes_out = bytes_out;
325 112697x op.fd = fd_;
326 112697x op.start(token, this);
327
328 112697x capy::mutable_buffer bufs[select_read_op::max_buffers];
329 112697x op.iovec_count =
330 112697x static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
331
332 112697x if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
333 {
334 1x op.empty_buffer_read = true;
335 1x op.complete(0, 0);
336 1x op.impl_ptr = shared_from_this();
337 1x svc_.post(&op);
338 1x return std::noop_coroutine();
339 }
340
341 225392x for (int i = 0; i < op.iovec_count; ++i)
342 {
343 112696x op.iovecs[i].iov_base = bufs[i].data();
344 112696x op.iovecs[i].iov_len = bufs[i].size();
345 }
346
347 112696x ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
348
349 112696x if (n > 0)
350 {
351 112408x op.complete(0, static_cast<std::size_t>(n));
352 112408x op.impl_ptr = shared_from_this();
353 112408x svc_.post(&op);
354 112408x return std::noop_coroutine();
355 }
356
357 288x if (n == 0)
358 {
359 5x op.complete(0, 0);
360 5x op.impl_ptr = shared_from_this();
361 5x svc_.post(&op);
362 5x return std::noop_coroutine();
363 }
364
365 283x if (errno == EAGAIN || errno == EWOULDBLOCK)
366 {
367 283x svc_.work_started();
368 283x op.impl_ptr = shared_from_this();
369
370 // Set registering BEFORE register_fd to close the race window where
371 // reactor sees an event before we set registered.
372 283x op.registered.store(
373 select_registration_state::registering, std::memory_order_release);
374 283x svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
375
376 // Transition to registered. If this fails, reactor or cancel already
377 // claimed the op (state is now unregistered), so we're done. However,
378 // we must still deregister the fd because cancel's deregister_fd may
379 // have run before our register_fd, leaving the fd orphaned.
380 283x auto expected = select_registration_state::registering;
381 283x if (!op.registered.compare_exchange_strong(
382 expected, select_registration_state::registered,
383 std::memory_order_acq_rel))
384 {
385 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
386 return std::noop_coroutine();
387 }
388
389 // If cancelled was set before we registered, handle it now.
390 283x if (op.cancelled.load(std::memory_order_acquire))
391 {
392 auto prev = op.registered.exchange(
393 select_registration_state::unregistered,
394 std::memory_order_acq_rel);
395 if (prev != select_registration_state::unregistered)
396 {
397 svc_.scheduler().deregister_fd(
398 fd_, select_scheduler::event_read);
399 op.impl_ptr = shared_from_this();
400 svc_.post(&op);
401 svc_.work_finished();
402 }
403 }
404 283x return std::noop_coroutine();
405 }
406
407 op.complete(errno, 0);
408 op.impl_ptr = shared_from_this();
409 svc_.post(&op);
410 return std::noop_coroutine();
411 }
412
inline std::coroutine_handle<>
select_socket::write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    // One outstanding write per socket: wr_ is a member op.
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this); // arm the stop_token cancellation callback

    // Gather user buffers into the op's iovec array.
    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count =
        static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Empty write: complete immediately with zero bytes and no error.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    // Optimistic non-blocking send. MSG_NOSIGNAL: report EPIPE via errno
    // instead of raising SIGPIPE on a closed peer.
    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        // Async path: balanced by work_finished() on completion/cancel.
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(
            select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
                expected, select_registration_state::registered,
                std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered,
                std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(
                    fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Real error; EIO fallback guards against a zero errno (n == 0 from
    // sendmsg also lands here and must not be reported as success).
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
510
511 inline std::error_code
512 3x select_socket::shutdown(tcp_socket::shutdown_type what) noexcept
513 {
514 int how;
515 3x switch (what)
516 {
517 1x case tcp_socket::shutdown_receive:
518 1x how = SHUT_RD;
519 1x break;
520 1x case tcp_socket::shutdown_send:
521 1x how = SHUT_WR;
522 1x break;
523 1x case tcp_socket::shutdown_both:
524 1x how = SHUT_RDWR;
525 1x break;
526 default:
527 return make_err(EINVAL);
528 }
529 3x if (::shutdown(fd_, how) != 0)
530 return make_err(errno);
531 3x return {};
532 }
533
534 inline std::error_code
535 28x select_socket::set_option(
536 int level, int optname, void const* data, std::size_t size) noexcept
537 {
538 28x if (::setsockopt(fd_, level, optname, data, static_cast<socklen_t>(size)) !=
539 0)
540 return make_err(errno);
541 28x return {};
542 }
543
544 inline std::error_code
545 31x select_socket::get_option(
546 int level, int optname, void* data, std::size_t* size) const noexcept
547 {
548 31x socklen_t len = static_cast<socklen_t>(*size);
549 31x if (::getsockopt(fd_, level, optname, data, &len) != 0)
550 return make_err(errno);
551 31x *size = static_cast<std::size_t>(len);
552 31x return {};
553 }
554
inline void
select_socket::cancel() noexcept
{
    // Promote the weak self-reference; if the impl is already gone
    // there is nothing to cancel.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Claim each pending op at most once: the exchange to unregistered
    // decides whether we (rather than the reactor) own the completion.
    // Only the winner deregisters the fd and posts the op.
    auto cancel_op = [this, &self](select_op& op, int events) {
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self; // keep impl alive until op is drained
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
579
inline void
select_socket::cancel_single_op(select_op& op) noexcept
{
    // Promote the weak self-reference; a dying impl has nothing pending.
    auto self = weak_from_this().lock();
    if (!self)
        return;

    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    // Only the thread that flipped the state away from registered owns
    // the completion; otherwise the reactor will post the op.
    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        op.impl_ptr = self; // keep impl alive until op is drained
        svc_.post(&op);
        svc_.work_finished();
    }
}
608
609 inline void
610 26303x select_socket::close_socket() noexcept
611 {
612 26303x auto self = weak_from_this().lock();
613 26303x if (self)
614 {
615 78909x auto cancel_op = [this, &self](select_op& op, int events) {
616 78909x auto prev = op.registered.exchange(
617 select_registration_state::unregistered,
618 std::memory_order_acq_rel);
619 78909x op.request_cancel();
620 78909x if (prev != select_registration_state::unregistered)
621 {
622 1x svc_.scheduler().deregister_fd(fd_, events);
623 1x op.impl_ptr = self;
624 1x svc_.post(&op);
625 1x svc_.work_finished();
626 }
627 105212x };
628
629 26303x cancel_op(conn_, select_scheduler::event_write);
630 26303x cancel_op(rd_, select_scheduler::event_read);
631 26303x cancel_op(wr_, select_scheduler::event_write);
632 }
633
634 26303x if (fd_ >= 0)
635 {
636 5843x svc_.scheduler().deregister_fd(
637 fd_, select_scheduler::event_read | select_scheduler::event_write);
638 5843x ::close(fd_);
639 5843x fd_ = -1;
640 }
641
642 26303x local_endpoint_ = endpoint{};
643 26303x remote_endpoint_ = endpoint{};
644 26303x }
645
// Obtain the context's select_scheduler service and bind all shared
// state (mutex, socket list, owning map) to it.
inline select_socket_service::select_socket_service(
    capy::execution_context& ctx)
    : state_(
          std::make_unique<select_socket_state>(
              ctx.use_service<select_scheduler>()))
{
}
653
654 336x inline select_socket_service::~select_socket_service() {}
655
inline void
select_socket_service::shutdown()
{
    // Service shutdown: close every socket still registered. The lock
    // guards socket_list_ against concurrent construct()/destroy().
    std::lock_guard lock(state_->mutex_);

    // pop_front() empties the intrusive list as we close each impl.
    while (auto* impl = state_->socket_list_.pop_front())
        impl->close_socket();

    // Don't clear socket_ptrs_ here. The scheduler shuts down after us and
    // drains completed_ops_, calling destroy() on each queued op. Letting
    // ~state_ release the ptrs (during service destruction, after scheduler
    // shutdown) keeps every impl alive until all ops have been drained.
}
669
670 inline io_object::implementation*
671 8765x select_socket_service::construct()
672 {
673 8765x auto impl = std::make_shared<select_socket>(*this);
674 8765x auto* raw = impl.get();
675
676 {
677 8765x std::lock_guard lock(state_->mutex_);
678 8765x state_->socket_list_.push_back(raw);
679 8765x state_->socket_ptrs_.emplace(raw, std::move(impl));
680 8765x }
681
682 8765x return raw;
683 8765x }
684
685 inline void
686 8765x select_socket_service::destroy(io_object::implementation* impl)
687 {
688 8765x auto* select_impl = static_cast<select_socket*>(impl);
689 8765x select_impl->close_socket();
690 8765x std::lock_guard lock(state_->mutex_);
691 8765x state_->socket_list_.remove(select_impl);
692 8765x state_->socket_ptrs_.erase(select_impl);
693 8765x }
694
695 inline std::error_code
696 2930x select_socket_service::open_socket(
697 tcp_socket::implementation& impl, int family, int type, int protocol)
698 {
699 2930x auto* select_impl = static_cast<select_socket*>(&impl);
700 2930x select_impl->close_socket();
701
702 2930x int fd = ::socket(family, type, protocol);
703 2930x if (fd < 0)
704 return make_err(errno);
705
706 2930x if (family == AF_INET6)
707 {
708 5x int one = 1;
709 5x ::setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &one, sizeof(one));
710 }
711
712 // Set non-blocking and close-on-exec
713 2930x int flags = ::fcntl(fd, F_GETFL, 0);
714 2930x if (flags == -1)
715 {
716 int errn = errno;
717 ::close(fd);
718 return make_err(errn);
719 }
720 2930x if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
721 {
722 int errn = errno;
723 ::close(fd);
724 return make_err(errn);
725 }
726 2930x if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
727 {
728 int errn = errno;
729 ::close(fd);
730 return make_err(errn);
731 }
732
733 // Check fd is within select() limits
734 2930x if (fd >= FD_SETSIZE)
735 {
736 ::close(fd);
737 return make_err(EMFILE); // Too many open files
738 }
739
740 2930x select_impl->fd_ = fd;
741 2930x return {};
742 }
743
744 inline void
745 14608x select_socket_service::close(io_object::handle& h)
746 {
747 14608x static_cast<select_socket*>(h.get())->close_socket();
748 14608x }
749
inline void
select_socket_service::post(select_op* op)
{
    // Forward the completed op to the scheduler's completion queue;
    // completions are never run inline.
    state_->sched_.post(op);
}
755
inline void
select_socket_service::work_started() noexcept
{
    // Count an outstanding async op; must be balanced by work_finished().
    state_->sched_.work_started();
}
761
inline void
select_socket_service::work_finished() noexcept
{
    // Release one unit of outstanding work recorded by work_started().
    state_->sched_.work_finished();
}
767
768 } // namespace boost::corosio::detail
769
770 #endif // BOOST_COROSIO_HAS_SELECT
771
772 #endif // BOOST_COROSIO_NATIVE_DETAIL_SELECT_SELECT_SOCKET_SERVICE_HPP
773