libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

82.9% Lines (406/490) 93.5% Functions (43/46) 70.2% Branches (212/302)
libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <atomic>
24 #include <chrono>
25 #include <limits>
26 #include <utility>
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <sys/epoll.h>
31 #include <sys/eventfd.h>
32 #include <sys/socket.h>
33 #include <sys/timerfd.h>
34 #include <unistd.h>
35
36 /*
37 epoll Scheduler - Single Reactor Model
38 ======================================
39
40 This scheduler uses a thread coordination strategy to provide handler
41 parallelism and avoid the thundering herd problem.
42 Instead of all threads blocking on epoll_wait(), one thread becomes the
43 "reactor" while others wait on a condition variable for handler work.
44
45 Thread Model
46 ------------
47 - ONE thread runs epoll_wait() at a time (the reactor thread)
48 - OTHER threads wait on cond_ (condition variable) for handlers
49 - When work is posted, exactly one waiting thread wakes via notify_one()
50 - This matches Windows IOCP semantics where N posted items wake N threads
51
52 Event Loop Structure (do_one)
53 -----------------------------
54 1. Lock mutex, try to pop handler from queue
55 2. If got handler: execute it (unlocked), return
56 3. If queue empty and no reactor running: become reactor
57 - Run epoll_wait (unlocked), queue I/O completions, loop back
58 4. If queue empty and reactor running: wait on condvar for work
59
60 The task_running_ flag ensures only one thread owns epoll_wait().
61 After the reactor queues I/O completions, it loops back to try getting
62 a handler, giving priority to handler execution over more I/O polling.
63
64 Signaling State (state_)
65 ------------------------
66 The state_ variable encodes two pieces of information:
67 - Bit 0: signaled flag (1 = signaled, persists until cleared)
68 - Upper bits: waiter count (each waiter adds 2 before blocking)
69
70 This allows efficient coordination:
71 - Signalers only call notify when waiters exist (state_ > 1)
72 - Waiters check if already signaled before blocking (fast-path)
73
74 Wake Coordination (wake_one_thread_and_unlock)
75 ----------------------------------------------
76 When posting work:
77 - If waiters exist (state_ > 1): signal and notify_one()
78 - Else if reactor running: interrupt via eventfd write
79 - Else: no-op (thread will find work when it checks queue)
80
81 This avoids waking threads unnecessarily. With cascading wakes,
82 each handler execution wakes at most one additional thread if
83 more work exists in the queue.
84
85 Work Counting
86 -------------
87 outstanding_work_ tracks pending operations. When it hits zero, run()
88 returns. Each operation increments on start, decrements on completion.
89
90 Timer Integration
91 -----------------
92 Timers are handled by timer_service. The reactor adjusts epoll_wait
93 timeout to wake for the nearest timer expiry. When a new timer is
94 scheduled earlier than current, timer_service calls interrupt_reactor()
95 to re-evaluate the timeout.
96 */
97
98 namespace boost::corosio::detail {
99
100 struct scheduler_context
101 {
102 epoll_scheduler const* key;
103 scheduler_context* next;
104 op_queue private_queue;
105 long private_outstanding_work;
106
107 163 scheduler_context(epoll_scheduler const* k, scheduler_context* n)
108 163 : key(k)
109 163 , next(n)
110 163 , private_outstanding_work(0)
111 {
112 163 }
113 };
114
115 namespace {
116
117 corosio::detail::thread_local_ptr<scheduler_context> context_stack;
118
119 struct thread_context_guard
120 {
121 scheduler_context frame_;
122
123 163 explicit thread_context_guard(
124 epoll_scheduler const* ctx) noexcept
125 163 : frame_(ctx, context_stack.get())
126 {
127 163 context_stack.set(&frame_);
128 163 }
129
130 163 ~thread_context_guard() noexcept
131 {
132
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 163 times.
163 if (!frame_.private_queue.empty())
133 frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
134 163 context_stack.set(frame_.next);
135 163 }
136 };
137
138 scheduler_context*
139 243727 find_context(epoll_scheduler const* self) noexcept
140 {
141
2/2
✓ Branch 1 taken 242078 times.
✓ Branch 2 taken 1649 times.
243727 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
142
1/2
✓ Branch 0 taken 242078 times.
✗ Branch 1 not taken.
242078 if (c->key == self)
143 242078 return c;
144 1649 return nullptr;
145 }
146
147 } // namespace
148
149 void
150 83982 descriptor_state::
151 operator()()
152 {
153 83982 is_enqueued_.store(false, std::memory_order_relaxed);
154
155 // Take ownership of impl ref set by close_socket() to prevent
156 // the owning impl from being freed while we're executing
157 83982 auto prevent_impl_destruction = std::move(impl_ref_);
158
159 83982 std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
160
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 83982 times.
83982 if (ev == 0)
161 {
162 scheduler_->compensating_work_started();
163 return;
164 }
165
166 83982 op_queue local_ops;
167
168 83982 int err = 0;
169
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 83980 times.
83982 if (ev & EPOLLERR)
170 {
171 2 socklen_t len = sizeof(err);
172
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
2 if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
173 err = errno;
174
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
2 if (err == 0)
175 1 err = EIO;
176 }
177
178 83982 epoll_op* rd = nullptr;
179 83982 epoll_op* wr = nullptr;
180 83982 epoll_op* cn = nullptr;
181 {
182
1/1
✓ Branch 1 taken 83982 times.
83982 std::lock_guard lock(mutex);
183
2/2
✓ Branch 0 taken 35239 times.
✓ Branch 1 taken 48743 times.
83982 if (ev & EPOLLIN)
184 {
185 35239 rd = std::exchange(read_op, nullptr);
186
2/2
✓ Branch 0 taken 32632 times.
✓ Branch 1 taken 2607 times.
35239 if (!rd)
187 32632 read_ready = true;
188 }
189
2/2
✓ Branch 0 taken 81426 times.
✓ Branch 1 taken 2556 times.
83982 if (ev & EPOLLOUT)
190 {
191 81426 cn = std::exchange(connect_op, nullptr);
192 81426 wr = std::exchange(write_op, nullptr);
193
3/4
✓ Branch 0 taken 78866 times.
✓ Branch 1 taken 2560 times.
✓ Branch 2 taken 78866 times.
✗ Branch 3 not taken.
81426 if (!cn && !wr)
194 78866 write_ready = true;
195 }
196
3/4
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 83980 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 2 times.
83982 if (err && !(ev & (EPOLLIN | EPOLLOUT)))
197 {
198 rd = std::exchange(read_op, nullptr);
199 wr = std::exchange(write_op, nullptr);
200 cn = std::exchange(connect_op, nullptr);
201 }
202 83982 }
203
204 // Non-null after I/O means EAGAIN; re-register under lock below
205
2/2
✓ Branch 0 taken 2607 times.
✓ Branch 1 taken 81375 times.
83982 if (rd)
206 {
207
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2607 times.
2607 if (err)
208 rd->complete(err, 0);
209 else
210 2607 rd->perform_io();
211
212
2/4
✓ Branch 0 taken 2607 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 2607 times.
2607 if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
213 {
214 rd->errn = 0;
215 }
216 else
217 {
218 2607 local_ops.push(rd);
219 2607 rd = nullptr;
220 }
221 }
222
223
2/2
✓ Branch 0 taken 2560 times.
✓ Branch 1 taken 81422 times.
83982 if (cn)
224 {
225
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 2559 times.
2560 if (err)
226 1 cn->complete(err, 0);
227 else
228 2559 cn->perform_io();
229 2560 local_ops.push(cn);
230 2560 cn = nullptr;
231 }
232
233
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 83982 times.
83982 if (wr)
234 {
235 if (err)
236 wr->complete(err, 0);
237 else
238 wr->perform_io();
239
240 if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
241 {
242 wr->errn = 0;
243 }
244 else
245 {
246 local_ops.push(wr);
247 wr = nullptr;
248 }
249 }
250
251
2/4
✓ Branch 0 taken 83982 times.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 83982 times.
83982 if (rd || wr)
252 {
253 std::lock_guard lock(mutex);
254 if (rd)
255 read_op = rd;
256 if (wr)
257 write_op = wr;
258 }
259
260 // Execute first handler inline — the scheduler's work_cleanup
261 // accounts for this as the "consumed" work item
262 83982 scheduler_op* first = local_ops.pop();
263
2/2
✓ Branch 0 taken 5167 times.
✓ Branch 1 taken 78815 times.
83982 if (first)
264 {
265
1/1
✓ Branch 1 taken 5167 times.
5167 scheduler_->post_deferred_completions(local_ops);
266
1/1
✓ Branch 1 taken 5167 times.
5167 (*first)();
267 }
268 else
269 {
270 78815 scheduler_->compensating_work_started();
271 }
272 83982 }
273
274 189 epoll_scheduler::
275 epoll_scheduler(
276 capy::execution_context& ctx,
277 189 int)
278 189 : epoll_fd_(-1)
279 189 , event_fd_(-1)
280 189 , timer_fd_(-1)
281 189 , outstanding_work_(0)
282 189 , stopped_(false)
283 189 , shutdown_(false)
284 189 , task_running_{false}
285 189 , task_interrupted_(false)
286 378 , state_(0)
287 {
288 189 epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
289
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (epoll_fd_ < 0)
290 detail::throw_system_error(make_err(errno), "epoll_create1");
291
292 189 event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
293
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (event_fd_ < 0)
294 {
295 int errn = errno;
296 ::close(epoll_fd_);
297 detail::throw_system_error(make_err(errn), "eventfd");
298 }
299
300 189 timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
301
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 189 times.
189 if (timer_fd_ < 0)
302 {
303 int errn = errno;
304 ::close(event_fd_);
305 ::close(epoll_fd_);
306 detail::throw_system_error(make_err(errn), "timerfd_create");
307 }
308
309 189 epoll_event ev{};
310 189 ev.events = EPOLLIN | EPOLLET;
311 189 ev.data.ptr = nullptr;
312
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
313 {
314 int errn = errno;
315 ::close(timer_fd_);
316 ::close(event_fd_);
317 ::close(epoll_fd_);
318 detail::throw_system_error(make_err(errn), "epoll_ctl");
319 }
320
321 189 epoll_event timer_ev{};
322 189 timer_ev.events = EPOLLIN | EPOLLERR;
323 189 timer_ev.data.ptr = &timer_fd_;
324
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 189 times.
189 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
325 {
326 int errn = errno;
327 ::close(timer_fd_);
328 ::close(event_fd_);
329 ::close(epoll_fd_);
330 detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
331 }
332
333
1/1
✓ Branch 1 taken 189 times.
189 timer_svc_ = &get_timer_service(ctx, *this);
334
1/1
✓ Branch 3 taken 189 times.
189 timer_svc_->set_on_earliest_changed(
335 timer_service::callback(
336 this,
337 [](void* p) {
338 2800 auto* self = static_cast<epoll_scheduler*>(p);
339 2800 self->timerfd_stale_.store(true, std::memory_order_release);
340
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2800 times.
2800 if (self->task_running_.load(std::memory_order_relaxed))
341 self->interrupt_reactor();
342 2800 }));
343
344 // Initialize resolver service
345
1/1
✓ Branch 1 taken 189 times.
189 get_resolver_service(ctx, *this);
346
347 // Initialize signal service
348
1/1
✓ Branch 1 taken 189 times.
189 get_signal_service(ctx, *this);
349
350 // Push task sentinel to interleave reactor runs with handler execution
351 189 completed_ops_.push(&task_op_);
352 189 }
353
354 378 epoll_scheduler::
355 189 ~epoll_scheduler()
356 {
357
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (timer_fd_ >= 0)
358 189 ::close(timer_fd_);
359
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
360 189 ::close(event_fd_);
361
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (epoll_fd_ >= 0)
362 189 ::close(epoll_fd_);
363 378 }
364
365 void
366 189 epoll_scheduler::
367 shutdown()
368 {
369 {
370
1/1
✓ Branch 1 taken 189 times.
189 std::unique_lock lock(mutex_);
371 189 shutdown_ = true;
372
373
2/2
✓ Branch 1 taken 189 times.
✓ Branch 2 taken 189 times.
378 while (auto* h = completed_ops_.pop())
374 {
375
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (h == &task_op_)
376 189 continue;
377 lock.unlock();
378 h->destroy();
379 lock.lock();
380 189 }
381
382 189 signal_all(lock);
383 189 }
384
385 189 outstanding_work_.store(0, std::memory_order_release);
386
387
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
388 189 interrupt_reactor();
389 189 }
390
391 void
392 4431 epoll_scheduler::
393 post(capy::coro h) const
394 {
395 struct post_handler final
396 : scheduler_op
397 {
398 capy::coro h_;
399
400 explicit
401 4431 post_handler(capy::coro h)
402 4431 : h_(h)
403 {
404 4431 }
405
406 8862 ~post_handler() = default;
407
408 4431 void operator()() override
409 {
410 4431 auto h = h_;
411
1/2
✓ Branch 0 taken 4431 times.
✗ Branch 1 not taken.
4431 delete this;
412 std::atomic_thread_fence(std::memory_order_acquire);
413
1/1
✓ Branch 1 taken 4431 times.
4431 h.resume();
414 4431 }
415
416 void destroy() override
417 {
418 delete this;
419 }
420 };
421
422
1/1
✓ Branch 1 taken 4431 times.
4431 auto ph = std::make_unique<post_handler>(h);
423
424 // Fast path: same thread posts to private queue
425 // Only count locally; work_cleanup batches to global counter
426
2/2
✓ Branch 1 taken 2808 times.
✓ Branch 2 taken 1623 times.
4431 if (auto* ctx = find_context(this))
427 {
428 2808 ++ctx->private_outstanding_work;
429 2808 ctx->private_queue.push(ph.release());
430 2808 return;
431 }
432
433 // Slow path: cross-thread post requires mutex
434 1623 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
435
436
1/1
✓ Branch 1 taken 1623 times.
1623 std::unique_lock lock(mutex_);
437 1623 completed_ops_.push(ph.release());
438
1/1
✓ Branch 1 taken 1623 times.
1623 wake_one_thread_and_unlock(lock);
439 4431 }
440
441 void
442 160481 epoll_scheduler::
443 post(scheduler_op* h) const
444 {
445 // Fast path: same thread posts to private queue
446 // Only count locally; work_cleanup batches to global counter
447
2/2
✓ Branch 1 taken 160455 times.
✓ Branch 2 taken 26 times.
160481 if (auto* ctx = find_context(this))
448 {
449 160455 ++ctx->private_outstanding_work;
450 160455 ctx->private_queue.push(h);
451 160455 return;
452 }
453
454 // Slow path: cross-thread post requires mutex
455 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
456
457
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
458 26 completed_ops_.push(h);
459
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
460 26 }
461
462 void
463 3282 epoll_scheduler::
464 on_work_started() noexcept
465 {
466 3282 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
467 3282 }
468
469 void
470 3250 epoll_scheduler::
471 on_work_finished() noexcept
472 {
473
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3250 times.
6500 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
474 stop();
475 3250 }
476
477 bool
478 3037 epoll_scheduler::
479 running_in_this_thread() const noexcept
480 {
481
2/2
✓ Branch 1 taken 2827 times.
✓ Branch 2 taken 210 times.
3037 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
482
1/2
✓ Branch 0 taken 2827 times.
✗ Branch 1 not taken.
2827 if (c->key == this)
483 2827 return true;
484 210 return false;
485 }
486
487 void
488 35 epoll_scheduler::
489 stop()
490 {
491
1/1
✓ Branch 1 taken 35 times.
35 std::unique_lock lock(mutex_);
492
2/2
✓ Branch 0 taken 18 times.
✓ Branch 1 taken 17 times.
35 if (!stopped_)
493 {
494 18 stopped_ = true;
495 18 signal_all(lock);
496
1/1
✓ Branch 1 taken 18 times.
18 interrupt_reactor();
497 }
498 35 }
499
500 bool
501 16 epoll_scheduler::
502 stopped() const noexcept
503 {
504 16 std::unique_lock lock(mutex_);
505 32 return stopped_;
506 16 }
507
508 void
509 49 epoll_scheduler::
510 restart()
511 {
512
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
513 49 stopped_ = false;
514 49 }
515
516 std::size_t
517 175 epoll_scheduler::
518 run()
519 {
520
2/2
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 149 times.
350 if (outstanding_work_.load(std::memory_order_acquire) == 0)
521 {
522
1/1
✓ Branch 1 taken 26 times.
26 stop();
523 26 return 0;
524 }
525
526 149 thread_context_guard ctx(this);
527
1/1
✓ Branch 1 taken 149 times.
149 std::unique_lock lock(mutex_);
528
529 149 std::size_t n = 0;
530 for (;;)
531 {
532
3/3
✓ Branch 1 taken 249028 times.
✓ Branch 3 taken 149 times.
✓ Branch 4 taken 248879 times.
249028 if (!do_one(lock, -1, &ctx.frame_))
533 149 break;
534
1/2
✓ Branch 1 taken 248879 times.
✗ Branch 2 not taken.
248879 if (n != (std::numeric_limits<std::size_t>::max)())
535 248879 ++n;
536
2/2
✓ Branch 1 taken 88452 times.
✓ Branch 2 taken 160427 times.
248879 if (!lock.owns_lock())
537
1/1
✓ Branch 1 taken 88452 times.
88452 lock.lock();
538 }
539 149 return n;
540 149 }
541
542 std::size_t
543 2 epoll_scheduler::
544 run_one()
545 {
546
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2 times.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
547 {
548 stop();
549 return 0;
550 }
551
552 2 thread_context_guard ctx(this);
553
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
554
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, -1, &ctx.frame_);
555 2 }
556
557 std::size_t
558 14 epoll_scheduler::
559 wait_one(long usec)
560 {
561
2/2
✓ Branch 1 taken 5 times.
✓ Branch 2 taken 9 times.
28 if (outstanding_work_.load(std::memory_order_acquire) == 0)
562 {
563
1/1
✓ Branch 1 taken 5 times.
5 stop();
564 5 return 0;
565 }
566
567 9 thread_context_guard ctx(this);
568
1/1
✓ Branch 1 taken 9 times.
9 std::unique_lock lock(mutex_);
569
1/1
✓ Branch 1 taken 9 times.
9 return do_one(lock, usec, &ctx.frame_);
570 9 }
571
572 std::size_t
573 2 epoll_scheduler::
574 poll()
575 {
576
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
4 if (outstanding_work_.load(std::memory_order_acquire) == 0)
577 {
578
1/1
✓ Branch 1 taken 1 time.
1 stop();
579 1 return 0;
580 }
581
582 1 thread_context_guard ctx(this);
583
1/1
✓ Branch 1 taken 1 time.
1 std::unique_lock lock(mutex_);
584
585 1 std::size_t n = 0;
586 for (;;)
587 {
588
3/3
✓ Branch 1 taken 3 times.
✓ Branch 3 taken 1 time.
✓ Branch 4 taken 2 times.
3 if (!do_one(lock, 0, &ctx.frame_))
589 1 break;
590
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (n != (std::numeric_limits<std::size_t>::max)())
591 2 ++n;
592
1/2
✓ Branch 1 taken 2 times.
✗ Branch 2 not taken.
2 if (!lock.owns_lock())
593
1/1
✓ Branch 1 taken 2 times.
2 lock.lock();
594 }
595 1 return n;
596 1 }
597
598 std::size_t
599 4 epoll_scheduler::
600 poll_one()
601 {
602
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 2 times.
8 if (outstanding_work_.load(std::memory_order_acquire) == 0)
603 {
604
1/1
✓ Branch 1 taken 2 times.
2 stop();
605 2 return 0;
606 }
607
608 2 thread_context_guard ctx(this);
609
1/1
✓ Branch 1 taken 2 times.
2 std::unique_lock lock(mutex_);
610
1/1
✓ Branch 1 taken 2 times.
2 return do_one(lock, 0, &ctx.frame_);
611 2 }
612
613 void
614 5191 epoll_scheduler::
615 register_descriptor(int fd, descriptor_state* desc) const
616 {
617 5191 epoll_event ev{};
618 5191 ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
619 5191 ev.data.ptr = desc;
620
621
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5191 times.
5191 if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
622 detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
623
624 5191 desc->registered_events = ev.events;
625 5191 desc->fd = fd;
626 5191 desc->scheduler_ = this;
627
628
1/1
✓ Branch 1 taken 5191 times.
5191 std::lock_guard lock(desc->mutex);
629 5191 desc->read_ready = false;
630 5191 desc->write_ready = false;
631 5191 }
632
633 void
634 5191 epoll_scheduler::
635 deregister_descriptor(int fd) const
636 {
637 5191 ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
638 5191 }
639
640 void
641 5293 epoll_scheduler::
642 work_started() const noexcept
643 {
644 5293 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
645 5293 }
646
647 void
648 9807 epoll_scheduler::
649 work_finished() const noexcept
650 {
651
2/2
✓ Branch 0 taken 148 times.
✓ Branch 1 taken 9659 times.
19614 if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
652 {
653 // Last work item completed - wake all threads so they can exit.
654 // signal_all() wakes threads waiting on the condvar.
655 // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
656 // Both are needed because they target different blocking mechanisms.
657 148 std::unique_lock lock(mutex_);
658 148 signal_all(lock);
659
5/6
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 146 times.
✓ Branch 3 taken 2 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 2 times.
✓ Branch 6 taken 146 times.
148 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
660 {
661 2 task_interrupted_ = true;
662 2 lock.unlock();
663 2 interrupt_reactor();
664 }
665 148 }
666 9807 }
667
668 void
669 78815 epoll_scheduler::
670 compensating_work_started() const noexcept
671 {
672 78815 auto* ctx = find_context(this);
673
1/2
✓ Branch 0 taken 78815 times.
✗ Branch 1 not taken.
78815 if (ctx)
674 78815 ++ctx->private_outstanding_work;
675 78815 }
676
677 void
678 epoll_scheduler::
679 drain_thread_queue(op_queue& queue, long count) const
680 {
681 // Note: outstanding_work_ was already incremented when posting
682 std::unique_lock lock(mutex_);
683 completed_ops_.splice(queue);
684 if (count > 0)
685 maybe_unlock_and_signal_one(lock);
686 }
687
688 void
689 5167 epoll_scheduler::
690 post_deferred_completions(op_queue& ops) const
691 {
692
1/2
✓ Branch 1 taken 5167 times.
✗ Branch 2 not taken.
5167 if (ops.empty())
693 5167 return;
694
695 // Fast path: if on scheduler thread, use private queue
696 if (auto* ctx = find_context(this))
697 {
698 ctx->private_queue.splice(ops);
699 return;
700 }
701
702 // Slow path: add to global queue and wake a thread
703 std::unique_lock lock(mutex_);
704 completed_ops_.splice(ops);
705 wake_one_thread_and_unlock(lock);
706 }
707
708 void
709 235 epoll_scheduler::
710 interrupt_reactor() const
711 {
712 // Only write if not already armed to avoid redundant writes
713 235 bool expected = false;
714
2/2
✓ Branch 1 taken 223 times.
✓ Branch 2 taken 12 times.
235 if (eventfd_armed_.compare_exchange_strong(expected, true,
715 std::memory_order_release, std::memory_order_relaxed))
716 {
717 223 std::uint64_t val = 1;
718
1/1
✓ Branch 1 taken 223 times.
223 [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
719 }
720 235 }
721
722 void
723 355 epoll_scheduler::
724 signal_all(std::unique_lock<std::mutex>&) const
725 {
726 355 state_ |= 1;
727 355 cond_.notify_all();
728 355 }
729
730 bool
731 1649 epoll_scheduler::
732 maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
733 {
734 1649 state_ |= 1;
735
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1649 times.
1649 if (state_ > 1)
736 {
737 lock.unlock();
738 cond_.notify_one();
739 return true;
740 }
741 1649 return false;
742 }
743
744 void
745 328767 epoll_scheduler::
746 unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
747 {
748 328767 state_ |= 1;
749 328767 bool have_waiters = state_ > 1;
750 328767 lock.unlock();
751
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 328767 times.
328767 if (have_waiters)
752 cond_.notify_one();
753 328767 }
754
755 void
756 2 epoll_scheduler::
757 clear_signal() const
758 {
759 2 state_ &= ~std::size_t(1);
760 2 }
761
762 void
763 2 epoll_scheduler::
764 wait_for_signal(std::unique_lock<std::mutex>& lock) const
765 {
766
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 2 times.
4 while ((state_ & 1) == 0)
767 {
768 2 state_ += 2;
769 2 cond_.wait(lock);
770 2 state_ -= 2;
771 }
772 2 }
773
774 void
775 epoll_scheduler::
776 wait_for_signal_for(
777 std::unique_lock<std::mutex>& lock,
778 long timeout_us) const
779 {
780 if ((state_ & 1) == 0)
781 {
782 state_ += 2;
783 cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
784 state_ -= 2;
785 }
786 }
787
788 void
789 1649 epoll_scheduler::
790 wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
791 {
792
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1649 times.
1649 if (maybe_unlock_and_signal_one(lock))
793 return;
794
795
5/6
✓ Branch 1 taken 26 times.
✓ Branch 2 taken 1623 times.
✓ Branch 3 taken 26 times.
✗ Branch 4 not taken.
✓ Branch 5 taken 26 times.
✓ Branch 6 taken 1623 times.
1649 if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
796 {
797 26 task_interrupted_ = true;
798 26 lock.unlock();
799 26 interrupt_reactor();
800 }
801 else
802 {
803 1623 lock.unlock();
804 }
805 }
806
807 /** RAII guard for handler execution work accounting.
808
809 Handler consumes 1 work item, may produce N new items via fast-path posts.
810 Net change = N - 1:
811 - If N > 1: add (N-1) to global (more work produced than consumed)
812 - If N == 1: net zero, do nothing
813 - If N < 1: call work_finished() (work consumed, may trigger stop)
814
815 Also drains private queue to global for other threads to process.
816 */
817 struct work_cleanup
818 {
819 epoll_scheduler const* scheduler;
820 std::unique_lock<std::mutex>* lock;
821 scheduler_context* ctx;
822
823 248894 ~work_cleanup()
824 {
825
1/2
✓ Branch 0 taken 248894 times.
✗ Branch 1 not taken.
248894 if (ctx)
826 {
827 248894 long produced = ctx->private_outstanding_work;
828
2/2
✓ Branch 0 taken 47 times.
✓ Branch 1 taken 248847 times.
248894 if (produced > 1)
829 47 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
830
2/2
✓ Branch 0 taken 9649 times.
✓ Branch 1 taken 239198 times.
248847 else if (produced < 1)
831 9649 scheduler->work_finished();
832 // produced == 1: net zero, handler consumed what it produced
833 248894 ctx->private_outstanding_work = 0;
834
835
2/2
✓ Branch 1 taken 160430 times.
✓ Branch 2 taken 88464 times.
248894 if (!ctx->private_queue.empty())
836 {
837 160430 lock->lock();
838 160430 scheduler->completed_ops_.splice(ctx->private_queue);
839 }
840 }
841 else
842 {
843 // No thread context - slow-path op was already counted globally
844 scheduler->work_finished();
845 }
846 248894 }
847 };
848
849 /** RAII guard for reactor work accounting.
850
851 Reactor only produces work via timer/signal callbacks posting handlers.
852 Unlike handler execution which consumes 1, the reactor consumes nothing.
853 All produced work must be flushed to global counter.
854 */
855 struct task_cleanup
856 {
857 epoll_scheduler const* scheduler;
858 std::unique_lock<std::mutex>* lock;
859 scheduler_context* ctx;
860
861 85240 ~task_cleanup()
862 85240 {
863
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 85240 times.
85240 if (!ctx)
864 return;
865
866
2/2
✓ Branch 0 taken 2784 times.
✓ Branch 1 taken 82456 times.
85240 if (ctx->private_outstanding_work > 0)
867 {
868 2784 scheduler->outstanding_work_.fetch_add(
869 2784 ctx->private_outstanding_work, std::memory_order_relaxed);
870 2784 ctx->private_outstanding_work = 0;
871 }
872
873
2/2
✓ Branch 1 taken 2784 times.
✓ Branch 2 taken 82456 times.
85240 if (!ctx->private_queue.empty())
874 {
875
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 2784 times.
2784 if (!lock->owns_lock())
876 lock->lock();
877 2784 scheduler->completed_ops_.splice(ctx->private_queue);
878 }
879 85240 }
880 };
881
882 void
883 5570 epoll_scheduler::
884 update_timerfd() const
885 {
886 5570 auto nearest = timer_svc_->nearest_expiry();
887
888 5570 itimerspec ts{};
889 5570 int flags = 0;
890
891
3/3
✓ Branch 2 taken 5570 times.
✓ Branch 4 taken 5526 times.
✓ Branch 5 taken 44 times.
5570 if (nearest == timer_service::time_point::max())
892 {
893 // No timers - disarm by setting to 0 (relative)
894 }
895 else
896 {
897 5526 auto now = std::chrono::steady_clock::now();
898
3/3
✓ Branch 1 taken 5526 times.
✓ Branch 4 taken 65 times.
✓ Branch 5 taken 5461 times.
5526 if (nearest <= now)
899 {
900 // Use 1ns instead of 0 - zero disarms the timerfd
901 65 ts.it_value.tv_nsec = 1;
902 }
903 else
904 {
905 5461 auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
906
1/1
✓ Branch 1 taken 5461 times.
10922 nearest - now).count();
907 5461 ts.it_value.tv_sec = nsec / 1000000000;
908 5461 ts.it_value.tv_nsec = nsec % 1000000000;
909 // Ensure non-zero to avoid disarming if duration rounds to 0
910
3/4
✓ Branch 0 taken 5457 times.
✓ Branch 1 taken 4 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5457 times.
5461 if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
911 ts.it_value.tv_nsec = 1;
912 }
913 }
914
915
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5570 times.
5570 if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
916 detail::throw_system_error(make_err(errno), "timerfd_settime");
917 5570 }
918
919 void
920 85240 epoll_scheduler::
921 run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
922 {
923
2/2
✓ Branch 0 taken 79873 times.
✓ Branch 1 taken 5367 times.
85240 int timeout_ms = task_interrupted_ ? 0 : -1;
924
925
2/2
✓ Branch 1 taken 5367 times.
✓ Branch 2 taken 79873 times.
85240 if (lock.owns_lock())
926
1/1
✓ Branch 1 taken 5367 times.
5367 lock.unlock();
927
928 85240 task_cleanup on_exit{this, &lock, ctx};
929
930 // Flush deferred timerfd programming before blocking
931
2/2
✓ Branch 1 taken 2783 times.
✓ Branch 2 taken 82457 times.
85240 if (timerfd_stale_.exchange(false, std::memory_order_acquire))
932
1/1
✓ Branch 1 taken 2783 times.
2783 update_timerfd();
933
934 // Event loop runs without mutex held
935 epoll_event events[128];
936
1/1
✓ Branch 1 taken 85240 times.
85240 int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
937
938
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 85240 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
85240 if (nfds < 0 && errno != EINTR)
939 detail::throw_system_error(make_err(errno), "epoll_wait");
940
941 85240 bool check_timers = false;
942 85240 op_queue local_ops;
943
944 // Process events without holding the mutex
945
2/2
✓ Branch 0 taken 86803 times.
✓ Branch 1 taken 85240 times.
172043 for (int i = 0; i < nfds; ++i)
946 {
947
2/2
✓ Branch 0 taken 34 times.
✓ Branch 1 taken 86769 times.
86803 if (events[i].data.ptr == nullptr)
948 {
949 std::uint64_t val;
950
1/1
✓ Branch 1 taken 34 times.
34 [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
951 34 eventfd_armed_.store(false, std::memory_order_relaxed);
952 34 continue;
953 34 }
954
955
2/2
✓ Branch 0 taken 2787 times.
✓ Branch 1 taken 83982 times.
86769 if (events[i].data.ptr == &timer_fd_)
956 {
957 std::uint64_t expirations;
958
1/1
✓ Branch 1 taken 2787 times.
2787 [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
959 2787 check_timers = true;
960 2787 continue;
961 2787 }
962
963 // Deferred I/O: just set ready events and enqueue descriptor
964 // No per-descriptor mutex locking in reactor hot path!
965 83982 auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
966 83982 desc->add_ready_events(events[i].events);
967
968 // Only enqueue if not already enqueued
969 83982 bool expected = false;
970
1/2
✓ Branch 1 taken 83982 times.
✗ Branch 2 not taken.
83982 if (desc->is_enqueued_.compare_exchange_strong(expected, true,
971 std::memory_order_release, std::memory_order_relaxed))
972 {
973 83982 local_ops.push(desc);
974 }
975 }
976
977 // Process timers only when timerfd fires
978
2/2
✓ Branch 0 taken 2787 times.
✓ Branch 1 taken 82453 times.
85240 if (check_timers)
979 {
980
1/1
✓ Branch 1 taken 2787 times.
2787 timer_svc_->process_expired();
981
1/1
✓ Branch 1 taken 2787 times.
2787 update_timerfd();
982 }
983
984
1/1
✓ Branch 1 taken 85240 times.
85240 lock.lock();
985
986
2/2
✓ Branch 1 taken 46839 times.
✓ Branch 2 taken 38401 times.
85240 if (!local_ops.empty())
987 46839 completed_ops_.splice(local_ops);
988 85240 }
989
990 std::size_t
991 249044 epoll_scheduler::
992 do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
993 {
994 for (;;)
995 {
996
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 334284 times.
334286 if (stopped_)
997 2 return 0;
998
999 334284 scheduler_op* op = completed_ops_.pop();
1000
1001 // Handle reactor sentinel - time to poll for I/O
1002
2/2
✓ Branch 0 taken 85383 times.
✓ Branch 1 taken 248901 times.
334284 if (op == &task_op_)
1003 {
1004 85383 bool more_handlers = !completed_ops_.empty();
1005
1006 // Nothing to run the reactor for: no pending work to wait on,
1007 // or caller requested a non-blocking poll
1008
4/4
✓ Branch 0 taken 5510 times.
✓ Branch 1 taken 79873 times.
✓ Branch 2 taken 143 times.
✓ Branch 3 taken 85240 times.
90893 if (!more_handlers &&
1009
3/4
✓ Branch 1 taken 5367 times.
✓ Branch 2 taken 143 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 5367 times.
11020 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1010 timeout_us == 0))
1011 {
1012 143 completed_ops_.push(&task_op_);
1013 143 return 0;
1014 }
1015
1016
3/4
✓ Branch 0 taken 5367 times.
✓ Branch 1 taken 79873 times.
✗ Branch 2 not taken.
✓ Branch 3 taken 5367 times.
85240 task_interrupted_ = more_handlers || timeout_us == 0;
1017 85240 task_running_.store(true, std::memory_order_relaxed);
1018
1019
2/2
✓ Branch 0 taken 79873 times.
✓ Branch 1 taken 5367 times.
85240 if (more_handlers)
1020 79873 unlock_and_signal_one(lock);
1021
1022 85240 run_task(lock, ctx);
1023
1024 85240 task_running_.store(false, std::memory_order_relaxed);
1025 85240 completed_ops_.push(&task_op_);
1026 85240 continue;
1027 85240 }
1028
1029 // Handle operation
1030
2/2
✓ Branch 0 taken 248894 times.
✓ Branch 1 taken 7 times.
248901 if (op != nullptr)
1031 {
1032
1/2
✓ Branch 1 taken 248894 times.
✗ Branch 2 not taken.
248894 if (!completed_ops_.empty())
1033
1/1
✓ Branch 1 taken 248894 times.
248894 unlock_and_signal_one(lock);
1034 else
1035 lock.unlock();
1036
1037 248894 work_cleanup on_exit{this, &lock, ctx};
1038
1039
1/1
✓ Branch 1 taken 248894 times.
248894 (*op)();
1040 248894 return 1;
1041 248894 }
1042
1043 // No pending work to wait on, or caller requested non-blocking poll
1044
5/6
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 5 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
✓ Branch 5 taken 5 times.
✓ Branch 6 taken 2 times.
14 if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1045 timeout_us == 0)
1046 5 return 0;
1047
1048 2 clear_signal();
1049
1/2
✓ Branch 0 taken 2 times.
✗ Branch 1 not taken.
2 if (timeout_us < 0)
1050 2 wait_for_signal(lock);
1051 else
1052 wait_for_signal_for(lock, timeout_us);
1053 85242 }
1054 }
1055
1056 } // namespace boost::corosio::detail
1057
1058 #endif
1059