libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp

79.7% Lines (196/246) 100.0% Functions (18/18) 51.1% Branches (69/135)
libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/acceptors.hpp"
15 #include "src/detail/epoll/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/make_err.hpp"
18
19 #include <utility>
20
21 #include <errno.h>
22 #include <netinet/in.h>
23 #include <sys/epoll.h>
24 #include <sys/socket.h>
25 #include <unistd.h>
26
27 namespace boost::corosio::detail {
28
29 void
30 6 epoll_accept_op::
31 cancel() noexcept
32 {
33
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (acceptor_impl_)
34 6 acceptor_impl_->cancel_single_op(*this);
35 else
36 request_cancel();
37 6 }
38
39 void
40 2567 epoll_accept_op::
41 operator()()
42 {
43 2567 stop_cb.reset();
44
45
3/4
✓ Branch 0 taken 2567 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 2558 times.
✓ Branch 4 taken 9 times.
2567 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
46
47
1/2
✓ Branch 0 taken 2567 times.
✗ Branch 1 not taken.
2567 if (ec_out)
48 {
49
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2558 times.
2567 if (cancelled.load(std::memory_order_acquire))
50 9 *ec_out = capy::error::canceled;
51
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2558 times.
2558 else if (errn != 0)
52 *ec_out = make_err(errn);
53 else
54 2558 *ec_out = {};
55 }
56
57
3/4
✓ Branch 0 taken 2558 times.
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2558 times.
✗ Branch 3 not taken.
2567 if (success && accepted_fd >= 0)
58 {
59
1/2
✓ Branch 0 taken 2558 times.
✗ Branch 1 not taken.
2558 if (acceptor_impl_)
60 {
61 2558 auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
62 2558 ->service().socket_service();
63
1/2
✓ Branch 0 taken 2558 times.
✗ Branch 1 not taken.
2558 if (socket_svc)
64 {
65
1/1
✓ Branch 1 taken 2558 times.
2558 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
66 2558 impl.set_socket(accepted_fd);
67
68 // Register accepted socket with epoll (edge-triggered mode)
69 2558 impl.desc_state_.fd = accepted_fd;
70 {
71
1/1
✓ Branch 1 taken 2558 times.
2558 std::lock_guard lock(impl.desc_state_.mutex);
72 2558 impl.desc_state_.read_op = nullptr;
73 2558 impl.desc_state_.write_op = nullptr;
74 2558 impl.desc_state_.connect_op = nullptr;
75 2558 }
76
1/1
✓ Branch 2 taken 2558 times.
2558 socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
77
78 2558 sockaddr_in local_addr{};
79 2558 socklen_t local_len = sizeof(local_addr);
80 2558 sockaddr_in remote_addr{};
81 2558 socklen_t remote_len = sizeof(remote_addr);
82
83 2558 endpoint local_ep, remote_ep;
84
1/2
✓ Branch 1 taken 2558 times.
✗ Branch 2 not taken.
2558 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
85 2558 local_ep = from_sockaddr_in(local_addr);
86
1/2
✓ Branch 1 taken 2558 times.
✗ Branch 2 not taken.
2558 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
87 2558 remote_ep = from_sockaddr_in(remote_addr);
88
89 2558 impl.set_endpoints(local_ep, remote_ep);
90
91
1/2
✓ Branch 0 taken 2558 times.
✗ Branch 1 not taken.
2558 if (impl_out)
92 2558 *impl_out = &impl;
93
94 2558 accepted_fd = -1;
95 }
96 else
97 {
98 if (ec_out && !*ec_out)
99 *ec_out = make_err(ENOENT);
100 ::close(accepted_fd);
101 accepted_fd = -1;
102 if (impl_out)
103 *impl_out = nullptr;
104 }
105 }
106 else
107 {
108 ::close(accepted_fd);
109 accepted_fd = -1;
110 if (impl_out)
111 *impl_out = nullptr;
112 }
113 2558 }
114 else
115 {
116
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (accepted_fd >= 0)
117 {
118 ::close(accepted_fd);
119 accepted_fd = -1;
120 }
121
122
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (peer_impl)
123 {
124 peer_impl->release();
125 peer_impl = nullptr;
126 }
127
128
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (impl_out)
129 9 *impl_out = nullptr;
130 }
131
132 // Move to stack before resuming. See epoll_op::operator()() for rationale.
133 2567 capy::executor_ref saved_ex( std::move( ex ) );
134 2567 capy::coro saved_h( std::move( h ) );
135 2567 auto prevent_premature_destruction = std::move(impl_ptr);
136
1/1
✓ Branch 1 taken 2567 times.
2567 saved_ex.dispatch( saved_h );
137 2567 }
138
139 74 epoll_acceptor_impl::
140 74 epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
141 74 : svc_(svc)
142 {
143 74 }
144
145 void
146 74 epoll_acceptor_impl::
147 release()
148 {
149 74 close_socket();
150 74 svc_.destroy_acceptor_impl(*this);
151 74 }
152
153 std::coroutine_handle<>
154 2567 epoll_acceptor_impl::
155 accept(
156 std::coroutine_handle<> h,
157 capy::executor_ref ex,
158 std::stop_token token,
159 std::error_code* ec,
160 io_object::io_object_impl** impl_out)
161 {
162 2567 auto& op = acc_;
163 2567 op.reset();
164 2567 op.h = h;
165 2567 op.ex = ex;
166 2567 op.ec_out = ec;
167 2567 op.impl_out = impl_out;
168 2567 op.fd = fd_;
169 2567 op.start(token, this);
170
171 2567 sockaddr_in addr{};
172 2567 socklen_t addrlen = sizeof(addr);
173 int accepted;
174 do {
175
1/1
✓ Branch 1 taken 2567 times.
2567 accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
176 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
177
3/4
✓ Branch 0 taken 2565 times.
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2565 times.
2567 } while (accepted < 0 && errno == EINTR);
178
179
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2565 times.
2567 if (accepted >= 0)
180 {
181 {
182
1/1
✓ Branch 1 taken 2 times.
2 std::lock_guard lock(desc_state_.mutex);
183 2 desc_state_.read_ready = false;
184 2 }
185 2 op.accepted_fd = accepted;
186 2 op.complete(0, 0);
187
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
188
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
189 // completion is always posted to scheduler queue, never inline.
190 2 return std::noop_coroutine();
191 }
192
193
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2565 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2565 if (errno == EAGAIN || errno == EWOULDBLOCK)
194 {
195 2565 svc_.work_started();
196
1/1
✓ Branch 1 taken 2565 times.
2565 op.impl_ptr = shared_from_this();
197
198 2565 bool perform_now = false;
199 {
200
1/1
✓ Branch 1 taken 2565 times.
2565 std::lock_guard lock(desc_state_.mutex);
201
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2565 times.
2565 if (desc_state_.read_ready)
202 {
203 desc_state_.read_ready = false;
204 perform_now = true;
205 }
206 else
207 {
208 2565 desc_state_.read_op = &op;
209 }
210 2565 }
211
212
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2565 times.
2565 if (perform_now)
213 {
214 op.perform_io();
215 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
216 {
217 op.errn = 0;
218 std::lock_guard lock(desc_state_.mutex);
219 desc_state_.read_op = &op;
220 }
221 else
222 {
223 svc_.post(&op);
224 svc_.work_finished();
225 }
226 return std::noop_coroutine();
227 }
228
229
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2565 times.
2565 if (op.cancelled.load(std::memory_order_acquire))
230 {
231 epoll_op* claimed = nullptr;
232 {
233 std::lock_guard lock(desc_state_.mutex);
234 if (desc_state_.read_op == &op)
235 claimed = std::exchange(desc_state_.read_op, nullptr);
236 }
237 if (claimed)
238 {
239 svc_.post(claimed);
240 svc_.work_finished();
241 }
242 }
243 // completion is always posted to scheduler queue, never inline.
244 2565 return std::noop_coroutine();
245 }
246
247 op.complete(errno, 0);
248 op.impl_ptr = shared_from_this();
249 svc_.post(&op);
250 // completion is always posted to scheduler queue, never inline.
251 return std::noop_coroutine();
252 }
253
254 void
255 149 epoll_acceptor_impl::
256 cancel() noexcept
257 {
258 149 std::shared_ptr<epoll_acceptor_impl> self;
259 try {
260
1/1
✓ Branch 1 taken 149 times.
149 self = shared_from_this();
261 } catch (const std::bad_weak_ptr&) {
262 return;
263 }
264
265 149 acc_.request_cancel();
266
267 149 epoll_op* claimed = nullptr;
268 {
269 149 std::lock_guard lock(desc_state_.mutex);
270
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 146 times.
149 if (desc_state_.read_op == &acc_)
271 3 claimed = std::exchange(desc_state_.read_op, nullptr);
272 149 }
273
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 146 times.
149 if (claimed)
274 {
275 3 acc_.impl_ptr = self;
276 3 svc_.post(&acc_);
277 3 svc_.work_finished();
278 }
279 149 }
280
281 void
282 6 epoll_acceptor_impl::
283 cancel_single_op(epoll_op& op) noexcept
284 {
285 6 op.request_cancel();
286
287 6 epoll_op* claimed = nullptr;
288 {
289 6 std::lock_guard lock(desc_state_.mutex);
290
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (desc_state_.read_op == &op)
291 6 claimed = std::exchange(desc_state_.read_op, nullptr);
292 6 }
293
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (claimed)
294 {
295 try {
296
1/1
✓ Branch 1 taken 6 times.
6 op.impl_ptr = shared_from_this();
297 } catch (const std::bad_weak_ptr&) {}
298 6 svc_.post(&op);
299 6 svc_.work_finished();
300 }
301 6 }
302
303 void
304 148 epoll_acceptor_impl::
305 close_socket() noexcept
306 {
307 148 cancel();
308
309
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 148 times.
148 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
310 {
311 try {
312 desc_state_.impl_ref_ = shared_from_this();
313 } catch (std::bad_weak_ptr const&) {}
314 }
315
316
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 86 times.
148 if (fd_ >= 0)
317 {
318
1/2
✓ Branch 0 taken 62 times.
✗ Branch 1 not taken.
62 if (desc_state_.registered_events != 0)
319 62 svc_.scheduler().deregister_descriptor(fd_);
320 62 ::close(fd_);
321 62 fd_ = -1;
322 }
323
324 148 desc_state_.fd = -1;
325 {
326 148 std::lock_guard lock(desc_state_.mutex);
327 148 desc_state_.read_op = nullptr;
328 148 desc_state_.read_ready = false;
329 148 desc_state_.write_ready = false;
330 148 }
331 148 desc_state_.registered_events = 0;
332
333 // Clear cached endpoint
334 148 local_endpoint_ = endpoint{};
335 148 }
336
337 189 epoll_acceptor_service::
338 189 epoll_acceptor_service(capy::execution_context& ctx)
339 189 : ctx_(ctx)
340
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
341 {
342 189 }
343
344 378 epoll_acceptor_service::
345 189 ~epoll_acceptor_service()
346 {
347 378 }
348
349 void
350 189 epoll_acceptor_service::
351 shutdown()
352 {
353
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
354
355
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->acceptor_list_.pop_front())
356 impl->close_socket();
357
358 189 state_->acceptor_ptrs_.clear();
359 189 }
360
361 tcp_acceptor::acceptor_impl&
362 74 epoll_acceptor_service::
363 create_acceptor_impl()
364 {
365
1/1
✓ Branch 1 taken 74 times.
74 auto impl = std::make_shared<epoll_acceptor_impl>(*this);
366 74 auto* raw = impl.get();
367
368
1/1
✓ Branch 2 taken 74 times.
74 std::lock_guard lock(state_->mutex_);
369 74 state_->acceptor_list_.push_back(raw);
370
1/1
✓ Branch 3 taken 74 times.
74 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
371
372 74 return *raw;
373 74 }
374
375 void
376 74 epoll_acceptor_service::
377 destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
378 {
379 74 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
380
1/1
✓ Branch 2 taken 74 times.
74 std::lock_guard lock(state_->mutex_);
381 74 state_->acceptor_list_.remove(epoll_impl);
382
1/1
✓ Branch 2 taken 74 times.
74 state_->acceptor_ptrs_.erase(epoll_impl);
383 74 }
384
385 std::error_code
386 74 epoll_acceptor_service::
387 open_acceptor(
388 tcp_acceptor::acceptor_impl& impl,
389 endpoint ep,
390 int backlog)
391 {
392 74 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
393 74 epoll_impl->close_socket();
394
395 74 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
396
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 74 times.
74 if (fd < 0)
397 return make_err(errno);
398
399 74 int reuse = 1;
400 74 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
401
402 74 sockaddr_in addr = detail::to_sockaddr_in(ep);
403
2/2
✓ Branch 1 taken 12 times.
✓ Branch 2 taken 62 times.
74 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
404 {
405 12 int errn = errno;
406
1/1
✓ Branch 1 taken 12 times.
12 ::close(fd);
407 12 return make_err(errn);
408 }
409
410
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
62 if (::listen(fd, backlog) < 0)
411 {
412 int errn = errno;
413 ::close(fd);
414 return make_err(errn);
415 }
416
417 62 epoll_impl->fd_ = fd;
418
419 // Register fd with epoll (edge-triggered mode)
420 62 epoll_impl->desc_state_.fd = fd;
421 {
422
1/1
✓ Branch 1 taken 62 times.
62 std::lock_guard lock(epoll_impl->desc_state_.mutex);
423 62 epoll_impl->desc_state_.read_op = nullptr;
424 62 }
425
1/1
✓ Branch 2 taken 62 times.
62 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
426
427 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
428 62 sockaddr_in local_addr{};
429 62 socklen_t local_len = sizeof(local_addr);
430
1/2
✓ Branch 1 taken 62 times.
✗ Branch 2 not taken.
62 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
431 62 epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
432
433 62 return {};
434 }
435
436 void
437 11 epoll_acceptor_service::
438 post(epoll_op* op)
439 {
440 11 state_->sched_.post(op);
441 11 }
442
443 void
444 2565 epoll_acceptor_service::
445 work_started() noexcept
446 {
447 2565 state_->sched_.work_started();
448 2565 }
449
450 void
451 9 epoll_acceptor_service::
452 work_finished() noexcept
453 {
454 9 state_->sched_.work_finished();
455 9 }
456
457 epoll_socket_service*
458 2558 epoll_acceptor_service::
459 socket_service() const noexcept
460 {
461 2558 auto* svc = ctx_.find_service<detail::socket_service>();
462
2/4
✓ Branch 0 taken 2558 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 2558 times.
✗ Branch 3 not taken.
2558 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
463 }
464
465 } // namespace boost::corosio::detail
466
467 #endif // BOOST_COROSIO_HAS_EPOLL
468