libs/corosio/src/corosio/src/detail/epoll/op.hpp

84.3% Lines (97/115) 81.0% Functions (17/21) 64.1% Branches (25/39)
libs/corosio/src/corosio/src/detail/epoll/op.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/io_object.hpp>
19 #include <boost/corosio/endpoint.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/error.hpp>
23 #include <system_error>
24
25 #include "src/detail/make_err.hpp"
26 #include "src/detail/resume_coro.hpp"
27 #include "src/detail/scheduler_op.hpp"
28 #include "src/detail/endpoint_convert.hpp"
29
30 #include <unistd.h>
31 #include <errno.h>
32
33 #include <atomic>
34 #include <cstddef>
35 #include <memory>
36 #include <mutex>
37 #include <optional>
38 #include <stop_token>
39
40 #include <netinet/in.h>
41 #include <sys/socket.h>
42 #include <sys/uio.h>
43
44 /*
45 epoll Operation State
46 =====================
47
48 Each async I/O operation has a corresponding epoll_op-derived struct that
49 holds the operation's state while it's in flight. The socket impl owns
50 fixed slots for each operation type (conn_, rd_, wr_), so only one
51 operation of each type can be pending per socket at a time.
52
53 Persistent Registration
54 -----------------------
55 File descriptors are registered with epoll once (via descriptor_state) and
56 stay registered until closed. The descriptor_state tracks which operations
57 are pending (read_op, write_op, connect_op). When an event arrives, the
58 reactor dispatches to the appropriate pending operation.
59
60 Impl Lifetime Management
61 ------------------------
62 When cancel() posts an op to the scheduler's ready queue, the socket impl
63 might be destroyed before the scheduler processes the op. The `impl_ptr`
64 member holds a shared_ptr to the impl, keeping it alive until the op
65 completes. This is set by cancel() and cleared in operator() after the
66 coroutine is resumed.
67
68 EOF Detection
69 -------------
70 For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
72
73 SIGPIPE Prevention
74 ------------------
75 Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
76 SIGPIPE when the peer has closed.
77 */
78
79 namespace boost::corosio::detail {
80
81 // Forward declarations
82 class epoll_socket_impl;
83 class epoll_acceptor_impl;
84 struct epoll_op;
85
86 // Forward declaration
87 class epoll_scheduler;
88
89 /** Per-descriptor state for persistent epoll registration.
90
91 Tracks pending operations for a file descriptor. The fd is registered
92 once with epoll and stays registered until closed.
93
94 This struct extends scheduler_op to support deferred I/O processing.
95 When epoll events arrive, the reactor sets ready_events and queues
96 this descriptor for processing. When popped from the scheduler queue,
97 operator() performs the actual I/O and queues completion handlers.
98
99 @par Deferred I/O Model
100 The reactor no longer performs I/O directly. Instead:
101 1. Reactor sets ready_events and queues descriptor_state
102 2. Scheduler pops descriptor_state and calls operator()
103 3. operator() performs I/O under mutex and queues completions
104
105 This eliminates per-descriptor mutex locking from the reactor hot path.
106
107 @par Thread Safety
108 The mutex protects operation pointers and ready flags during I/O.
109 ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
110 */
111 struct descriptor_state : scheduler_op
112 {
113 std::mutex mutex;
114
115 // Protected by mutex
116 epoll_op* read_op = nullptr;
117 epoll_op* write_op = nullptr;
118 epoll_op* connect_op = nullptr;
119
120 // Caches edge events that arrived before an op was registered
121 bool read_ready = false;
122 bool write_ready = false;
123
124 // Set during registration only (no mutex needed)
125 std::uint32_t registered_events = 0;
126 int fd = -1;
127
128 // For deferred I/O - set by reactor, read by scheduler
129 std::atomic<std::uint32_t> ready_events_{0};
130 std::atomic<bool> is_enqueued_{false};
131 epoll_scheduler const* scheduler_ = nullptr;
132
133 // Prevents impl destruction while this descriptor_state is queued.
134 // Set by close_socket() when is_enqueued_ is true, cleared by operator().
135 std::shared_ptr<void> impl_ref_;
136
137 /// Add ready events atomically.
138 83982 void add_ready_events(std::uint32_t ev) noexcept
139 {
140 83982 ready_events_.fetch_or(ev, std::memory_order_relaxed);
141 83982 }
142
143 /// Perform deferred I/O and queue completions.
144 void operator()() override;
145
146 /// Destroy without invoking.
147 void destroy() override {}
148 };
149
150 struct epoll_op : scheduler_op
151 {
152 struct canceller
153 {
154 epoll_op* op;
155 void operator()() const noexcept;
156 };
157
158 capy::coro h;
159 capy::executor_ref ex;
160 std::error_code* ec_out = nullptr;
161 std::size_t* bytes_out = nullptr;
162
163 int fd = -1;
164 int errn = 0;
165 std::size_t bytes_transferred = 0;
166
167 std::atomic<bool> cancelled{false};
168 std::optional<std::stop_callback<canceller>> stop_cb;
169
170 // Prevents use-after-free when socket is closed with pending ops.
171 // See "Impl Lifetime Management" in file header.
172 std::shared_ptr<void> impl_ptr;
173
174 // For stop_token cancellation - pointer to owning socket/acceptor impl.
175 // When stop is requested, we call back to the impl to perform actual I/O cancellation.
176 epoll_socket_impl* socket_impl_ = nullptr;
177 epoll_acceptor_impl* acceptor_impl_ = nullptr;
178
179 15461 epoll_op() = default;
180
181 162822 void reset() noexcept
182 {
183 162822 fd = -1;
184 162822 errn = 0;
185 162822 bytes_transferred = 0;
186 162822 cancelled.store(false, std::memory_order_relaxed);
187 162822 impl_ptr.reset();
188 162822 socket_impl_ = nullptr;
189 162822 acceptor_impl_ = nullptr;
190 162822 }
191
192 157695 void operator()() override
193 {
194 157695 stop_cb.reset();
195
196
1/2
✓ Branch 0 taken 157695 times.
✗ Branch 1 not taken.
157695 if (ec_out)
197 {
198
2/2
✓ Branch 1 taken 203 times.
✓ Branch 2 taken 157492 times.
157695 if (cancelled.load(std::memory_order_acquire))
199 203 *ec_out = capy::error::canceled;
200
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 157491 times.
157492 else if (errn != 0)
201 1 *ec_out = make_err(errn);
202
6/6
✓ Branch 1 taken 78708 times.
✓ Branch 2 taken 78783 times.
✓ Branch 3 taken 5 times.
✓ Branch 4 taken 78703 times.
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 157486 times.
157491 else if (is_read_operation() && bytes_transferred == 0)
203 5 *ec_out = capy::error::eof;
204 else
205 157486 *ec_out = {};
206 }
207
208
1/2
✓ Branch 0 taken 157695 times.
✗ Branch 1 not taken.
157695 if (bytes_out)
209 157695 *bytes_out = bytes_transferred;
210
211 // Move to stack before resuming coroutine. The coroutine might close
212 // the socket, releasing the last wrapper ref. If impl_ptr were the
213 // last ref and we destroyed it while still in operator(), we'd have
214 // use-after-free. Moving to local ensures destruction happens at
215 // function exit, after all member accesses are complete.
216 157695 capy::executor_ref saved_ex( std::move( ex ) );
217 157695 capy::coro saved_h( std::move( h ) );
218 157695 auto prevent_premature_destruction = std::move(impl_ptr);
219
1/1
✓ Branch 1 taken 157695 times.
157695 resume_coro(saved_ex, saved_h);
220 157695 }
221
222 78782 virtual bool is_read_operation() const noexcept { return false; }
223 virtual void cancel() noexcept = 0;
224
225 void destroy() override
226 {
227 stop_cb.reset();
228 impl_ptr.reset();
229 }
230
231 23641 void request_cancel() noexcept
232 {
233 23641 cancelled.store(true, std::memory_order_release);
234 23641 }
235
236 160255 void start(std::stop_token token, epoll_socket_impl* impl)
237 {
238 160255 cancelled.store(false, std::memory_order_release);
239 160255 stop_cb.reset();
240 160255 socket_impl_ = impl;
241 160255 acceptor_impl_ = nullptr;
242
243
2/2
✓ Branch 1 taken 105 times.
✓ Branch 2 taken 160150 times.
160255 if (token.stop_possible())
244 105 stop_cb.emplace(token, canceller{this});
245 160255 }
246
247 2567 void start(std::stop_token token, epoll_acceptor_impl* impl)
248 {
249 2567 cancelled.store(false, std::memory_order_release);
250 2567 stop_cb.reset();
251 2567 socket_impl_ = nullptr;
252 2567 acceptor_impl_ = impl;
253
254
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 2558 times.
2567 if (token.stop_possible())
255 9 stop_cb.emplace(token, canceller{this});
256 2567 }
257
258 162741 void complete(int err, std::size_t bytes) noexcept
259 {
260 162741 errn = err;
261 162741 bytes_transferred = bytes;
262 162741 }
263
264 virtual void perform_io() noexcept {}
265 };
266
267
268 struct epoll_connect_op : epoll_op
269 {
270 endpoint target_endpoint;
271
272 2560 void reset() noexcept
273 {
274 2560 epoll_op::reset();
275 2560 target_endpoint = endpoint{};
276 2560 }
277
278 2559 void perform_io() noexcept override
279 {
280 // connect() completion status is retrieved via SO_ERROR, not return value
281 2559 int err = 0;
282 2559 socklen_t len = sizeof(err);
283
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2559 times.
2559 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
284 err = errno;
285 2559 complete(err, 0);
286 2559 }
287
288 // Defined in sockets.cpp where epoll_socket_impl is complete
289 void operator()() override;
290 void cancel() noexcept override;
291 };
292
293
294 struct epoll_read_op : epoll_op
295 {
296 static constexpr std::size_t max_buffers = 16;
297 iovec iovecs[max_buffers];
298 int iovec_count = 0;
299 bool empty_buffer_read = false;
300
301 78709 bool is_read_operation() const noexcept override
302 {
303 78709 return !empty_buffer_read;
304 }
305
306 78908 void reset() noexcept
307 {
308 78908 epoll_op::reset();
309 78908 iovec_count = 0;
310 78908 empty_buffer_read = false;
311 78908 }
312
313 96 void perform_io() noexcept override
314 {
315 ssize_t n;
316 do {
317 96 n = ::readv(fd, iovecs, iovec_count);
318
3/4
✓ Branch 0 taken 45 times.
✓ Branch 1 taken 51 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 45 times.
96 } while (n < 0 && errno == EINTR);
319
320
2/2
✓ Branch 0 taken 51 times.
✓ Branch 1 taken 45 times.
96 if (n >= 0)
321 51 complete(0, static_cast<std::size_t>(n));
322 else
323 45 complete(errno, 0);
324 96 }
325
326 void cancel() noexcept override;
327 };
328
329
330 struct epoll_write_op : epoll_op
331 {
332 static constexpr std::size_t max_buffers = 16;
333 iovec iovecs[max_buffers];
334 int iovec_count = 0;
335
336 78787 void reset() noexcept
337 {
338 78787 epoll_op::reset();
339 78787 iovec_count = 0;
340 78787 }
341
342 void perform_io() noexcept override
343 {
344 msghdr msg{};
345 msg.msg_iov = iovecs;
346 msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
347
348 ssize_t n;
349 do {
350 n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
351 } while (n < 0 && errno == EINTR);
352
353 if (n >= 0)
354 complete(0, static_cast<std::size_t>(n));
355 else
356 complete(errno, 0);
357 }
358
359 void cancel() noexcept override;
360 };
361
362
363 struct epoll_accept_op : epoll_op
364 {
365 int accepted_fd = -1;
366 io_object::io_object_impl* peer_impl = nullptr;
367 io_object::io_object_impl** impl_out = nullptr;
368
369 2567 void reset() noexcept
370 {
371 2567 epoll_op::reset();
372 2567 accepted_fd = -1;
373 2567 peer_impl = nullptr;
374 2567 impl_out = nullptr;
375 2567 }
376
377 2556 void perform_io() noexcept override
378 {
379 2556 sockaddr_in addr{};
380 2556 socklen_t addrlen = sizeof(addr);
381 int new_fd;
382 do {
383 2556 new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
384 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
385
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 2556 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
2556 } while (new_fd < 0 && errno == EINTR);
386
387
1/2
✓ Branch 0 taken 2556 times.
✗ Branch 1 not taken.
2556 if (new_fd >= 0)
388 {
389 2556 accepted_fd = new_fd;
390 2556 complete(0, 0);
391 }
392 else
393 {
394 complete(errno, 0);
395 }
396 2556 }
397
398 // Defined in acceptors.cpp where epoll_acceptor_impl is complete
399 void operator()() override;
400 void cancel() noexcept override;
401 };
402
403 } // namespace boost::corosio::detail
404
405 #endif // BOOST_COROSIO_HAS_EPOLL
406
407 #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
408