libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

73.0% Lines (322/441) 94.4% Functions (34/36) 55.6% Branches (130/234)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/resume_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
33 void
34 104 epoll_op::canceller::
35 operator()() const noexcept
36 {
37 104 op->cancel();
38 104 }
39
40 void
41 epoll_connect_op::
42 cancel() noexcept
43 {
44 if (socket_impl_)
45 socket_impl_->cancel_single_op(*this);
46 else
47 request_cancel();
48 }
49
50 void
51 98 epoll_read_op::
52 cancel() noexcept
53 {
54
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (socket_impl_)
55 98 socket_impl_->cancel_single_op(*this);
56 else
57 request_cancel();
58 98 }
59
60 void
61 epoll_write_op::
62 cancel() noexcept
63 {
64 if (socket_impl_)
65 socket_impl_->cancel_single_op(*this);
66 else
67 request_cancel();
68 }
69
70 void
71 2560 epoll_connect_op::
72 operator()()
73 {
74 2560 stop_cb.reset();
75
76
3/4
✓ Branch 0 taken 2558 times.
✓ Branch 1 taken 2 times.
✓ Branch 3 taken 2558 times.
✗ Branch 4 not taken.
2560 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
77
78 // Cache endpoints on successful connect
79
3/4
✓ Branch 0 taken 2558 times.
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2558 times.
✗ Branch 3 not taken.
2560 if (success && socket_impl_)
80 {
81 // Query local endpoint via getsockname (may fail, but remote is always known)
82 2558 endpoint local_ep;
83 2558 sockaddr_in local_addr{};
84 2558 socklen_t local_len = sizeof(local_addr);
85
1/2
✓ Branch 1 taken 2558 times.
✗ Branch 2 not taken.
2558 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
86 2558 local_ep = from_sockaddr_in(local_addr);
87 // Always cache remote endpoint; local may be default if getsockname failed
88 2558 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
89 }
90
91
1/2
✓ Branch 0 taken 2560 times.
✗ Branch 1 not taken.
2560 if (ec_out)
92 {
93
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2560 times.
2560 if (cancelled.load(std::memory_order_acquire))
94 *ec_out = capy::error::canceled;
95
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2558 times.
2560 else if (errn != 0)
96 2 *ec_out = make_err(errn);
97 else
98 2558 *ec_out = {};
99 }
100
101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2560 times.
2560 if (bytes_out)
102 *bytes_out = bytes_transferred;
103
104 // Move to stack before resuming. See epoll_op::operator()() for rationale.
105 2560 capy::executor_ref saved_ex( std::move( ex ) );
106 2560 capy::coro saved_h( std::move( h ) );
107 2560 auto prevent_premature_destruction = std::move(impl_ptr);
108
1/1
✓ Branch 1 taken 2560 times.
2560 resume_coro(saved_ex, saved_h);
109 2560 }
110
111 5129 epoll_socket_impl::
112 5129 epoll_socket_impl(epoll_socket_service& svc) noexcept
113 5129 : svc_(svc)
114 {
115 5129 }
116
117 5129 epoll_socket_impl::
118 ~epoll_socket_impl() = default;
119
120 void
121 5129 epoll_socket_impl::
122 release()
123 {
124 5129 close_socket();
125 5129 svc_.destroy_impl(*this);
126 5129 }
127
128 std::coroutine_handle<>
129 2560 epoll_socket_impl::
130 connect(
131 std::coroutine_handle<> h,
132 capy::executor_ref ex,
133 endpoint ep,
134 std::stop_token token,
135 std::error_code* ec)
136 {
137 2560 auto& op = conn_;
138 2560 op.reset();
139 2560 op.h = h;
140 2560 op.ex = ex;
141 2560 op.ec_out = ec;
142 2560 op.fd = fd_;
143 2560 op.target_endpoint = ep; // Store target for endpoint caching
144 2560 op.start(token, this);
145
146 2560 sockaddr_in addr = detail::to_sockaddr_in(ep);
147
1/1
✓ Branch 1 taken 2560 times.
2560 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
148
149
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2560 times.
2560 if (result == 0)
150 {
151 // Sync success - cache endpoints immediately
152 // Remote is always known; local may fail but we still cache remote
153 sockaddr_in local_addr{};
154 socklen_t local_len = sizeof(local_addr);
155 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
156 local_endpoint_ = detail::from_sockaddr_in(local_addr);
157 remote_endpoint_ = ep;
158
159 op.complete(0, 0);
160 op.impl_ptr = shared_from_this();
161 svc_.post(&op);
162 // completion is always posted to scheduler queue, never inline.
163 return std::noop_coroutine();
164 }
165
166
1/2
✓ Branch 0 taken 2560 times.
✗ Branch 1 not taken.
2560 if (errno == EINPROGRESS)
167 {
168 2560 svc_.work_started();
169
1/1
✓ Branch 1 taken 2560 times.
2560 op.impl_ptr = shared_from_this();
170
171 2560 bool perform_now = false;
172 {
173
1/1
✓ Branch 1 taken 2560 times.
2560 std::lock_guard lock(desc_state_.mutex);
174
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2560 times.
2560 if (desc_state_.write_ready)
175 {
176 desc_state_.write_ready = false;
177 perform_now = true;
178 }
179 else
180 {
181 2560 desc_state_.connect_op = &op;
182 }
183 2560 }
184
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2560 times.
2560 if (perform_now)
186 {
187 op.perform_io();
188 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
189 {
190 op.errn = 0;
191 std::lock_guard lock(desc_state_.mutex);
192 desc_state_.connect_op = &op;
193 }
194 else
195 {
196 svc_.post(&op);
197 svc_.work_finished();
198 }
199 return std::noop_coroutine();
200 }
201
202
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2560 times.
2560 if (op.cancelled.load(std::memory_order_acquire))
203 {
204 epoll_op* claimed = nullptr;
205 {
206 std::lock_guard lock(desc_state_.mutex);
207 if (desc_state_.connect_op == &op)
208 claimed = std::exchange(desc_state_.connect_op, nullptr);
209 }
210 if (claimed)
211 {
212 svc_.post(claimed);
213 svc_.work_finished();
214 }
215 }
216 // completion is always posted to scheduler queue, never inline.
217 2560 return std::noop_coroutine();
218 }
219
220 op.complete(errno, 0);
221 op.impl_ptr = shared_from_this();
222 svc_.post(&op);
223 // completion is always posted to scheduler queue, never inline.
224 return std::noop_coroutine();
225 }
226
227 void
228 78907 epoll_socket_impl::
229 do_read_io()
230 {
231 78907 auto& op = rd_;
232
233 ssize_t n;
234 do {
235 78907 n = ::readv(fd_, op.iovecs, op.iovec_count);
236
3/4
✓ Branch 0 taken 168 times.
✓ Branch 1 taken 78739 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 168 times.
78907 } while (n < 0 && errno == EINTR);
237
238
2/2
✓ Branch 0 taken 78734 times.
✓ Branch 1 taken 173 times.
78907 if (n > 0)
239 {
240 {
241
1/1
✓ Branch 1 taken 78734 times.
78734 std::lock_guard lock(desc_state_.mutex);
242 78734 desc_state_.read_ready = false;
243 78734 }
244 78734 op.complete(0, static_cast<std::size_t>(n));
245 78734 svc_.post(&op);
246 78734 return;
247 }
248
249
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 168 times.
173 if (n == 0)
250 {
251 {
252
1/1
✓ Branch 1 taken 5 times.
5 std::lock_guard lock(desc_state_.mutex);
253 5 desc_state_.read_ready = false;
254 5 }
255 5 op.complete(0, 0);
256 5 svc_.post(&op);
257 5 return;
258 }
259
260
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 168 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
168 if (errno == EAGAIN || errno == EWOULDBLOCK)
261 {
262 168 svc_.work_started();
263
264 168 bool perform_now = false;
265 {
266
1/1
✓ Branch 1 taken 168 times.
168 std::lock_guard lock(desc_state_.mutex);
267
2/2
✓ Branch 0 taken 45 times.
✓ Branch 1 taken 123 times.
168 if (desc_state_.read_ready)
268 {
269 45 desc_state_.read_ready = false;
270 45 perform_now = true;
271 }
272 else
273 {
274 123 desc_state_.read_op = &op;
275 }
276 168 }
277
278
2/2
✓ Branch 0 taken 45 times.
✓ Branch 1 taken 123 times.
168 if (perform_now)
279 {
280 45 op.perform_io();
281
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 45 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
45 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
282 {
283 45 op.errn = 0;
284
1/1
✓ Branch 1 taken 45 times.
45 std::lock_guard lock(desc_state_.mutex);
285 45 desc_state_.read_op = &op;
286 45 }
287 else
288 {
289 svc_.post(&op);
290 svc_.work_finished();
291 }
292 45 return;
293 }
294
295
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 123 times.
123 if (op.cancelled.load(std::memory_order_acquire))
296 {
297 epoll_op* claimed = nullptr;
298 {
299 std::lock_guard lock(desc_state_.mutex);
300 if (desc_state_.read_op == &op)
301 claimed = std::exchange(desc_state_.read_op, nullptr);
302 }
303 if (claimed)
304 {
305 svc_.post(claimed);
306 svc_.work_finished();
307 }
308 }
309 123 return;
310 }
311
312 op.complete(errno, 0);
313 svc_.post(&op);
314 }
315
316 void
317 78786 epoll_socket_impl::
318 do_write_io()
319 {
320 78786 auto& op = wr_;
321
322 78786 msghdr msg{};
323 78786 msg.msg_iov = op.iovecs;
324 78786 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
325
326 ssize_t n;
327 do {
328
1/1
✓ Branch 1 taken 78786 times.
78786 n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
329
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 78785 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
78786 } while (n < 0 && errno == EINTR);
330
331
2/2
✓ Branch 0 taken 78785 times.
✓ Branch 1 taken 1 time.
78786 if (n > 0)
332 {
333 {
334
1/1
✓ Branch 1 taken 78785 times.
78785 std::lock_guard lock(desc_state_.mutex);
335 78785 desc_state_.write_ready = false;
336 78785 }
337 78785 op.complete(0, static_cast<std::size_t>(n));
338
1/1
✓ Branch 1 taken 78785 times.
78785 svc_.post(&op);
339 78785 return;
340 }
341
342
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
1 if (errno == EAGAIN || errno == EWOULDBLOCK)
343 {
344 svc_.work_started();
345
346 bool perform_now = false;
347 {
348 std::lock_guard lock(desc_state_.mutex);
349 if (desc_state_.write_ready)
350 {
351 desc_state_.write_ready = false;
352 perform_now = true;
353 }
354 else
355 {
356 desc_state_.write_op = &op;
357 }
358 }
359
360 if (perform_now)
361 {
362 op.perform_io();
363 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
364 {
365 op.errn = 0;
366 std::lock_guard lock(desc_state_.mutex);
367 desc_state_.write_op = &op;
368 }
369 else
370 {
371 svc_.post(&op);
372 svc_.work_finished();
373 }
374 return;
375 }
376
377 if (op.cancelled.load(std::memory_order_acquire))
378 {
379 epoll_op* claimed = nullptr;
380 {
381 std::lock_guard lock(desc_state_.mutex);
382 if (desc_state_.write_op == &op)
383 claimed = std::exchange(desc_state_.write_op, nullptr);
384 }
385 if (claimed)
386 {
387 svc_.post(claimed);
388 svc_.work_finished();
389 }
390 }
391 return;
392 }
393
394
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 op.complete(errno ? errno : EIO, 0);
395
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
396 }
397
398 std::coroutine_handle<>
399 78908 epoll_socket_impl::
400 read_some(
401 std::coroutine_handle<> h,
402 capy::executor_ref ex,
403 io_buffer_param param,
404 std::stop_token token,
405 std::error_code* ec,
406 std::size_t* bytes_out)
407 {
408 78908 auto& op = rd_;
409 78908 op.reset();
410 78908 op.h = h;
411 78908 op.ex = ex;
412 78908 op.ec_out = ec;
413 78908 op.bytes_out = bytes_out;
414 78908 op.fd = fd_;
415 78908 op.start(token, this);
416
1/1
✓ Branch 1 taken 78908 times.
78908 op.impl_ptr = shared_from_this();
417
418 // Must prepare buffers before initiator runs
419 78908 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
420 78908 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
421
422
6/8
✓ Branch 0 taken 78907 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 78907 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 78907 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 78907 times.
78908 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
423 {
424 1 op.empty_buffer_read = true;
425 1 op.complete(0, 0);
426
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
427 1 return std::noop_coroutine();
428 }
429
430
2/2
✓ Branch 0 taken 78907 times.
✓ Branch 1 taken 78907 times.
157814 for (int i = 0; i < op.iovec_count; ++i)
431 {
432 78907 op.iovecs[i].iov_base = bufs[i].data();
433 78907 op.iovecs[i].iov_len = bufs[i].size();
434 }
435
436 // Symmetric transfer ensures caller is suspended before I/O starts
437
1/1
✓ Branch 1 taken 78907 times.
78907 return read_initiator_.start<&epoll_socket_impl::do_read_io>(this);
438 }
439
440 std::coroutine_handle<>
441 78787 epoll_socket_impl::
442 write_some(
443 std::coroutine_handle<> h,
444 capy::executor_ref ex,
445 io_buffer_param param,
446 std::stop_token token,
447 std::error_code* ec,
448 std::size_t* bytes_out)
449 {
450 78787 auto& op = wr_;
451 78787 op.reset();
452 78787 op.h = h;
453 78787 op.ex = ex;
454 78787 op.ec_out = ec;
455 78787 op.bytes_out = bytes_out;
456 78787 op.fd = fd_;
457 78787 op.start(token, this);
458
1/1
✓ Branch 1 taken 78787 times.
78787 op.impl_ptr = shared_from_this();
459
460 // Must prepare buffers before initiator runs
461 78787 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
462 78787 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
463
464
6/8
✓ Branch 0 taken 78786 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 78786 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 78786 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 78786 times.
78787 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
465 {
466 1 op.complete(0, 0);
467
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
468 1 return std::noop_coroutine();
469 }
470
471
2/2
✓ Branch 0 taken 78786 times.
✓ Branch 1 taken 78786 times.
157572 for (int i = 0; i < op.iovec_count; ++i)
472 {
473 78786 op.iovecs[i].iov_base = bufs[i].data();
474 78786 op.iovecs[i].iov_len = bufs[i].size();
475 }
476
477 // Symmetric transfer ensures caller is suspended before I/O starts
478
1/1
✓ Branch 1 taken 78786 times.
78786 return write_initiator_.start<&epoll_socket_impl::do_write_io>(this);
479 }
480
481 std::error_code
482 3 epoll_socket_impl::
483 shutdown(tcp_socket::shutdown_type what) noexcept
484 {
485 int how;
486
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
487 {
488 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
489 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
490 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
491 default:
492 return make_err(EINVAL);
493 }
494
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
495 return make_err(errno);
496 3 return {};
497 }
498
499 std::error_code
500 5 epoll_socket_impl::
501 set_no_delay(bool value) noexcept
502 {
503
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
504
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
505 return make_err(errno);
506 5 return {};
507 }
508
509 bool
510 5 epoll_socket_impl::
511 no_delay(std::error_code& ec) const noexcept
512 {
513 5 int flag = 0;
514 5 socklen_t len = sizeof(flag);
515
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
516 {
517 ec = make_err(errno);
518 return false;
519 }
520 5 ec = {};
521 5 return flag != 0;
522 }
523
524 std::error_code
525 4 epoll_socket_impl::
526 set_keep_alive(bool value) noexcept
527 {
528
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
529
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
530 return make_err(errno);
531 4 return {};
532 }
533
534 bool
535 4 epoll_socket_impl::
536 keep_alive(std::error_code& ec) const noexcept
537 {
538 4 int flag = 0;
539 4 socklen_t len = sizeof(flag);
540
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
541 {
542 ec = make_err(errno);
543 return false;
544 }
545 4 ec = {};
546 4 return flag != 0;
547 }
548
549 std::error_code
550 1 epoll_socket_impl::
551 set_receive_buffer_size(int size) noexcept
552 {
553
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
554 return make_err(errno);
555 1 return {};
556 }
557
558 int
559 3 epoll_socket_impl::
560 receive_buffer_size(std::error_code& ec) const noexcept
561 {
562 3 int size = 0;
563 3 socklen_t len = sizeof(size);
564
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
565 {
566 ec = make_err(errno);
567 return 0;
568 }
569 3 ec = {};
570 3 return size;
571 }
572
573 std::error_code
574 1 epoll_socket_impl::
575 set_send_buffer_size(int size) noexcept
576 {
577
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
578 return make_err(errno);
579 1 return {};
580 }
581
582 int
583 3 epoll_socket_impl::
584 send_buffer_size(std::error_code& ec) const noexcept
585 {
586 3 int size = 0;
587 3 socklen_t len = sizeof(size);
588
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
589 {
590 ec = make_err(errno);
591 return 0;
592 }
593 3 ec = {};
594 3 return size;
595 }
596
597 std::error_code
598 4 epoll_socket_impl::
599 set_linger(bool enabled, int timeout) noexcept
600 {
601
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 3 times.
4 if (timeout < 0)
602 1 return make_err(EINVAL);
603 struct ::linger lg;
604
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 lg.l_onoff = enabled ? 1 : 0;
605 3 lg.l_linger = timeout;
606
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
607 return make_err(errno);
608 3 return {};
609 }
610
611 tcp_socket::linger_options
612 3 epoll_socket_impl::
613 linger(std::error_code& ec) const noexcept
614 {
615 3 struct ::linger lg{};
616 3 socklen_t len = sizeof(lg);
617
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
618 {
619 ec = make_err(errno);
620 return {};
621 }
622 3 ec = {};
623 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
624 }
625
626 void
627 7796 epoll_socket_impl::
628 cancel() noexcept
629 {
630 7796 std::shared_ptr<epoll_socket_impl> self;
631 try {
632
1/1
✓ Branch 1 taken 7796 times.
7796 self = shared_from_this();
633 } catch (const std::bad_weak_ptr&) {
634 return;
635 }
636
637 7796 conn_.request_cancel();
638 7796 rd_.request_cancel();
639 7796 wr_.request_cancel();
640
641 7796 epoll_op* conn_claimed = nullptr;
642 7796 epoll_op* rd_claimed = nullptr;
643 7796 epoll_op* wr_claimed = nullptr;
644 {
645 7796 std::lock_guard lock(desc_state_.mutex);
646
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7796 times.
7796 if (desc_state_.connect_op == &conn_)
647 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
648
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 7745 times.
7796 if (desc_state_.read_op == &rd_)
649 51 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
650
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7796 times.
7796 if (desc_state_.write_op == &wr_)
651 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
652 7796 }
653
654
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7796 times.
7796 if (conn_claimed)
655 {
656 conn_.impl_ptr = self;
657 svc_.post(&conn_);
658 svc_.work_finished();
659 }
660
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 7745 times.
7796 if (rd_claimed)
661 {
662 51 rd_.impl_ptr = self;
663 51 svc_.post(&rd_);
664 51 svc_.work_finished();
665 }
666
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 7796 times.
7796 if (wr_claimed)
667 {
668 wr_.impl_ptr = self;
669 svc_.post(&wr_);
670 svc_.work_finished();
671 }
672 7796 }
673
674 void
675 98 epoll_socket_impl::
676 cancel_single_op(epoll_op& op) noexcept
677 {
678 98 op.request_cancel();
679
680 98 epoll_op** desc_op_ptr = nullptr;
681
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 98 times.
98 if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
682
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
683 else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
684
685
1/2
✓ Branch 0 taken 98 times.
✗ Branch 1 not taken.
98 if (desc_op_ptr)
686 {
687 98 epoll_op* claimed = nullptr;
688 {
689 98 std::lock_guard lock(desc_state_.mutex);
690
2/2
✓ Branch 0 taken 66 times.
✓ Branch 1 taken 32 times.
98 if (*desc_op_ptr == &op)
691 66 claimed = std::exchange(*desc_op_ptr, nullptr);
692 98 }
693
2/2
✓ Branch 0 taken 66 times.
✓ Branch 1 taken 32 times.
98 if (claimed)
694 {
695 try {
696
1/1
✓ Branch 1 taken 66 times.
66 op.impl_ptr = shared_from_this();
697 } catch (const std::bad_weak_ptr&) {}
698 66 svc_.post(&op);
699 66 svc_.work_finished();
700 }
701 }
702 98 }
703
704 void
705 7700 epoll_socket_impl::
706 close_socket() noexcept
707 {
708 7700 cancel();
709
710 // Keep impl alive if descriptor_state is queued in the scheduler.
711 // Without this, destroy_impl() drops the last shared_ptr while
712 // the queued descriptor_state node would become dangling.
713
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 7696 times.
7700 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
714 {
715 try {
716
1/1
✓ Branch 1 taken 4 times.
4 desc_state_.impl_ref_ = shared_from_this();
717 } catch (std::bad_weak_ptr const&) {}
718 }
719
720
2/2
✓ Branch 0 taken 5129 times.
✓ Branch 1 taken 2571 times.
7700 if (fd_ >= 0)
721 {
722
1/2
✓ Branch 0 taken 5129 times.
✗ Branch 1 not taken.
5129 if (desc_state_.registered_events != 0)
723 5129 svc_.scheduler().deregister_descriptor(fd_);
724 5129 ::close(fd_);
725 5129 fd_ = -1;
726 }
727
728 7700 desc_state_.fd = -1;
729 {
730 7700 std::lock_guard lock(desc_state_.mutex);
731 7700 desc_state_.read_op = nullptr;
732 7700 desc_state_.write_op = nullptr;
733 7700 desc_state_.connect_op = nullptr;
734 7700 desc_state_.read_ready = false;
735 7700 desc_state_.write_ready = false;
736 7700 }
737 7700 desc_state_.registered_events = 0;
738
739 7700 local_endpoint_ = endpoint{};
740 7700 remote_endpoint_ = endpoint{};
741 7700 }
742
743 189 epoll_socket_service::
744 189 epoll_socket_service(capy::execution_context& ctx)
745
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
746 {
747 189 }
748
749 378 epoll_socket_service::
750 189 ~epoll_socket_service()
751 {
752 378 }
753
754 void
755 189 epoll_socket_service::
756 shutdown()
757 {
758
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
759
760
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->socket_list_.pop_front())
761 impl->close_socket();
762
763 189 state_->socket_ptrs_.clear();
764 189 }
765
766 tcp_socket::socket_impl&
767 5129 epoll_socket_service::
768 create_impl()
769 {
770
1/1
✓ Branch 1 taken 5129 times.
5129 auto impl = std::make_shared<epoll_socket_impl>(*this);
771 5129 auto* raw = impl.get();
772
773 {
774
1/1
✓ Branch 2 taken 5129 times.
5129 std::lock_guard lock(state_->mutex_);
775 5129 state_->socket_list_.push_back(raw);
776
1/1
✓ Branch 3 taken 5129 times.
5129 state_->socket_ptrs_.emplace(raw, std::move(impl));
777 5129 }
778
779 5129 return *raw;
780 5129 }
781
782 void
783 5129 epoll_socket_service::
784 destroy_impl(tcp_socket::socket_impl& impl)
785 {
786 5129 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
787
1/1
✓ Branch 2 taken 5129 times.
5129 std::lock_guard lock(state_->mutex_);
788 5129 state_->socket_list_.remove(epoll_impl);
789
1/1
✓ Branch 2 taken 5129 times.
5129 state_->socket_ptrs_.erase(epoll_impl);
790 5129 }
791
792 std::error_code
793 2571 epoll_socket_service::
794 open_socket(tcp_socket::socket_impl& impl)
795 {
796 2571 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
797 2571 epoll_impl->close_socket();
798
799 2571 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
800
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2571 times.
2571 if (fd < 0)
801 return make_err(errno);
802
803 2571 epoll_impl->fd_ = fd;
804
805 // Register fd with epoll (edge-triggered mode)
806 2571 epoll_impl->desc_state_.fd = fd;
807 {
808
1/1
✓ Branch 1 taken 2571 times.
2571 std::lock_guard lock(epoll_impl->desc_state_.mutex);
809 2571 epoll_impl->desc_state_.read_op = nullptr;
810 2571 epoll_impl->desc_state_.write_op = nullptr;
811 2571 epoll_impl->desc_state_.connect_op = nullptr;
812 2571 }
813 2571 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
814
815 2571 return {};
816 }
817
818 void
819 157644 epoll_socket_service::
820 post(epoll_op* op)
821 {
822 157644 state_->sched_.post(op);
823 157644 }
824
825 void
826 2728 epoll_socket_service::
827 work_started() noexcept
828 {
829 2728 state_->sched_.work_started();
830 2728 }
831
832 void
833 117 epoll_socket_service::
834 work_finished() noexcept
835 {
836 117 state_->sched_.work_finished();
837 117 }
838
839 } // namespace boost::corosio::detail
840
841 #endif // BOOST_COROSIO_HAS_EPOLL
842