1181254a7Smrg// <experimental/io_service> -*- C++ -*- 2181254a7Smrg 3*b1e83836Smrg// Copyright (C) 2015-2022 Free Software Foundation, Inc. 4181254a7Smrg// 5181254a7Smrg// This file is part of the GNU ISO C++ Library. This library is free 6181254a7Smrg// software; you can redistribute it and/or modify it under the 7181254a7Smrg// terms of the GNU General Public License as published by the 8181254a7Smrg// Free Software Foundation; either version 3, or (at your option) 9181254a7Smrg// any later version. 10181254a7Smrg 11181254a7Smrg// This library is distributed in the hope that it will be useful, 12181254a7Smrg// but WITHOUT ANY WARRANTY; without even the implied warranty of 13181254a7Smrg// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14181254a7Smrg// GNU General Public License for more details. 15181254a7Smrg 16181254a7Smrg// Under Section 7 of GPL version 3, you are granted additional 17181254a7Smrg// permissions described in the GCC Runtime Library Exception, version 18181254a7Smrg// 3.1, as published by the Free Software Foundation. 19181254a7Smrg 20181254a7Smrg// You should have received a copy of the GNU General Public License and 21181254a7Smrg// a copy of the GCC Runtime Library Exception along with this program; 22181254a7Smrg// see the files COPYING3 and COPYING.RUNTIME respectively. If not, see 23181254a7Smrg// <http://www.gnu.org/licenses/>. 24181254a7Smrg 25fb8a8121Smrg/** @file experimental/io_context 26181254a7Smrg * This is a TS C++ Library header. 27fb8a8121Smrg * @ingroup networking-ts 28181254a7Smrg */ 29181254a7Smrg 30181254a7Smrg#ifndef _GLIBCXX_EXPERIMENTAL_IO_SERVICE 31181254a7Smrg#define _GLIBCXX_EXPERIMENTAL_IO_SERVICE 1 32181254a7Smrg 33181254a7Smrg#pragma GCC system_header 34181254a7Smrg 35181254a7Smrg#if __cplusplus >= 201402L 36181254a7Smrg 37181254a7Smrg#include <atomic> 38181254a7Smrg#include <forward_list> 39181254a7Smrg#include <functional> 40181254a7Smrg#include <system_error> 41181254a7Smrg#include <thread> 42*b1e83836Smrg#include <vector> 43181254a7Smrg#include <experimental/netfwd> 44181254a7Smrg#include <experimental/executor> 45*b1e83836Smrg#include <bits/chrono.h> 46181254a7Smrg#if _GLIBCXX_HAVE_UNISTD_H 47181254a7Smrg# include <unistd.h> 48181254a7Smrg#endif 49181254a7Smrg#ifdef _GLIBCXX_HAVE_POLL_H 50181254a7Smrg# include <poll.h> 51181254a7Smrg#endif 52181254a7Smrg#ifdef _GLIBCXX_HAVE_FCNTL_H 53181254a7Smrg# include <fcntl.h> 54181254a7Smrg#endif 55181254a7Smrg 56181254a7Smrgnamespace std _GLIBCXX_VISIBILITY(default) 57181254a7Smrg{ 58181254a7Smrg_GLIBCXX_BEGIN_NAMESPACE_VERSION 59181254a7Smrgnamespace experimental 60181254a7Smrg{ 61181254a7Smrgnamespace net 62181254a7Smrg{ 63181254a7Smrginline namespace v1 64181254a7Smrg{ 65181254a7Smrg 66fb8a8121Smrg /** @addtogroup networking-ts 67181254a7Smrg * @{ 68181254a7Smrg */ 69181254a7Smrg 70181254a7Smrg class __socket_impl; 71181254a7Smrg 72181254a7Smrg /// An ExecutionContext for I/O operations. 73181254a7Smrg class io_context : public execution_context 74181254a7Smrg { 75181254a7Smrg public: 76181254a7Smrg // types: 77181254a7Smrg 78181254a7Smrg /// An executor for an io_context. 79181254a7Smrg class executor_type 80181254a7Smrg { 81181254a7Smrg public: 82181254a7Smrg // construct / copy / destroy: 83181254a7Smrg 84181254a7Smrg executor_type(const executor_type& __other) noexcept = default; 85181254a7Smrg executor_type(executor_type&& __other) noexcept = default; 86181254a7Smrg 87181254a7Smrg executor_type& operator=(const executor_type& __other) noexcept = default; 88181254a7Smrg executor_type& operator=(executor_type&& __other) noexcept = default; 89181254a7Smrg 90181254a7Smrg // executor operations: 91181254a7Smrg 92181254a7Smrg bool running_in_this_thread() const noexcept 93181254a7Smrg { 94*b1e83836Smrg#ifdef _GLIBCXX_HAS_GTHREADS 95*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_ctx->_M_mtx); 96181254a7Smrg auto __end = _M_ctx->_M_call_stack.end(); 97181254a7Smrg return std::find(_M_ctx->_M_call_stack.begin(), __end, 98181254a7Smrg this_thread::get_id()) != __end; 99*b1e83836Smrg#else 100*b1e83836Smrg return _M_ctx->_M_run_count != 0; 101*b1e83836Smrg#endif 102181254a7Smrg } 103181254a7Smrg 104181254a7Smrg io_context& context() const noexcept { return *_M_ctx; } 105181254a7Smrg 106181254a7Smrg void on_work_started() const noexcept { ++_M_ctx->_M_work_count; } 107181254a7Smrg void on_work_finished() const noexcept { --_M_ctx->_M_work_count; } 108181254a7Smrg 109181254a7Smrg template<typename _Func, typename _ProtoAllocator> 110181254a7Smrg void 111181254a7Smrg dispatch(_Func&& __f, const _ProtoAllocator& __a) const 112181254a7Smrg { 113181254a7Smrg if (running_in_this_thread()) 114181254a7Smrg decay_t<_Func>{std::forward<_Func>(__f)}(); 115181254a7Smrg else 116181254a7Smrg post(std::forward<_Func>(__f), __a); 117181254a7Smrg } 118181254a7Smrg 119181254a7Smrg template<typename _Func, typename _ProtoAllocator> 120181254a7Smrg void 121181254a7Smrg post(_Func&& __f, const _ProtoAllocator& __a) const 122181254a7Smrg { 123*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_ctx->_M_mtx); 124181254a7Smrg // TODO (re-use functionality in system_context) 125181254a7Smrg _M_ctx->_M_reactor._M_notify(); 126181254a7Smrg } 127181254a7Smrg 128181254a7Smrg template<typename _Func, typename _ProtoAllocator> 129181254a7Smrg void 130181254a7Smrg defer(_Func&& __f, const _ProtoAllocator& __a) const 131181254a7Smrg { post(std::forward<_Func>(__f), __a); } 132181254a7Smrg 133181254a7Smrg private: 134181254a7Smrg friend io_context; 135181254a7Smrg 136181254a7Smrg explicit 137181254a7Smrg executor_type(io_context& __ctx) : _M_ctx(std::addressof(__ctx)) { } 138181254a7Smrg 139181254a7Smrg io_context* _M_ctx; 140181254a7Smrg }; 141181254a7Smrg 142181254a7Smrg using count_type = size_t; 143181254a7Smrg 144181254a7Smrg // construct / copy / destroy: 145181254a7Smrg 146181254a7Smrg io_context() : _M_work_count(0) { } 147181254a7Smrg 148181254a7Smrg explicit 149181254a7Smrg io_context(int __concurrency_hint) : _M_work_count(0) { } 150181254a7Smrg 151181254a7Smrg io_context(const io_context&) = delete; 152181254a7Smrg io_context& operator=(const io_context&) = delete; 153181254a7Smrg 154181254a7Smrg // io_context operations: 155181254a7Smrg 156181254a7Smrg executor_type get_executor() noexcept { return executor_type(*this); } 157181254a7Smrg 158181254a7Smrg count_type 159181254a7Smrg run() 160181254a7Smrg { 161181254a7Smrg count_type __n = 0; 162181254a7Smrg while (run_one()) 163181254a7Smrg if (__n != numeric_limits<count_type>::max()) 164181254a7Smrg ++__n; 165181254a7Smrg return __n; 166181254a7Smrg } 167181254a7Smrg 168181254a7Smrg template<typename _Rep, typename _Period> 169181254a7Smrg count_type 170181254a7Smrg run_for(const chrono::duration<_Rep, _Period>& __rel_time) 171181254a7Smrg { return run_until(chrono::steady_clock::now() + __rel_time); } 172181254a7Smrg 173181254a7Smrg template<typename _Clock, typename _Duration> 174181254a7Smrg count_type 175181254a7Smrg run_until(const chrono::time_point<_Clock, _Duration>& __abs_time) 176181254a7Smrg { 177181254a7Smrg count_type __n = 0; 178181254a7Smrg while (run_one_until(__abs_time)) 179181254a7Smrg if (__n != numeric_limits<count_type>::max()) 180181254a7Smrg ++__n; 181181254a7Smrg return __n; 182181254a7Smrg } 183181254a7Smrg 184181254a7Smrg count_type 185181254a7Smrg run_one() 186181254a7Smrg { return _M_do_one(chrono::milliseconds{-1}); } 187181254a7Smrg 188181254a7Smrg template<typename _Rep, typename _Period> 189181254a7Smrg count_type 190181254a7Smrg run_one_for(const chrono::duration<_Rep, _Period>& __rel_time) 191181254a7Smrg { return run_one_until(chrono::steady_clock::now() + __rel_time); } 192181254a7Smrg 193181254a7Smrg template<typename _Clock, typename _Duration> 194181254a7Smrg count_type 195181254a7Smrg run_one_until(const chrono::time_point<_Clock, _Duration>& __abs_time) 196181254a7Smrg { 197181254a7Smrg auto __now = _Clock::now(); 198181254a7Smrg while (__now < __abs_time) 199181254a7Smrg { 200181254a7Smrg using namespace std::chrono; 201181254a7Smrg auto __ms = duration_cast<milliseconds>(__abs_time - __now); 202181254a7Smrg if (_M_do_one(__ms)) 203181254a7Smrg return 1; 204181254a7Smrg __now = _Clock::now(); 205181254a7Smrg } 206181254a7Smrg return 0; 207181254a7Smrg } 208181254a7Smrg 209181254a7Smrg count_type 210181254a7Smrg poll() 211181254a7Smrg { 212181254a7Smrg count_type __n = 0; 213181254a7Smrg while (poll_one()) 214181254a7Smrg if (__n != numeric_limits<count_type>::max()) 215181254a7Smrg ++__n; 216181254a7Smrg return __n; 217181254a7Smrg } 218181254a7Smrg 219181254a7Smrg count_type 220181254a7Smrg poll_one() 221181254a7Smrg { return _M_do_one(chrono::milliseconds{0}); } 222181254a7Smrg 223181254a7Smrg void stop() 224181254a7Smrg { 225*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_mtx); 226181254a7Smrg _M_stopped = true; 227181254a7Smrg _M_reactor._M_notify(); 228181254a7Smrg } 229181254a7Smrg 230181254a7Smrg bool stopped() const noexcept 231181254a7Smrg { 232*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_mtx); 233181254a7Smrg return _M_stopped; 234181254a7Smrg } 235181254a7Smrg 236181254a7Smrg void restart() 237181254a7Smrg { 238181254a7Smrg _M_stopped = false; 239181254a7Smrg } 240181254a7Smrg 241181254a7Smrg private: 242181254a7Smrg 243181254a7Smrg template<typename _Clock, typename _WaitTraits> 244181254a7Smrg friend class basic_waitable_timer; 245181254a7Smrg 246181254a7Smrg friend __socket_impl; 247181254a7Smrg 248181254a7Smrg template<typename _Protocol> 249181254a7Smrg friend class __basic_socket_impl; 250181254a7Smrg 251181254a7Smrg template<typename _Protocol> 252181254a7Smrg friend class basic_socket; 253181254a7Smrg 254181254a7Smrg template<typename _Protocol> 255181254a7Smrg friend class basic_datagram_socket; 256181254a7Smrg 257181254a7Smrg template<typename _Protocol> 258181254a7Smrg friend class basic_stream_socket; 259181254a7Smrg 260181254a7Smrg template<typename _Protocol> 261181254a7Smrg friend class basic_socket_acceptor; 262181254a7Smrg 263181254a7Smrg count_type 264181254a7Smrg _M_outstanding_work() const 265181254a7Smrg { return _M_work_count + !_M_ops.empty(); } 266181254a7Smrg 267181254a7Smrg struct __timer_queue_base : execution_context::service 268181254a7Smrg { 269181254a7Smrg // return milliseconds until next timer expires, or milliseconds::max() 270181254a7Smrg virtual chrono::milliseconds _M_next() const = 0; 271181254a7Smrg virtual bool run_one() = 0; 272181254a7Smrg 273181254a7Smrg protected: 274181254a7Smrg explicit 275181254a7Smrg __timer_queue_base(execution_context& __ctx) : service(__ctx) 276181254a7Smrg { 277181254a7Smrg auto& __ioc = static_cast<io_context&>(__ctx); 278*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(__ioc._M_mtx); 279181254a7Smrg __ioc._M_timers.push_back(this); 280181254a7Smrg } 281181254a7Smrg 282*b1e83836Smrg mutable execution_context::mutex_type _M_qmtx; 283181254a7Smrg }; 284181254a7Smrg 285181254a7Smrg template<typename _Timer, typename _Key = typename _Timer::_Key> 286181254a7Smrg struct __timer_queue : __timer_queue_base 287181254a7Smrg { 288181254a7Smrg using key_type = __timer_queue; 289181254a7Smrg 290181254a7Smrg explicit 291181254a7Smrg __timer_queue(execution_context& __ctx) : __timer_queue_base(__ctx) 292181254a7Smrg { } 293181254a7Smrg 294181254a7Smrg void shutdown() noexcept { } 295181254a7Smrg 296181254a7Smrg io_context& context() noexcept 297181254a7Smrg { return static_cast<io_context&>(service::context()); } 298181254a7Smrg 299181254a7Smrg // Start an asynchronous wait. 300181254a7Smrg void 301181254a7Smrg push(const _Timer& __t, function<void(error_code)> __h) 302181254a7Smrg { 303181254a7Smrg context().get_executor().on_work_started(); 304*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_qmtx); 305181254a7Smrg _M_queue.emplace(__t, _M_next_id++, std::move(__h)); 306181254a7Smrg // no need to notify reactor unless this timer went to the front? 307181254a7Smrg } 308181254a7Smrg 309181254a7Smrg // Cancel all outstanding waits for __t 310181254a7Smrg size_t 311181254a7Smrg cancel(const _Timer& __t) 312181254a7Smrg { 313*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_qmtx); 314181254a7Smrg size_t __count = 0; 315181254a7Smrg auto __last = _M_queue.end(); 316181254a7Smrg for (auto __it = _M_queue.begin(), __end = __last; __it != __end; 317181254a7Smrg ++__it) 318181254a7Smrg { 319181254a7Smrg if (__it->_M_key == __t._M_key.get()) 320181254a7Smrg { 321181254a7Smrg __it->cancel(); 322181254a7Smrg __last = __it; 323181254a7Smrg ++__count; 324181254a7Smrg } 325181254a7Smrg } 326181254a7Smrg if (__count) 327181254a7Smrg _M_queue._M_sort_to(__last); 328181254a7Smrg return __count; 329181254a7Smrg } 330181254a7Smrg 331181254a7Smrg // Cancel oldest outstanding wait for __t 332181254a7Smrg bool 333181254a7Smrg cancel_one(const _Timer& __t) 334181254a7Smrg { 335*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_qmtx); 336181254a7Smrg const auto __end = _M_queue.end(); 337181254a7Smrg auto __oldest = __end; 338181254a7Smrg for (auto __it = _M_queue.begin(); __it != __end; ++__it) 339181254a7Smrg if (__it->_M_key == __t._M_key.get()) 340181254a7Smrg if (__oldest == __end || __it->_M_id < __oldest->_M_id) 341181254a7Smrg __oldest = __it; 342181254a7Smrg if (__oldest == __end) 343181254a7Smrg return false; 344181254a7Smrg __oldest->cancel(); 345181254a7Smrg _M_queue._M_sort_to(__oldest); 346181254a7Smrg return true; 347181254a7Smrg } 348181254a7Smrg 349181254a7Smrg chrono::milliseconds 350181254a7Smrg _M_next() const override 351181254a7Smrg { 352181254a7Smrg typename _Timer::time_point __exp; 353181254a7Smrg { 354*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_qmtx); 355181254a7Smrg if (_M_queue.empty()) 356181254a7Smrg return chrono::milliseconds::max(); // no pending timers 357181254a7Smrg if (_M_queue.top()._M_key == nullptr) 358181254a7Smrg return chrono::milliseconds::zero(); // cancelled, run now 359181254a7Smrg __exp = _M_queue.top()._M_expiry; 360181254a7Smrg } 361181254a7Smrg auto __dur = _Timer::traits_type::to_wait_duration(__exp); 362181254a7Smrg if (__dur < __dur.zero()) 363181254a7Smrg __dur = __dur.zero(); 364181254a7Smrg return chrono::duration_cast<chrono::milliseconds>(__dur); 365181254a7Smrg } 366181254a7Smrg 367181254a7Smrg private: 368181254a7Smrg 369181254a7Smrg bool run_one() override 370181254a7Smrg { 371181254a7Smrg auto __now = _Timer::clock_type::now(); 372181254a7Smrg function<void(error_code)> __h; 373181254a7Smrg error_code __ec; 374181254a7Smrg { 375*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_qmtx); 376181254a7Smrg 377181254a7Smrg if (_M_queue.top()._M_key == nullptr) // cancelled 378181254a7Smrg { 379181254a7Smrg __h = std::move(_M_queue.top()._M_h); 380181254a7Smrg __ec = std::make_error_code(errc::operation_canceled); 381181254a7Smrg _M_queue.pop(); 382181254a7Smrg } 383181254a7Smrg else if (_M_queue.top()._M_expiry <= _Timer::clock_type::now()) 384181254a7Smrg { 385181254a7Smrg __h = std::move(_M_queue.top()._M_h); 386181254a7Smrg _M_queue.pop(); 387181254a7Smrg } 388181254a7Smrg } 389181254a7Smrg if (__h) 390181254a7Smrg { 391181254a7Smrg __h(__ec); 392181254a7Smrg context().get_executor().on_work_finished(); 393181254a7Smrg return true; 394181254a7Smrg } 395181254a7Smrg return false; 396181254a7Smrg } 397181254a7Smrg 398181254a7Smrg using __timer_id_type = uint64_t; 399181254a7Smrg 400181254a7Smrg struct __pending_timer 401181254a7Smrg { 402181254a7Smrg __pending_timer(const _Timer& __t, uint64_t __id, 403181254a7Smrg function<void(error_code)> __h) 404181254a7Smrg : _M_expiry(__t.expiry()), _M_key(__t._M_key.get()), _M_id(__id), 405181254a7Smrg _M_h(std::move(__h)) 406181254a7Smrg { } 407181254a7Smrg 408181254a7Smrg typename _Timer::time_point _M_expiry; 409181254a7Smrg _Key* _M_key; 410181254a7Smrg __timer_id_type _M_id; 411181254a7Smrg function<void(error_code)> _M_h; 412181254a7Smrg 413181254a7Smrg void cancel() { _M_expiry = _M_expiry.min(); _M_key = nullptr; } 414181254a7Smrg 415181254a7Smrg bool 416181254a7Smrg operator<(const __pending_timer& __rhs) const 417181254a7Smrg { return _M_expiry < __rhs._M_expiry; } 418181254a7Smrg }; 419181254a7Smrg 420181254a7Smrg struct __queue : priority_queue<__pending_timer> 421181254a7Smrg { 422181254a7Smrg using iterator = 423181254a7Smrg typename priority_queue<__pending_timer>::container_type::iterator; 424181254a7Smrg 425181254a7Smrg // expose begin/end/erase for direct access to underlying container 426181254a7Smrg iterator begin() { return this->c.begin(); } 427181254a7Smrg iterator end() { return this->c.end(); } 428181254a7Smrg iterator erase(iterator __it) { return this->c.erase(__it); } 429181254a7Smrg 430181254a7Smrg void 431181254a7Smrg _M_sort_to(iterator __it) 432181254a7Smrg { std::stable_sort(this->c.begin(), ++__it); } 433181254a7Smrg }; 434181254a7Smrg 435181254a7Smrg __queue _M_queue; 436181254a7Smrg __timer_id_type _M_next_id = 0; 437181254a7Smrg }; 438181254a7Smrg 439181254a7Smrg template<typename _Timer, typename _CompletionHandler> 440181254a7Smrg void 441181254a7Smrg async_wait(const _Timer& __timer, _CompletionHandler&& __h) 442181254a7Smrg { 443181254a7Smrg auto& __queue = use_service<__timer_queue<_Timer>>(*this); 444181254a7Smrg __queue.push(__timer, std::move(__h)); 445181254a7Smrg _M_reactor._M_notify(); 446181254a7Smrg } 447181254a7Smrg 448181254a7Smrg // Cancel all wait operations initiated by __timer. 449181254a7Smrg template<typename _Timer> 450181254a7Smrg size_t 451181254a7Smrg cancel(const _Timer& __timer) 452181254a7Smrg { 453181254a7Smrg if (!has_service<__timer_queue<_Timer>>(*this)) 454181254a7Smrg return 0; 455181254a7Smrg 456181254a7Smrg auto __c = use_service<__timer_queue<_Timer>>(*this).cancel(__timer); 457181254a7Smrg if (__c != 0) 458181254a7Smrg _M_reactor._M_notify(); 459181254a7Smrg return __c; 460181254a7Smrg } 461181254a7Smrg 462181254a7Smrg // Cancel the oldest wait operation initiated by __timer. 463181254a7Smrg template<typename _Timer> 464181254a7Smrg size_t 465181254a7Smrg cancel_one(const _Timer& __timer) 466181254a7Smrg { 467181254a7Smrg if (!has_service<__timer_queue<_Timer>>(*this)) 468181254a7Smrg return 0; 469181254a7Smrg 470181254a7Smrg if (use_service<__timer_queue<_Timer>>(*this).cancel_one(__timer)) 471181254a7Smrg { 472181254a7Smrg _M_reactor._M_notify(); 473181254a7Smrg return 1; 474181254a7Smrg } 475181254a7Smrg return 0; 476181254a7Smrg } 477181254a7Smrg 478*b1e83836Smrg // The caller must know what the wait-type __w will be interpreted. 479*b1e83836Smrg // In the current implementation the reactor is based on <poll.h> 480*b1e83836Smrg // so the parameter must be one of POLLIN, POLLOUT or POLLERR. 481181254a7Smrg template<typename _Op> 482181254a7Smrg void 483181254a7Smrg async_wait(int __fd, int __w, _Op&& __op) 484181254a7Smrg { 485*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_mtx); 486181254a7Smrg // TODO need push_back, use std::list not std::forward_list 487181254a7Smrg auto __tail = _M_ops.before_begin(), __it = _M_ops.begin(); 488181254a7Smrg while (__it != _M_ops.end()) 489181254a7Smrg { 490181254a7Smrg ++__it; 491181254a7Smrg ++__tail; 492181254a7Smrg } 493181254a7Smrg using __type = __async_operation_impl<_Op>; 494181254a7Smrg _M_ops.emplace_after(__tail, 495181254a7Smrg make_unique<__type>(std::move(__op), __fd, __w)); 496181254a7Smrg _M_reactor._M_fd_interest(__fd, __w); 497181254a7Smrg } 498181254a7Smrg 499181254a7Smrg void _M_add_fd(int __fd) { _M_reactor._M_add_fd(__fd); } 500181254a7Smrg void _M_remove_fd(int __fd) { _M_reactor._M_remove_fd(__fd); } 501181254a7Smrg 502181254a7Smrg void cancel(int __fd, error_code&) 503181254a7Smrg { 504*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_mtx); 505181254a7Smrg const auto __end = _M_ops.end(); 506181254a7Smrg auto __it = _M_ops.begin(); 507181254a7Smrg auto __prev = _M_ops.before_begin(); 508181254a7Smrg while (__it != __end && (*__it)->_M_is_cancelled()) 509181254a7Smrg { 510181254a7Smrg ++__it; 511181254a7Smrg ++__prev; 512181254a7Smrg } 513181254a7Smrg auto __cancelled = __prev; 514181254a7Smrg while (__it != __end) 515181254a7Smrg { 516181254a7Smrg if ((*__it)->_M_fd == __fd) 517181254a7Smrg { 518181254a7Smrg (*__it)->cancel(); 519181254a7Smrg ++__it; 520181254a7Smrg _M_ops.splice_after(__cancelled, _M_ops, __prev); 521181254a7Smrg ++__cancelled; 522181254a7Smrg } 523181254a7Smrg else 524181254a7Smrg { 525181254a7Smrg ++__it; 526181254a7Smrg ++__prev; 527181254a7Smrg } 528181254a7Smrg } 529181254a7Smrg _M_reactor._M_not_interested(__fd); 530181254a7Smrg } 531181254a7Smrg 532181254a7Smrg struct __async_operation 533181254a7Smrg { 534181254a7Smrg __async_operation(int __fd, int __ev) : _M_fd(__fd), _M_ev(__ev) { } 535181254a7Smrg 536181254a7Smrg virtual ~__async_operation() = default; 537181254a7Smrg 538181254a7Smrg int _M_fd; 539181254a7Smrg short _M_ev; 540181254a7Smrg 541181254a7Smrg void cancel() { _M_fd = -1; } 542181254a7Smrg bool _M_is_cancelled() const { return _M_fd == -1; } 543181254a7Smrg virtual void run(io_context&) = 0; 544181254a7Smrg }; 545181254a7Smrg 546181254a7Smrg template<typename _Op> 547181254a7Smrg struct __async_operation_impl : __async_operation 548181254a7Smrg { 549181254a7Smrg __async_operation_impl(_Op&& __op, int __fd, int __ev) 550181254a7Smrg : __async_operation{__fd, __ev}, _M_op(std::move(__op)) { } 551181254a7Smrg 552181254a7Smrg _Op _M_op; 553181254a7Smrg 554181254a7Smrg void run(io_context& __ctx) 555181254a7Smrg { 556181254a7Smrg if (_M_is_cancelled()) 557181254a7Smrg _M_op(std::make_error_code(errc::operation_canceled)); 558181254a7Smrg else 559181254a7Smrg _M_op(error_code{}); 560181254a7Smrg } 561181254a7Smrg }; 562181254a7Smrg 563181254a7Smrg atomic<count_type> _M_work_count; 564*b1e83836Smrg mutable execution_context::mutex_type _M_mtx; 565181254a7Smrg queue<function<void()>> _M_op; 566181254a7Smrg bool _M_stopped = false; 567181254a7Smrg 568181254a7Smrg struct __monitor 569181254a7Smrg { 570181254a7Smrg __monitor(io_context& __c) : _M_ctx(__c) 571181254a7Smrg { 572*b1e83836Smrg#ifdef _GLIBCXX_HAS_GTHREADS 573*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_ctx._M_mtx); 574181254a7Smrg _M_ctx._M_call_stack.push_back(this_thread::get_id()); 575*b1e83836Smrg#else 576*b1e83836Smrg _M_ctx._M_run_count++; 577*b1e83836Smrg#endif 578181254a7Smrg } 579181254a7Smrg 580181254a7Smrg ~__monitor() 581181254a7Smrg { 582*b1e83836Smrg#ifdef _GLIBCXX_HAS_GTHREADS 583*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_ctx._M_mtx); 584181254a7Smrg _M_ctx._M_call_stack.pop_back(); 585*b1e83836Smrg#else 586*b1e83836Smrg _M_ctx._M_run_count--; 587*b1e83836Smrg#endif 588181254a7Smrg if (_M_ctx._M_outstanding_work() == 0) 589181254a7Smrg { 590181254a7Smrg _M_ctx._M_stopped = true; 591181254a7Smrg _M_ctx._M_reactor._M_notify(); 592181254a7Smrg } 593181254a7Smrg } 594181254a7Smrg 595181254a7Smrg __monitor(__monitor&&) = delete; 596181254a7Smrg 597181254a7Smrg io_context& _M_ctx; 598181254a7Smrg }; 599181254a7Smrg 600181254a7Smrg bool 601181254a7Smrg _M_do_one(chrono::milliseconds __timeout) 602181254a7Smrg { 603181254a7Smrg const bool __block = __timeout != chrono::milliseconds::zero(); 604181254a7Smrg 605181254a7Smrg __reactor::__fdvec __fds; 606181254a7Smrg 607181254a7Smrg __monitor __mon{*this}; 608181254a7Smrg 609181254a7Smrg __timer_queue_base* __timerq = nullptr; 610181254a7Smrg unique_ptr<__async_operation> __async_op; 611181254a7Smrg 612181254a7Smrg while (true) 613181254a7Smrg { 614181254a7Smrg if (__timerq) 615181254a7Smrg { 616181254a7Smrg if (__timerq->run_one()) 617181254a7Smrg return true; 618181254a7Smrg else 619181254a7Smrg __timerq = nullptr; 620181254a7Smrg } 621181254a7Smrg 622181254a7Smrg if (__async_op) 623181254a7Smrg { 624181254a7Smrg __async_op->run(*this); 625181254a7Smrg // TODO need to unregister __async_op 626181254a7Smrg return true; 627181254a7Smrg } 628181254a7Smrg 629181254a7Smrg chrono::milliseconds __ms{0}; 630181254a7Smrg 631181254a7Smrg { 632*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_mtx); 633181254a7Smrg 634181254a7Smrg if (_M_stopped) 635181254a7Smrg return false; 636181254a7Smrg 637181254a7Smrg // find first timer with something to do 638181254a7Smrg for (auto __q : _M_timers) 639181254a7Smrg { 640181254a7Smrg auto __next = __q->_M_next(); 641181254a7Smrg if (__next == __next.zero()) // ready to run immediately 642181254a7Smrg { 643181254a7Smrg __timerq = __q; 644181254a7Smrg __ms = __next; 645181254a7Smrg break; 646181254a7Smrg } 647181254a7Smrg else if (__next != __next.max() && __block 648181254a7Smrg && (__next < __ms || __timerq == nullptr)) 649181254a7Smrg { 650181254a7Smrg __timerq = __q; 651181254a7Smrg __ms = __next; 652181254a7Smrg } 653181254a7Smrg } 654181254a7Smrg 655181254a7Smrg if (__timerq && __ms == __ms.zero()) 656181254a7Smrg continue; // restart loop to run a timer immediately 657181254a7Smrg 658181254a7Smrg if (!_M_ops.empty() && _M_ops.front()->_M_is_cancelled()) 659181254a7Smrg { 660181254a7Smrg _M_ops.front().swap(__async_op); 661181254a7Smrg _M_ops.pop_front(); 662181254a7Smrg continue; 663181254a7Smrg } 664181254a7Smrg 665181254a7Smrg // TODO run any posted items 666181254a7Smrg 667181254a7Smrg if (__block) 668181254a7Smrg { 669181254a7Smrg if (__timerq == nullptr) 670181254a7Smrg __ms = __timeout; 671181254a7Smrg else if (__ms.zero() <= __timeout && __timeout < __ms) 672181254a7Smrg __ms = __timeout; 673181254a7Smrg else if (__ms.count() > numeric_limits<int>::max()) 674181254a7Smrg __ms = chrono::milliseconds{numeric_limits<int>::max()}; 675181254a7Smrg } 676181254a7Smrg // else __ms == 0 and poll() will return immediately 677181254a7Smrg 678181254a7Smrg } 679181254a7Smrg 680181254a7Smrg auto __res = _M_reactor.wait(__fds, __ms); 681181254a7Smrg 682181254a7Smrg if (__res == __reactor::_S_retry) 683181254a7Smrg continue; 684181254a7Smrg 685181254a7Smrg if (__res == __reactor::_S_timeout) 686fb8a8121Smrg { 687181254a7Smrg if (__timerq == nullptr) 688181254a7Smrg return false; 689181254a7Smrg else 690181254a7Smrg continue; // timed out, so restart loop and process the timer 691fb8a8121Smrg } 692181254a7Smrg 693181254a7Smrg __timerq = nullptr; 694181254a7Smrg 695181254a7Smrg if (__fds.empty()) // nothing to do 696181254a7Smrg return false; 697181254a7Smrg 698*b1e83836Smrg lock_guard<execution_context::mutex_type> __lock(_M_mtx); 699181254a7Smrg for (auto __it = _M_ops.begin(), __end = _M_ops.end(), 700181254a7Smrg __prev = _M_ops.before_begin(); __it != __end; ++__it, ++__prev) 701181254a7Smrg { 702181254a7Smrg auto& __op = **__it; 703181254a7Smrg auto __pos = std::lower_bound(__fds.begin(), __fds.end(), 704181254a7Smrg __op._M_fd, 705181254a7Smrg [](const auto& __p, int __fd) { return __p.fd < __fd; }); 706181254a7Smrg if (__pos != __fds.end() && __pos->fd == __op._M_fd 707181254a7Smrg && __pos->revents & __op._M_ev) 708181254a7Smrg { 709181254a7Smrg __it->swap(__async_op); 710181254a7Smrg _M_ops.erase_after(__prev); 711181254a7Smrg break; // restart loop and run op 712181254a7Smrg } 713181254a7Smrg } 714181254a7Smrg } 715181254a7Smrg } 716181254a7Smrg 717181254a7Smrg struct __reactor 718181254a7Smrg { 719*b1e83836Smrg#ifdef _GLIBCXX_HAVE_POLL_H 720181254a7Smrg __reactor() : _M_fds(1) 721181254a7Smrg { 722181254a7Smrg int __pipe[2]; 723181254a7Smrg if (::pipe(__pipe) == -1) 724181254a7Smrg __throw_system_error(errno); 725181254a7Smrg if (::fcntl(__pipe[0], F_SETFL, O_NONBLOCK) == -1 726181254a7Smrg || ::fcntl(__pipe[1], F_SETFL, O_NONBLOCK) == -1) 727181254a7Smrg { 728181254a7Smrg int __e = errno; 729181254a7Smrg ::close(__pipe[0]); 730181254a7Smrg ::close(__pipe[1]); 731181254a7Smrg __throw_system_error(__e); 732181254a7Smrg } 733181254a7Smrg _M_fds.back().events = POLLIN; 734181254a7Smrg _M_fds.back().fd = __pipe[0]; 735181254a7Smrg _M_notify_wr = __pipe[1]; 736181254a7Smrg } 737181254a7Smrg 738181254a7Smrg ~__reactor() 739181254a7Smrg { 740181254a7Smrg ::close(_M_fds.back().fd); 741181254a7Smrg ::close(_M_notify_wr); 742181254a7Smrg } 743*b1e83836Smrg#endif 744181254a7Smrg 745181254a7Smrg // write a notification byte to the pipe (ignoring errors) 746181254a7Smrg void _M_notify() 747181254a7Smrg { 748181254a7Smrg int __n; 749181254a7Smrg do { 750181254a7Smrg __n = ::write(_M_notify_wr, "", 1); 751181254a7Smrg } while (__n == -1 && errno == EINTR); 752181254a7Smrg } 753181254a7Smrg 754181254a7Smrg // read all notification bytes from the pipe 755181254a7Smrg void _M_on_notify() 756181254a7Smrg { 757181254a7Smrg // Drain the pipe. 758181254a7Smrg char __buf[64]; 759181254a7Smrg ssize_t __n; 760181254a7Smrg do { 761181254a7Smrg __n = ::read(_M_fds.back().fd, __buf, sizeof(__buf)); 762181254a7Smrg } while (__n != -1 || errno == EINTR); 763181254a7Smrg } 764181254a7Smrg 765181254a7Smrg void 766181254a7Smrg _M_add_fd(int __fd) 767181254a7Smrg { 768181254a7Smrg auto __pos = _M_lower_bound(__fd); 769181254a7Smrg if (__pos->fd == __fd) 770181254a7Smrg __throw_system_error((int)errc::invalid_argument); 771181254a7Smrg _M_fds.insert(__pos, __fdvec::value_type{})->fd = __fd; 772181254a7Smrg _M_notify(); 773181254a7Smrg } 774181254a7Smrg 775181254a7Smrg void 776181254a7Smrg _M_remove_fd(int __fd) 777181254a7Smrg { 778181254a7Smrg auto __pos = _M_lower_bound(__fd); 779181254a7Smrg if (__pos->fd == __fd) 780181254a7Smrg _M_fds.erase(__pos); 781181254a7Smrg // else bug! 782181254a7Smrg _M_notify(); 783181254a7Smrg } 784181254a7Smrg 785181254a7Smrg void 786181254a7Smrg _M_fd_interest(int __fd, int __w) 787181254a7Smrg { 788181254a7Smrg auto __pos = _M_lower_bound(__fd); 789181254a7Smrg if (__pos->fd == __fd) 790181254a7Smrg __pos->events |= __w; 791181254a7Smrg // else bug! 792181254a7Smrg _M_notify(); 793181254a7Smrg } 794181254a7Smrg 795181254a7Smrg void 796181254a7Smrg _M_not_interested(int __fd) 797181254a7Smrg { 798181254a7Smrg auto __pos = _M_lower_bound(__fd); 799181254a7Smrg if (__pos->fd == __fd) 800181254a7Smrg __pos->events = 0; 801181254a7Smrg _M_notify(); 802181254a7Smrg } 803181254a7Smrg 804181254a7Smrg#ifdef _GLIBCXX_HAVE_POLL_H 805181254a7Smrg using __fdvec = vector<::pollfd>; 806*b1e83836Smrg#else 807*b1e83836Smrg struct dummy_pollfd { int fd = -1; short events = 0, revents = 0; }; 808*b1e83836Smrg using __fdvec = vector<dummy_pollfd>; 809*b1e83836Smrg#endif 810181254a7Smrg 811181254a7Smrg // Find first element p such that !(p.fd < __fd) 812181254a7Smrg // N.B. always returns a dereferencable iterator. 813181254a7Smrg __fdvec::iterator 814181254a7Smrg _M_lower_bound(int __fd) 815181254a7Smrg { 816181254a7Smrg return std::lower_bound(_M_fds.begin(), _M_fds.end() - 1, 817181254a7Smrg __fd, [](const auto& __p, int __fd) { return __p.fd < __fd; }); 818181254a7Smrg } 819181254a7Smrg 820181254a7Smrg enum __status { _S_retry, _S_timeout, _S_ok, _S_error }; 821181254a7Smrg 822181254a7Smrg __status 823181254a7Smrg wait(__fdvec& __fds, chrono::milliseconds __timeout) 824181254a7Smrg { 825*b1e83836Smrg#ifdef _GLIBCXX_HAVE_POLL_H 826181254a7Smrg // XXX not thread-safe! 827181254a7Smrg __fds = _M_fds; // take snapshot to pass to poll() 828181254a7Smrg 829181254a7Smrg int __res = ::poll(__fds.data(), __fds.size(), __timeout.count()); 830181254a7Smrg 831181254a7Smrg if (__res == -1) 832181254a7Smrg { 833181254a7Smrg __fds.clear(); 834181254a7Smrg if (errno == EINTR) 835181254a7Smrg return _S_retry; 836181254a7Smrg return _S_error; // XXX ??? 837181254a7Smrg } 838181254a7Smrg else if (__res == 0) 839181254a7Smrg { 840181254a7Smrg __fds.clear(); 841181254a7Smrg return _S_timeout; 842181254a7Smrg } 843181254a7Smrg else if (__fds.back().revents != 0) // something changed, restart 844181254a7Smrg { 845181254a7Smrg __fds.clear(); 846181254a7Smrg _M_on_notify(); 847181254a7Smrg return _S_retry; 848181254a7Smrg } 849181254a7Smrg 850181254a7Smrg auto __part = std::stable_partition(__fds.begin(), __fds.end() - 1, 851181254a7Smrg [](const __fdvec::value_type& __p) { return __p.revents != 0; }); 852181254a7Smrg __fds.erase(__part, __fds.end()); 853181254a7Smrg 854181254a7Smrg return _S_ok; 855*b1e83836Smrg#else 856*b1e83836Smrg (void) __timeout; 857*b1e83836Smrg __fds.clear(); 858*b1e83836Smrg return _S_error; 859*b1e83836Smrg#endif 860181254a7Smrg } 861181254a7Smrg 862181254a7Smrg __fdvec _M_fds; // _M_fds.back() is the read end of the self-pipe 863181254a7Smrg int _M_notify_wr; // write end of the self-pipe 864181254a7Smrg }; 865181254a7Smrg 866181254a7Smrg __reactor _M_reactor; 867181254a7Smrg 868181254a7Smrg vector<__timer_queue_base*> _M_timers; 869181254a7Smrg forward_list<unique_ptr<__async_operation>> _M_ops; 870181254a7Smrg 871*b1e83836Smrg#ifdef _GLIBCXX_HAS_GTHREADS 872181254a7Smrg vector<thread::id> _M_call_stack; 873*b1e83836Smrg#else 874*b1e83836Smrg int _M_run_count = 0; 875*b1e83836Smrg#endif 876181254a7Smrg }; 877181254a7Smrg 878181254a7Smrg inline bool 879181254a7Smrg operator==(const io_context::executor_type& __a, 880181254a7Smrg const io_context::executor_type& __b) noexcept 881181254a7Smrg { 882181254a7Smrg // https://github.com/chriskohlhoff/asio-tr2/issues/201 883181254a7Smrg using executor_type = io_context::executor_type; 884181254a7Smrg return std::addressof(executor_type(__a).context()) 885181254a7Smrg == std::addressof(executor_type(__b).context()); 886181254a7Smrg } 887181254a7Smrg 888181254a7Smrg inline bool 889181254a7Smrg operator!=(const io_context::executor_type& __a, 890181254a7Smrg const io_context::executor_type& __b) noexcept 891181254a7Smrg { return !(__a == __b); } 892181254a7Smrg 893181254a7Smrg template<> struct is_executor<io_context::executor_type> : true_type {}; 894181254a7Smrg 895181254a7Smrg /// @} 896181254a7Smrg 897181254a7Smrg} // namespace v1 898181254a7Smrg} // namespace net 899181254a7Smrg} // namespace experimental 900181254a7Smrg_GLIBCXX_END_NAMESPACE_VERSION 901181254a7Smrg} // namespace std 902181254a7Smrg 903181254a7Smrg#endif // C++14 904181254a7Smrg 905181254a7Smrg#endif // _GLIBCXX_EXPERIMENTAL_IO_SERVICE 906