libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

80.4% Lines (394/490) 89.1% Functions (41/46) 68.2% Branches (206/302)
libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/scheduler.hpp"
15 #include "src/detail/epoll/op.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/posix/resolver_service.hpp"
18 #include "src/detail/posix/signals.hpp"
19
20 #include <boost/corosio/detail/except.hpp>
21 #include <boost/corosio/detail/thread_local_ptr.hpp>
22
23 #include <atomic>
24 #include <chrono>
25 #include <limits>
26 #include <utility>
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <sys/epoll.h>
31 #include <sys/eventfd.h>
32 #include <sys/socket.h>
33 #include <sys/timerfd.h>
34 #include <unistd.h>
35
36 /*
37 epoll Scheduler - Single Reactor Model
38 ======================================
39
40 This scheduler uses a thread coordination strategy to provide handler
41 parallelism and avoid the thundering herd problem.
42 Instead of all threads blocking on epoll_wait(), one thread becomes the
43 "reactor" while others wait on a condition variable for handler work.
44
45 Thread Model
46 ------------
47 - ONE thread runs epoll_wait() at a time (the reactor thread)
48 - OTHER threads wait on cond_ (condition variable) for handlers
49 - When work is posted, exactly one waiting thread wakes via notify_one()
50 - This matches Windows IOCP semantics where N posted items wake N threads
51
52 Event Loop Structure (do_one)
53 -----------------------------
54 1. Lock mutex, try to pop handler from queue
55 2. If got handler: execute it (unlocked), return
56 3. If queue empty and no reactor running: become reactor
57 - Run epoll_wait (unlocked), queue I/O completions, loop back
58 4. If queue empty and reactor running: wait on condvar for work
59
60 The task_running_ flag ensures only one thread owns epoll_wait().
61 After the reactor queues I/O completions, it loops back to try getting
62 a handler, giving priority to handler execution over more I/O polling.
63
64 Signaling State (state_)
65 ------------------------
66 The state_ variable encodes two pieces of information:
67 - Bit 0: signaled flag (1 = signaled, persists until cleared)
68 - Upper bits: waiter count (each waiter adds 2 before blocking)
69
70 This allows efficient coordination:
71 - Signalers only call notify when waiters exist (state_ > 1)
72 - Waiters check if already signaled before blocking (fast-path)
73
74 Wake Coordination (wake_one_thread_and_unlock)
75 ----------------------------------------------
76 When posting work:
77 - If waiters exist (state_ > 1): signal and notify_one()
78 - Else if reactor running: interrupt via eventfd write
79 - Else: no-op (thread will find work when it checks queue)
80
81 This avoids waking threads unnecessarily. With cascading wakes,
82 each handler execution wakes at most one additional thread if
83 more work exists in the queue.
84
85 Work Counting
86 -------------
87 outstanding_work_ tracks pending operations. When it hits zero, run()
88 returns. Each operation increments on start, decrements on completion.
89
90 Timer Integration
91 -----------------
92 Timers are handled by timer_service. The reactor adjusts epoll_wait
93 timeout to wake for the nearest timer expiry. When a new timer is
94 scheduled earlier than current, timer_service calls interrupt_reactor()
95 to re-evaluate the timeout.
96 */
97
98 namespace boost::corosio::detail {
99
// Per-thread frame recording that a thread is currently running inside a
// particular scheduler. Frames form a singly linked stack (see
// context_stack below) so nested run() calls on different schedulers
// can coexist on one thread.
struct scheduler_context
{
    epoll_scheduler const* key;     // scheduler this frame belongs to
    scheduler_context* next;        // previous frame on this thread's stack
    op_queue private_queue;         // same-thread fast-path posts land here
    long private_outstanding_work;  // work produced locally, flushed in batches

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
    {
    }
};
114
namespace {

// Head of this thread's stack of active scheduler frames.
corosio::detail::thread_local_ptr<scheduler_context> context_stack;

// RAII guard that pushes a scheduler_context frame for the duration of a
// run()/poll() call and pops it on exit, flushing any leftover private
// work back to the scheduler's global queue.
struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        // Ops posted via the fast path but never executed on this thread
        // must be handed to the global queue before the frame disappears.
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

// Returns this thread's frame for the given scheduler, or nullptr when
// the calling thread is not currently running that scheduler.
scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

} // namespace
148
// Completion handler for a ready descriptor. Runs on a scheduler thread
// after the reactor enqueued this descriptor_state. Consumes the ready
// event mask, performs the pending read/write/connect I/O, and either
// completes the ops or re-arms them on EAGAIN.
void
descriptor_state::
operator()()
{
    // Allow the reactor to enqueue us again for subsequent events.
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    // Acquire pairs with the reactor's release in add_ready_events().
    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        // Spurious wakeup: nothing to do, but the scheduler counted one
        // work item for us, so compensate to keep accounting balanced.
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    // On EPOLLERR, fetch the socket error; EIO is a fallback when the
    // kernel reports readiness-with-error but SO_ERROR yields 0.
    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    // Claim pending ops under the descriptor mutex; when no op is
    // waiting, latch readiness so a later async op can complete inline.
    epoll_op* rd = nullptr;
    epoll_op* wr = nullptr;
    epoll_op* cn = nullptr;
    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            rd = std::exchange(read_op, nullptr);
            if (!rd)
                read_ready = true;
        }
        if (ev & EPOLLOUT)
        {
            cn = std::exchange(connect_op, nullptr);
            wr = std::exchange(write_op, nullptr);
            if (!cn && !wr)
                write_ready = true;
        }
        // Pure error event (no IN/OUT): fail every pending op.
        if (err && !(ev & (EPOLLIN | EPOLLOUT)))
        {
            rd = std::exchange(read_op, nullptr);
            wr = std::exchange(write_op, nullptr);
            cn = std::exchange(connect_op, nullptr);
        }
    }

    // Non-null after I/O means EAGAIN; re-register under lock below
    if (rd)
    {
        if (err)
            rd->complete(err, 0);
        else
            rd->perform_io();

        if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
        {
            rd->errn = 0;
        }
        else
        {
            local_ops.push(rd);
            rd = nullptr;
        }
    }

    if (cn)
    {
        // Connect ops never see EAGAIN here: EPOLLOUT means the connect
        // attempt finished (successfully or not), so always complete.
        if (err)
            cn->complete(err, 0);
        else
            cn->perform_io();
        local_ops.push(cn);
        cn = nullptr;
    }

    if (wr)
    {
        if (err)
            wr->complete(err, 0);
        else
            wr->perform_io();

        if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
        {
            wr->errn = 0;
        }
        else
        {
            local_ops.push(wr);
            wr = nullptr;
        }
    }

    // Ops that hit EAGAIN go back into their slots to wait for the next
    // edge-triggered readiness notification.
    if (rd || wr)
    {
        std::lock_guard lock(mutex);
        if (rd)
            read_op = rd;
        if (wr)
            write_op = wr;
    }

    // Execute first handler inline — the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        // No handler ran; credit one work unit so the caller's cleanup
        // does not over-decrement the global counter.
        scheduler_->compensating_work_started();
    }
}
273
// Creates the epoll instance plus two auxiliary fds: an eventfd used to
// interrupt a blocked reactor, and a timerfd that wakes the reactor for
// timer expiries. On any failure, previously acquired fds are closed
// before throwing (manual rollback in reverse acquisition order).
epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_{false}
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    // Register the eventfd; data.ptr == nullptr identifies it in the
    // reactor loop. Edge-triggered so one write wakes exactly once.
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    // Register the timerfd; data.ptr == &timer_fd_ identifies it.
    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    // When the earliest timer changes, mark the timerfd stale and poke
    // the reactor (if running) so it reprograms the timeout.
    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) {
                auto* self = static_cast<epoll_scheduler*>(p);
                self->timerfd_stale_.store(true, std::memory_order_release);
                if (self->task_running_.load(std::memory_order_relaxed))
                    self->interrupt_reactor();
            }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}
353
354 378 epoll_scheduler::
355 189 ~epoll_scheduler()
356 {
357
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (timer_fd_ >= 0)
358 189 ::close(timer_fd_);
359
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (event_fd_ >= 0)
360 189 ::close(event_fd_);
361
1/2
✓ Branch 0 taken 189 times.
✗ Branch 1 not taken.
189 if (epoll_fd_ >= 0)
362 189 ::close(epoll_fd_);
363 378 }
364
// Tears down pending work during context shutdown: destroys (without
// running) every queued op, wakes all threads, and interrupts the
// reactor so no thread stays blocked.
void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        // Drain the queue, destroying ops without invoking them. The
        // task sentinel is not an allocated op, so it is skipped. The
        // lock is dropped around destroy() since op destructors may
        // re-enter the scheduler.
        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    // Force the work count to zero so any looping thread exits.
    outstanding_work_.store(0, std::memory_order_release);

    // Kick a reactor thread potentially blocked in epoll_wait().
    if (event_fd_ >= 0)
        interrupt_reactor();
}
390
// Posts a coroutine for later resumption. Wraps the handle in a
// heap-allocated scheduler_op and routes it through the same fast/slow
// paths as post(scheduler_op*).
void
epoll_scheduler::
post(capy::coro h) const
{
    struct post_handler final
        : scheduler_op
    {
        capy::coro h_;

        explicit
        post_handler(capy::coro h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        void operator()() override
        {
            // Copy the handle and free the wrapper *before* resuming:
            // the coroutine may destroy the scheduler-owned queue entry
            // or never return control here.
            auto h = h_;
            delete this;
            // Acquire fence pairs with the release implied by the
            // cross-thread handoff of this op through the queue.
            std::atomic_thread_fence(std::memory_order_acquire);
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
440
441 void
442 105929 epoll_scheduler::
443 post(scheduler_op* h) const
444 {
445 // Fast path: same thread posts to private queue
446 // Only count locally; work_cleanup batches to global counter
447
2/2
✓ Branch 1 taken 105903 times.
✓ Branch 2 taken 26 times.
105929 if (auto* ctx = find_context(this))
448 {
449 105903 ++ctx->private_outstanding_work;
450 105903 ctx->private_queue.push(h);
451 105903 return;
452 }
453
454 // Slow path: cross-thread post requires mutex
455 26 outstanding_work_.fetch_add(1, std::memory_order_relaxed);
456
457
1/1
✓ Branch 1 taken 26 times.
26 std::unique_lock lock(mutex_);
458 26 completed_ops_.push(h);
459
1/1
✓ Branch 1 taken 26 times.
26 wake_one_thread_and_unlock(lock);
460 26 }
461
// Adds one unit of outstanding work, keeping run() from returning while
// an external operation is in flight.
void
epoll_scheduler::
on_work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
468
// Removes one unit of outstanding work; the thread that drops the count
// to zero stops the scheduler so all run() calls return.
void
epoll_scheduler::
on_work_finished() noexcept
{
    // fetch_sub returns the previous value, so == 1 means "now zero".
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}
476
477 bool
478 2541 epoll_scheduler::
479 running_in_this_thread() const noexcept
480 {
481
2/2
✓ Branch 1 taken 2331 times.
✓ Branch 2 taken 210 times.
2541 for (auto* c = context_stack.get(); c != nullptr; c = c->next)
482
1/2
✓ Branch 0 taken 2331 times.
✗ Branch 1 not taken.
2331 if (c->key == this)
483 2331 return true;
484 210 return false;
485 }
486
// Stops the scheduler: sets the stopped flag, wakes every condvar
// waiter, and interrupts the reactor blocked in epoll_wait(). Idempotent
// under the mutex.
void
epoll_scheduler::
stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}
499
500 bool
501 16 epoll_scheduler::
502 stopped() const noexcept
503 {
504 16 std::unique_lock lock(mutex_);
505 32 return stopped_;
506 16 }
507
508 void
509 49 epoll_scheduler::
510 restart()
511 {
512
1/1
✓ Branch 1 taken 49 times.
49 std::unique_lock lock(mutex_);
513 49 stopped_ = false;
514 49 }
515
// Runs the event loop until the scheduler is stopped or runs out of
// work. Returns the number of handlers executed (saturating).
std::size_t
epoll_scheduler::
run()
{
    // Nothing to do at all: stop so other run() callers return too.
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        // do_one may return with the lock released (after running a
        // handler); re-acquire before the next iteration.
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}
541
// Runs at most one handler, blocking until one is available, the
// scheduler stops, or work runs out. Returns 1 if a handler ran.
std::size_t
epoll_scheduler::
run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    // timeout_us == -1 means "block indefinitely".
    return do_one(lock, -1, &ctx.frame_);
}
556
// Runs at most one handler, waiting up to usec microseconds for work.
// Returns 1 if a handler ran, 0 on timeout/stop/no-work.
//
// NOTE(review): the timeout is honored by the condvar wait path
// (wait_for_signal_for), but if this thread becomes the reactor,
// run_task() computes its own epoll_wait timeout (0 or -1) and does not
// consult usec — confirm whether a bounded wait is intended there.
std::size_t
epoll_scheduler::
wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}
571
// Runs ready handlers without blocking (timeout 0) until none remain.
// Returns the number of handlers executed (saturating).
std::size_t
epoll_scheduler::
poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        // timeout_us == 0: never block, bail out when nothing is ready.
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}
597
// Runs at most one ready handler without blocking. Returns 1 if a
// handler ran, 0 otherwise.
std::size_t
epoll_scheduler::
poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    // timeout_us == 0 makes do_one non-blocking.
    return do_one(lock, 0, &ctx.frame_);
}
612
// Adds a socket to the epoll set in edge-triggered mode, watching both
// directions up front so the registration never needs modification.
// Throws on epoll_ctl failure.
void
epoll_scheduler::
register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd = fd;
    desc->scheduler_ = this;

    // Reset latched readiness under the descriptor lock: a recycled
    // descriptor_state must not carry stale ready flags.
    std::lock_guard lock(desc->mutex);
    desc->read_ready = false;
    desc->write_ready = false;
}
632
// Removes a socket from the epoll set. Errors are deliberately ignored:
// the fd may already be closed by the time this runs.
void
epoll_scheduler::
deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}
639
// Records the start of an internally tracked operation (I/O, timer).
void
epoll_scheduler::
work_started() const noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}
646
// Records completion of one tracked operation. The thread that drops
// the count to zero wakes everything so run() calls can return.
void
epoll_scheduler::
work_finished() const noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
    {
        // Last work item completed - wake all threads so they can exit.
        // signal_all() wakes threads waiting on the condvar.
        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
        // Both are needed because they target different blocking mechanisms.
        std::unique_lock lock(mutex_);
        signal_all(lock);
        // task_interrupted_ is checked under the mutex so only one
        // thread pays for the eventfd write.
        if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
        {
            task_interrupted_ = true;
            lock.unlock();
            interrupt_reactor();
        }
    }
}
667
668 void
669 51806 epoll_scheduler::
670 compensating_work_started() const noexcept
671 {
672 51806 auto* ctx = find_context(this);
673
1/2
✓ Branch 0 taken 51806 times.
✗ Branch 1 not taken.
51806 if (ctx)
674 51806 ++ctx->private_outstanding_work;
675 51806 }
676
// Moves a departing thread's leftover private ops into the global queue
// and, if any work was pending, wakes one thread to pick it up. Called
// from ~thread_context_guard.
void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    //
    // NOTE(review): fast-path posts increment only the frame-local
    // count (see post()); confirm the global counter actually reflects
    // `count` on this path, since it is used only for signaling here.
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}
687
// Queues a batch of already-counted completions. Ops in `ops` carry
// their own work count, so no counter adjustment happens here.
void
epoll_scheduler::
post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}
707
// Wakes the reactor thread out of epoll_wait() by writing to the
// eventfd. The armed flag collapses concurrent interrupts into a single
// write; the reactor clears it when it drains the eventfd.
void
epoll_scheduler::
interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(expected, true,
        std::memory_order_release, std::memory_order_relaxed))
    {
        std::uint64_t val = 1;
        // Write failure is ignored: worst case the reactor wakes later.
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}
721
722 void
723 356 epoll_scheduler::
724 signal_all(std::unique_lock<std::mutex>&) const
725 {
726 356 state_ |= 1;
727 356 cond_.notify_all();
728 356 }
729
// Sets the signaled bit and, only if a waiter exists (each waiter adds
// 2 to state_, so state_ > 1 means at least one), unlocks and notifies
// one thread. Returns true if a notification was issued; on false the
// lock is still held by the caller.
bool
epoll_scheduler::
maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        // Unlock before notify so the woken thread can take the mutex
        // immediately instead of blocking on it.
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}
743
// Sets the signaled bit and always unlocks; notifies one thread only
// when waiters exist. Unlike maybe_unlock_and_signal_one, the lock is
// released unconditionally.
void
epoll_scheduler::
unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    // Capture waiter presence before releasing the mutex; state_ must
    // not be read unlocked.
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
}
754
// Clears the latched signal bit; the waiter count in the upper bits of
// state_ is left untouched. Caller must hold the mutex.
void
epoll_scheduler::
clear_signal() const
{
    state_ &= ~std::size_t(1);
}
761
// Blocks until the signal bit is set. Registers as a waiter (+2 on
// state_) around each condvar wait; the loop guards against spurious
// wakeups. Caller must hold the mutex.
void
epoll_scheduler::
wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}
773
// Waits for the signal bit for at most timeout_us microseconds.
// Deliberately does not loop: a single timed wait is enough because the
// caller (do_one) re-checks all conditions after waking.
void
epoll_scheduler::
wait_for_signal_for(
    std::unique_lock<std::mutex>& lock,
    long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}
787
// Wakes exactly one thread to consume newly posted work, choosing the
// cheapest available mechanism, and always releases the lock:
// 1. a condvar waiter if one exists,
// 2. else the reactor via eventfd (once per blocking cycle),
// 3. else nobody — an active thread will find the work in the queue.
void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    // Case 1: a waiter was signaled; lock already released.
    if (maybe_unlock_and_signal_one(lock))
        return;

    // Case 2: interrupt a blocked reactor at most once; the flag is
    // reset when the reactor cycles through do_one again.
    if (task_running_.load(std::memory_order_relaxed) && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        // Case 3: no-op beyond releasing the lock.
        lock.unlock();
    }
}
806
/** RAII guard for handler execution work accounting.

    Handler consumes 1 work item, may produce N new items via fast-path posts.
    Net change = N - 1:
    - If N > 1: add (N-1) to global (more work produced than consumed)
    - If N == 1: net zero, do nothing
    - If N < 1: call work_finished() (work consumed, may trigger stop)

    Also drains private queue to global for other threads to process.
*/
struct work_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~work_cleanup()
    {
        if (ctx)
        {
            long produced = ctx->private_outstanding_work;
            if (produced > 1)
                scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
            else if (produced < 1)
                scheduler->work_finished();
            // produced == 1: net zero, handler consumed what it produced
            ctx->private_outstanding_work = 0;

            // Publish fast-path posts so other threads can run them.
            // The lock was released while the handler executed; retake
            // it only when there is something to splice.
            if (!ctx->private_queue.empty())
            {
                lock->lock();
                scheduler->completed_ops_.splice(ctx->private_queue);
            }
        }
        else
        {
            // No thread context - slow-path op was already counted globally
            scheduler->work_finished();
        }
    }
};
848
/** RAII guard for reactor work accounting.

    Reactor only produces work via timer/signal callbacks posting handlers.
    Unlike handler execution which consumes 1, the reactor consumes nothing.
    All produced work must be flushed to global counter.
*/
struct task_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~task_cleanup()
    {
        if (!ctx)
            return;

        // Flush the full local tally (no "-1": the reactor itself did
        // not consume a work item).
        if (ctx->private_outstanding_work > 0)
        {
            scheduler->outstanding_work_.fetch_add(
                ctx->private_outstanding_work, std::memory_order_relaxed);
            ctx->private_outstanding_work = 0;
        }

        // Publish ops posted by callbacks during the reactor run. The
        // lock may or may not be held at this point depending on how
        // run_task exited.
        if (!ctx->private_queue.empty())
        {
            if (!lock->owns_lock())
                lock->lock();
            scheduler->completed_ops_.splice(ctx->private_queue);
        }
    }
};
881
// Programs the timerfd to fire at the nearest timer expiry. A zeroed
// itimerspec disarms the timerfd entirely, so expiries at or before
// "now" are clamped to 1ns to guarantee a wakeup. Throws on
// timerfd_settime failure.
void
epoll_scheduler::
update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;  // relative timeout (no TFD_TIMER_ABSTIME)

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now).count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}
918
// One reactor iteration: runs epoll_wait without the mutex, converts
// kernel events into queued completions, and re-acquires the mutex
// before returning. Entered only by the single thread that owns the
// task sentinel (see do_one).
void
epoll_scheduler::
run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
{
    // If an interrupt was requested before we got here, poll instead of
    // blocking so queued handlers are not delayed.
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    // Flushes local work/ops to the global structures on every exit
    // path, including exceptions from epoll_wait or timer callbacks.
    task_cleanup on_exit{this, &lock, ctx};

    // Flush deferred timerfd programming before blocking
    if (timerfd_stale_.exchange(false, std::memory_order_acquire))
        update_timerfd();

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);

    // EINTR is benign: treat as zero events and fall through.
    if (nfds < 0 && errno != EINTR)
        detail::throw_system_error(make_err(errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        // data.ptr == nullptr identifies the interrupt eventfd.
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            // Drained: allow the next interrupt_reactor() to write again.
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        // data.ptr == &timer_fd_ identifies the timerfd.
        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
        {
            local_ops.push(desc);
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    // Re-acquire before publishing completions; do_one expects the lock
    // held on return.
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);
}
989
// Core event-loop step. Called with the mutex held; returns 1 after
// running exactly one handler (lock released), or 0 when stopped, out
// of work, or (timeout_us == 0) nothing was immediately runnable.
// timeout_us < 0 blocks indefinitely; > 0 bounds the condvar wait.
std::size_t
epoll_scheduler::
do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty();

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                    timeout_us == 0))
            {
                // Put the sentinel back for the next caller.
                completed_ops_.push(&task_op_);
                return 0;
            }

            // If handlers are queued, the reactor must only poll (not
            // block) so it gets back to them quickly.
            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_.store(true, std::memory_order_relaxed);

            // Let another thread run the queued handlers while this
            // thread becomes the reactor.
            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_.store(false, std::memory_order_relaxed);
            completed_ops_.push(&task_op_);
            // Loop back: prefer executing handlers over more polling.
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            // More work remains: cascade-wake one more thread before
            // running the handler unlocked.
            if (!completed_ops_.empty())
                unlock_and_signal_one(lock);
            else
                lock.unlock();

            // Balances work counts and publishes fast-path posts even
            // if the handler throws.
            work_cleanup on_exit{this, &lock, ctx};

            (*op)();
            return 1;
        }

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        // Queue empty but work is pending elsewhere: wait to be signaled.
        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
1055
1056 } // namespace boost::corosio::detail
1057
1058 #endif
1059