// <experimental/io_service> -*- C++ -*-

// Copyright (C) 2015-2022 Free Software Foundation, Inc.
//
// This file is part of the GNU ISO C++ Library.  This library is free
// software; you can redistribute it and/or modify it under the
// terms of the GNU General Public License as published by the
// Free Software Foundation; either version 3, or (at your option)
// any later version.

// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// Under Section 7 of GPL version 3, you are granted additional
// permissions described in the GCC Runtime Library Exception, version
// 3.1, as published by the Free Software Foundation.

// You should have received a copy of the GNU General Public License and
// a copy of the GCC Runtime Library Exception along with this program;
// see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
// <http://www.gnu.org/licenses/>.

/** @file experimental/io_context
 *  This is a TS C++ Library header.
 *  @ingroup networking-ts
 */

#ifndef _GLIBCXX_EXPERIMENTAL_IO_SERVICE
#define _GLIBCXX_EXPERIMENTAL_IO_SERVICE 1

#pragma GCC system_header

#if __cplusplus >= 201402L

#include <atomic>
#include <forward_list>
#include <functional>
#include <system_error>
#include <thread>
#include <vector>
#include <experimental/netfwd>
#include <experimental/executor>
#include <bits/chrono.h>
#if _GLIBCXX_HAVE_UNISTD_H
# include <unistd.h>
#endif
#ifdef _GLIBCXX_HAVE_POLL_H
# include <poll.h>
#endif
#ifdef _GLIBCXX_HAVE_FCNTL_H
# include <fcntl.h>
#endif

namespace std _GLIBCXX_VISIBILITY(default)
{
_GLIBCXX_BEGIN_NAMESPACE_VERSION
namespace experimental
{
namespace net
{
inline namespace v1
{

  /** @addtogroup networking-ts
   *  @{
   */

  class __socket_impl;

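  // A minimal usage sketch (illustrative only; system_timer is declared in
  // <experimental/timer>, which this header does not include):
  //
  //   net::io_context ctx;
  //   net::system_timer t(ctx, std::chrono::seconds(1));
  //   t.async_wait([](std::error_code ec) { /* runs from ctx.run() */ });
  //   ctx.run();  // blocks until no outstanding work remains
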
  /// An ExecutionContext for I/O operations.
  class io_context : public execution_context
  {
  public:
    // types:

    /// An executor for an io_context.
    class executor_type
    {
    public:
      // construct / copy / destroy:

      executor_type(const executor_type& __other) noexcept = default;
      executor_type(executor_type&& __other) noexcept = default;

      executor_type& operator=(const executor_type& __other) noexcept = default;
      executor_type& operator=(executor_type&& __other) noexcept = default;

      // executor operations:

      bool running_in_this_thread() const noexcept
      {
#ifdef _GLIBCXX_HAS_GTHREADS
	lock_guard<execution_context::mutex_type> __lock(_M_ctx->_M_mtx);
	auto __end = _M_ctx->_M_call_stack.end();
	return std::find(_M_ctx->_M_call_stack.begin(), __end,
			 this_thread::get_id()) != __end;
#else
	return _M_ctx->_M_run_count != 0;
#endif
      }

      io_context& context() const noexcept { return *_M_ctx; }

      void on_work_started() const noexcept { ++_M_ctx->_M_work_count; }
      void on_work_finished() const noexcept { --_M_ctx->_M_work_count; }

      template<typename _Func, typename _ProtoAllocator>
	void
	dispatch(_Func&& __f, const _ProtoAllocator& __a) const
	{
	  if (running_in_this_thread())
	    decay_t<_Func>{std::forward<_Func>(__f)}();
	  else
	    post(std::forward<_Func>(__f), __a);
	}

      template<typename _Func, typename _ProtoAllocator>
	void
	post(_Func&& __f, const _ProtoAllocator& __a) const
	{
	  lock_guard<execution_context::mutex_type> __lock(_M_ctx->_M_mtx);
	  // TODO (re-use functionality in system_context)
	  _M_ctx->_M_reactor._M_notify();
	}

      template<typename _Func, typename _ProtoAllocator>
	void
	defer(_Func&& __f, const _ProtoAllocator& __a) const
	{ post(std::forward<_Func>(__f), __a); }
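
      // Note on the three submission functions above: dispatch() runs the
      // function object immediately when the calling thread is already
      // running this io_context, while post() and defer() never run it
      // inline.  As the TODO notes, post() is incomplete here: it wakes the
      // reactor but does not yet queue the function object for execution.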

    private:
      friend io_context;

      explicit
      executor_type(io_context& __ctx) : _M_ctx(std::addressof(__ctx)) { }

      io_context* _M_ctx;
    };

    using count_type = size_t;

    // construct / copy / destroy:

    io_context() : _M_work_count(0) { }

    explicit
    io_context(int __concurrency_hint) : _M_work_count(0) { }

    io_context(const io_context&) = delete;
    io_context& operator=(const io_context&) = delete;

    // io_context operations:

    executor_type get_executor() noexcept { return executor_type(*this); }

    count_type
    run()
    {
      count_type __n = 0;
      while (run_one())
	if (__n != numeric_limits<count_type>::max())
	  ++__n;
      return __n;
    }

    template<typename _Rep, typename _Period>
      count_type
      run_for(const chrono::duration<_Rep, _Period>& __rel_time)
      { return run_until(chrono::steady_clock::now() + __rel_time); }

    template<typename _Clock, typename _Duration>
      count_type
      run_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
      {
	count_type __n = 0;
	while (run_one_until(__abs_time))
	  if (__n != numeric_limits<count_type>::max())
	    ++__n;
	return __n;
      }

    count_type
    run_one()
    { return _M_do_one(chrono::milliseconds{-1}); }

    template<typename _Rep, typename _Period>
      count_type
      run_one_for(const chrono::duration<_Rep, _Period>& __rel_time)
      { return run_one_until(chrono::steady_clock::now() + __rel_time); }

    template<typename _Clock, typename _Duration>
      count_type
      run_one_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
      {
	auto __now = _Clock::now();
	while (__now < __abs_time)
	  {
	    using namespace std::chrono;
	    auto __ms = duration_cast<milliseconds>(__abs_time - __now);
	    if (_M_do_one(__ms))
	      return 1;
	    __now = _Clock::now();
	  }
	return 0;
      }
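
    // Note: run_one_until() passes the remaining time to _M_do_one() as
    // whole milliseconds, which truncates.  A wait may therefore time out
    // slightly before __abs_time; the loop compensates by re-checking
    // _Clock::now() on each iteration.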

    count_type
    poll()
    {
      count_type __n = 0;
      while (poll_one())
	if (__n != numeric_limits<count_type>::max())
	  ++__n;
      return __n;
    }

    count_type
    poll_one()
    { return _M_do_one(chrono::milliseconds{0}); }
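
    // Timeout convention for _M_do_one(): a negative duration (run_one)
    // blocks indefinitely, zero (poll_one) polls without blocking, and a
    // positive value is an upper bound on how long one pass may wait.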

    void stop()
    {
      lock_guard<execution_context::mutex_type> __lock(_M_mtx);
      _M_stopped = true;
      _M_reactor._M_notify();
    }

    bool stopped() const noexcept
    {
      lock_guard<execution_context::mutex_type> __lock(_M_mtx);
      return _M_stopped;
    }

    void restart()
    {
      _M_stopped = false;
    }
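
    // Unlike stop() and stopped(), restart() takes no lock, presumably
    // because the TS forbids calling it while any call to run() or poll()
    // is outstanding; that precondition is not enforced here.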

  private:

    template<typename _Clock, typename _WaitTraits>
      friend class basic_waitable_timer;

    friend __socket_impl;

    template<typename _Protocol>
      friend class __basic_socket_impl;

    template<typename _Protocol>
      friend class basic_socket;

    template<typename _Protocol>
      friend class basic_datagram_socket;

    template<typename _Protocol>
      friend class basic_stream_socket;

    template<typename _Protocol>
      friend class basic_socket_acceptor;

    count_type
    _M_outstanding_work() const
    { return _M_work_count + !_M_ops.empty(); }

    struct __timer_queue_base : execution_context::service
    {
      // return milliseconds until next timer expires, or milliseconds::max()
      virtual chrono::milliseconds _M_next() const = 0;
      virtual bool run_one() = 0;

    protected:
      explicit
      __timer_queue_base(execution_context& __ctx) : service(__ctx)
      {
	auto& __ioc = static_cast<io_context&>(__ctx);
	lock_guard<execution_context::mutex_type> __lock(__ioc._M_mtx);
	__ioc._M_timers.push_back(this);
      }

      mutable execution_context::mutex_type _M_qmtx;
    };
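
    // A separate __timer_queue<_Timer> service is created (on first use,
    // via use_service) for each timer type.  The base class registers each
    // queue with its io_context so that _M_do_one() can poll every queue
    // for the next expiry.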

    template<typename _Timer, typename _Key = typename _Timer::_Key>
      struct __timer_queue : __timer_queue_base
      {
	using key_type = __timer_queue;

	explicit
	__timer_queue(execution_context& __ctx) : __timer_queue_base(__ctx)
	{ }

	void shutdown() noexcept { }

	io_context& context() noexcept
	{ return static_cast<io_context&>(service::context()); }

	// Start an asynchronous wait.
	void
	push(const _Timer& __t, function<void(error_code)> __h)
	{
	  context().get_executor().on_work_started();
	  lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
	  _M_queue.emplace(__t, _M_next_id++, std::move(__h));
	  // no need to notify reactor unless this timer went to the front?
	}

	// Cancel all outstanding waits for __t
	size_t
	cancel(const _Timer& __t)
	{
	  lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
	  size_t __count = 0;
	  auto __last = _M_queue.end();
	  for (auto __it = _M_queue.begin(), __end = __last; __it != __end;
	      ++__it)
	    {
	      if (__it->_M_key == __t._M_key.get())
		{
		  __it->cancel();
		  __last = __it;
		  ++__count;
		}
	    }
	  if (__count)
	    _M_queue._M_sort_to(__last);
	  return __count;
	}

	// Cancel oldest outstanding wait for __t
	bool
	cancel_one(const _Timer& __t)
	{
	  lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
	  const auto __end = _M_queue.end();
	  auto __oldest = __end;
	  for (auto __it = _M_queue.begin(); __it != __end; ++__it)
	    if (__it->_M_key == __t._M_key.get())
	      if (__oldest == __end || __it->_M_id < __oldest->_M_id)
		__oldest = __it;
	  if (__oldest == __end)
	    return false;
	  __oldest->cancel();
	  _M_queue._M_sort_to(__oldest);
	  return true;
	}

	chrono::milliseconds
	_M_next() const override
	{
	  typename _Timer::time_point __exp;
	  {
	    lock_guard<execution_context::mutex_type> __lock(_M_qmtx);
	    if (_M_queue.empty())
	      return chrono::milliseconds::max();  // no pending timers
	    if (_M_queue.top()._M_key == nullptr)
	      return chrono::milliseconds::zero(); // cancelled, run now
	    __exp = _M_queue.top()._M_expiry;
	  }
	  auto __dur = _Timer::traits_type::to_wait_duration(__exp);
	  if (__dur < __dur.zero())
	    __dur = __dur.zero();
	  return chrono::duration_cast<chrono::milliseconds>(__dur);
	}

      private:

	bool run_one() override
	{
	  auto __now = _Timer::clock_type::now();
	  function<void(error_code)> __h;
	  error_code __ec;
	  {
	    lock_guard<execution_context::mutex_type> __lock(_M_qmtx);

	    if (_M_queue.empty())
	      return false; // another thread may have run the timer already

	    if (_M_queue.top()._M_key == nullptr) // cancelled
	      {
		__h = std::move(_M_queue.top()._M_h);
		__ec = std::make_error_code(errc::operation_canceled);
		_M_queue.pop();
	      }
	    else if (_M_queue.top()._M_expiry <= _Timer::clock_type::now())
	      {
		__h = std::move(_M_queue.top()._M_h);
		_M_queue.pop();
	      }
	  }
	  if (__h)
	    {
	      __h(__ec);
	      context().get_executor().on_work_finished();
	      return true;
	    }
	  return false;
	}

	using __timer_id_type = uint64_t;

	struct __pending_timer
	{
	  __pending_timer(const _Timer& __t, uint64_t __id,
			  function<void(error_code)> __h)
	  : _M_expiry(__t.expiry()), _M_key(__t._M_key.get()), _M_id(__id),
	    _M_h(std::move(__h))
	  { }

	  typename _Timer::time_point _M_expiry;
	  _Key* _M_key;
	  __timer_id_type _M_id;
	  function<void(error_code)> _M_h;

	  void cancel() { _M_expiry = _M_expiry.min(); _M_key = nullptr; }

	  bool
	  operator<(const __pending_timer& __rhs) const
	  { return _M_expiry < __rhs._M_expiry; }
	};

	struct __queue : priority_queue<__pending_timer>
	{
	  using iterator =
	    typename priority_queue<__pending_timer>::container_type::iterator;

	  // expose begin/end/erase for direct access to underlying container
	  iterator begin() { return this->c.begin(); }
	  iterator end() { return this->c.end(); }
	  iterator erase(iterator __it) { return this->c.erase(__it); }

	  void
	  _M_sort_to(iterator __it)
	  { std::stable_sort(this->c.begin(), ++__it); }
	};
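
	// __queue derives from priority_queue to get at the protected
	// container c: cancel() and cancel_one() modify elements in place,
	// then call _M_sort_to() to re-sort the range up to and including
	// the modified element and restore the queue's ordering.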

	__queue	_M_queue;
	__timer_id_type _M_next_id = 0;
      };

    template<typename _Timer, typename _CompletionHandler>
      void
      async_wait(const _Timer& __timer, _CompletionHandler&& __h)
      {
	auto& __queue = use_service<__timer_queue<_Timer>>(*this);
	__queue.push(__timer, std::move(__h));
	_M_reactor._M_notify();
      }
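
    // Pushing the wait may change the earliest expiry, so wake the reactor
    // to let a thread blocked in poll() recompute its timeout.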

    // Cancel all wait operations initiated by __timer.
    template<typename _Timer>
      size_t
      cancel(const _Timer& __timer)
      {
	if (!has_service<__timer_queue<_Timer>>(*this))
	  return 0;

	auto __c = use_service<__timer_queue<_Timer>>(*this).cancel(__timer);
	if (__c != 0)
	  _M_reactor._M_notify();
	return __c;
      }

    // Cancel the oldest wait operation initiated by __timer.
    template<typename _Timer>
      size_t
      cancel_one(const _Timer& __timer)
      {
	if (!has_service<__timer_queue<_Timer>>(*this))
	  return 0;

	if (use_service<__timer_queue<_Timer>>(*this).cancel_one(__timer))
	  {
	    _M_reactor._M_notify();
	    return 1;
	  }
	return 0;
      }
    // The caller must know how the wait-type __w will be interpreted.
    // In the current implementation the reactor is based on <poll.h>
    // so the parameter must be one of POLLIN, POLLOUT or POLLERR.
    template<typename _Op>
      void
      async_wait(int __fd, int __w, _Op&& __op)
      {
	lock_guard<execution_context::mutex_type> __lock(_M_mtx);
	// TODO need push_back, use std::list not std::forward_list
	auto __tail = _M_ops.before_begin(), __it = _M_ops.begin();
	while (__it != _M_ops.end())
	  {
	    ++__it;
	    ++__tail;
	  }
	using __type = __async_operation_impl<_Op>;
	_M_ops.emplace_after(__tail,
			     make_unique<__type>(std::move(__op), __fd, __w));
	_M_reactor._M_fd_interest(__fd, __w);
      }

    void _M_add_fd(int __fd) { _M_reactor._M_add_fd(__fd); }
    void _M_remove_fd(int __fd) { _M_reactor._M_remove_fd(__fd); }

    void cancel(int __fd, error_code&)
    {
      lock_guard<execution_context::mutex_type> __lock(_M_mtx);
      const auto __end = _M_ops.end();
      auto __it = _M_ops.begin();
      auto __prev = _M_ops.before_begin();
      while (__it != __end && (*__it)->_M_is_cancelled())
	{
	  ++__it;
	  ++__prev;
	}
      auto __cancelled = __prev;
      while (__it != __end)
	{
	  if ((*__it)->_M_fd == __fd)
	    {
	      (*__it)->cancel();
	      ++__it;
	      _M_ops.splice_after(__cancelled, _M_ops, __prev);
	      ++__cancelled;
	    }
	  else
	    {
	      ++__it;
	      ++__prev;
	    }
	}
      _M_reactor._M_not_interested(__fd);
    }
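
    // The newly-cancelled operations end up at the front of _M_ops, behind
    // any previously-cancelled ones.  _M_do_one() inspects the front of the
    // list, so cancelled operations complete (with
    // errc::operation_canceled) before any new I/O is attempted.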

    struct __async_operation
    {
      __async_operation(int __fd, int __ev) : _M_fd(__fd), _M_ev(__ev) { }

      virtual ~__async_operation() = default;

      int _M_fd;
      short _M_ev;

      void cancel() { _M_fd = -1; }
      bool _M_is_cancelled() const { return _M_fd == -1; }
      virtual void run(io_context&) = 0;
    };

    template<typename _Op>
      struct __async_operation_impl : __async_operation
      {
	__async_operation_impl(_Op&& __op, int __fd, int __ev)
	: __async_operation{__fd, __ev}, _M_op(std::move(__op)) { }

	_Op _M_op;

	void run(io_context& __ctx)
	{
	  if (_M_is_cancelled())
	    _M_op(std::make_error_code(errc::operation_canceled));
	  else
	    _M_op(error_code{});
	}
      };

    atomic<count_type>		_M_work_count;
    mutable execution_context::mutex_type	_M_mtx;
    queue<function<void()>>	_M_op;
    bool			_M_stopped = false;

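    // RAII helper used by _M_do_one(): registers the current thread (or
    // bumps the run count when threads are unavailable) for the duration of
    // a run, and flags the context as stopped once a runner leaves and no
    // outstanding work remains.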
    struct __monitor
    {
      __monitor(io_context& __c) : _M_ctx(__c)
      {
#ifdef _GLIBCXX_HAS_GTHREADS
	lock_guard<execution_context::mutex_type> __lock(_M_ctx._M_mtx);
	_M_ctx._M_call_stack.push_back(this_thread::get_id());
#else
	_M_ctx._M_run_count++;
#endif
      }

      ~__monitor()
      {
#ifdef _GLIBCXX_HAS_GTHREADS
	lock_guard<execution_context::mutex_type> __lock(_M_ctx._M_mtx);
	_M_ctx._M_call_stack.pop_back();
#else
	_M_ctx._M_run_count--;
#endif
	if (_M_ctx._M_outstanding_work() == 0)
	  {
	    _M_ctx._M_stopped = true;
	    _M_ctx._M_reactor._M_notify();
	  }
      }

      __monitor(__monitor&&) = delete;

      io_context& _M_ctx;
    };
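
    // Runs at most one ready handler.  Each iteration either runs a timer
    // or operation discovered on the previous pass, or, under the lock,
    // picks the earliest timer deadline and any already-cancelled
    // operation, then blocks in the reactor until an fd becomes ready, the
    // deadline or __timeout expires, or another thread writes to the
    // self-pipe.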

    bool
    _M_do_one(chrono::milliseconds __timeout)
    {
      const bool __block = __timeout != chrono::milliseconds::zero();

      __reactor::__fdvec __fds;

      __monitor __mon{*this};

      __timer_queue_base* __timerq = nullptr;
      unique_ptr<__async_operation> __async_op;

      while (true)
	{
	  if (__timerq)
	    {
	      if (__timerq->run_one())
		return true;
	      else
		__timerq = nullptr;
	    }

	  if (__async_op)
	    {
	      __async_op->run(*this);
	      // TODO need to unregister __async_op
	      return true;
	    }

	  chrono::milliseconds __ms{0};

	  {
	    lock_guard<execution_context::mutex_type> __lock(_M_mtx);

	    if (_M_stopped)
	      return false;

	    // find first timer with something to do
	    for (auto __q : _M_timers)
	      {
		auto __next = __q->_M_next();
		if (__next == __next.zero())  // ready to run immediately
		  {
		    __timerq = __q;
		    __ms = __next;
		    break;
		  }
		else if (__next != __next.max() && __block
		    && (__next < __ms || __timerq == nullptr))
		  {
		    __timerq = __q;
		    __ms = __next;
		  }
	      }

	    if (__timerq && __ms == __ms.zero())
	      continue;  // restart loop to run a timer immediately

	    if (!_M_ops.empty() && _M_ops.front()->_M_is_cancelled())
	      {
		_M_ops.front().swap(__async_op);
		_M_ops.pop_front();
		continue;
	      }

	    // TODO run any posted items

	    if (__block)
	      {
		if (__timerq == nullptr)
		  __ms = __timeout;
		else if (__ms.zero() <= __timeout && __timeout < __ms)
		  __ms = __timeout;
		else if (__ms.count() > numeric_limits<int>::max())
		  __ms = chrono::milliseconds{numeric_limits<int>::max()};
	      }
	    // else __ms == 0 and poll() will return immediately

	  }

	  auto __res = _M_reactor.wait(__fds, __ms);

	  if (__res == __reactor::_S_retry)
	    continue;

	  if (__res == __reactor::_S_timeout)
	    {
	      if (__timerq == nullptr)
		return false;
	      else
		continue;  // timed out, so restart loop and process the timer
	    }

	  __timerq = nullptr;

	  if (__fds.empty()) // nothing to do
	    return false;

	  lock_guard<execution_context::mutex_type> __lock(_M_mtx);
	  for (auto __it = _M_ops.begin(), __end = _M_ops.end(),
	      __prev = _M_ops.before_begin(); __it != __end; ++__it, ++__prev)
	    {
	      auto& __op = **__it;
	      auto __pos = std::lower_bound(__fds.begin(), __fds.end(),
		  __op._M_fd,
		  [](const auto& __p, int __fd) { return __p.fd < __fd; });
	      if (__pos != __fds.end() && __pos->fd == __op._M_fd
		  && __pos->revents & __op._M_ev)
		{
		  __it->swap(__async_op);
		  _M_ops.erase_after(__prev);
		  break;  // restart loop and run op
		}
	    }
	}
    }

    struct __reactor
    {
#ifdef _GLIBCXX_HAVE_POLL_H
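      // Self-pipe trick: poll() always watches the read end of this pipe,
      // and _M_notify() writes a byte to the write end so that other
      // threads can interrupt a blocked poll() when timers, interest sets,
      // or the stopped flag change.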
      __reactor() : _M_fds(1)
      {
	int __pipe[2];
	if (::pipe(__pipe) == -1)
	  __throw_system_error(errno);
	if (::fcntl(__pipe[0], F_SETFL, O_NONBLOCK) == -1
	    || ::fcntl(__pipe[1], F_SETFL, O_NONBLOCK) == -1)
	  {
	    int __e = errno;
	    ::close(__pipe[0]);
	    ::close(__pipe[1]);
	    __throw_system_error(__e);
	  }
	_M_fds.back().events	= POLLIN;
	_M_fds.back().fd	= __pipe[0];
	_M_notify_wr		= __pipe[1];
      }

      ~__reactor()
      {
	::close(_M_fds.back().fd);
	::close(_M_notify_wr);
      }
#endif

      // write a notification byte to the pipe (ignoring errors)
      void _M_notify()
      {
	int __n;
	do {
	  __n = ::write(_M_notify_wr, "", 1);
	} while (__n == -1 && errno == EINTR);
      }

      // read all notification bytes from the pipe
      void _M_on_notify()
      {
	// Drain the pipe.
	char __buf[64];
	ssize_t __n;
	do {
	  __n = ::read(_M_fds.back().fd, __buf, sizeof(__buf));
	} while (__n != -1 || errno == EINTR);
      }

      void
      _M_add_fd(int __fd)
      {
	auto __pos = _M_lower_bound(__fd);
	if (__pos->fd == __fd)
	  __throw_system_error((int)errc::invalid_argument);
	_M_fds.insert(__pos, __fdvec::value_type{})->fd = __fd;
	_M_notify();
      }

      void
      _M_remove_fd(int __fd)
      {
	auto __pos = _M_lower_bound(__fd);
	if (__pos->fd == __fd)
	  _M_fds.erase(__pos);
	// else bug!
	_M_notify();
      }

      void
      _M_fd_interest(int __fd, int __w)
      {
	auto __pos = _M_lower_bound(__fd);
	if (__pos->fd == __fd)
	  __pos->events |= __w;
	// else bug!
	_M_notify();
      }

      void
      _M_not_interested(int __fd)
      {
	auto __pos = _M_lower_bound(__fd);
	if (__pos->fd == __fd)
	  __pos->events = 0;
	_M_notify();
      }

#ifdef _GLIBCXX_HAVE_POLL_H
      using __fdvec = vector<::pollfd>;
#else
      struct dummy_pollfd { int fd = -1; short events = 0, revents = 0; };
      using __fdvec = vector<dummy_pollfd>;
#endif

      // Find first element p such that !(p.fd < __fd)
      // N.B. always returns a dereferenceable iterator.
      __fdvec::iterator
      _M_lower_bound(int __fd)
      {
	return std::lower_bound(_M_fds.begin(), _M_fds.end() - 1,
	    __fd, [](const auto& __p, int __fd) { return __p.fd < __fd; });
      }

      enum __status { _S_retry, _S_timeout, _S_ok, _S_error };

      __status
      wait(__fdvec& __fds, chrono::milliseconds __timeout)
      {
#ifdef _GLIBCXX_HAVE_POLL_H
	// XXX not thread-safe!
	__fds = _M_fds;  // take snapshot to pass to poll()

	int __res = ::poll(__fds.data(), __fds.size(), __timeout.count());

	if (__res == -1)
	  {
	    __fds.clear();
	    if (errno == EINTR)
	      return _S_retry;
	    return _S_error; // XXX ???
	  }
	else if (__res == 0)
	  {
	    __fds.clear();
	    return _S_timeout;
	  }
	else if (__fds.back().revents != 0) // something changed, restart
	  {
	    __fds.clear();
	    _M_on_notify();
	    return _S_retry;
	  }

	auto __part = std::stable_partition(__fds.begin(), __fds.end() - 1,
	      [](const __fdvec::value_type& __p) { return __p.revents != 0; });
	__fds.erase(__part, __fds.end());

	return _S_ok;
#else
	(void) __timeout;
	__fds.clear();
	return _S_error;
#endif
      }

      __fdvec _M_fds;	// _M_fds.back() is the read end of the self-pipe
      int _M_notify_wr;	// write end of the self-pipe
    };

    __reactor _M_reactor;

    vector<__timer_queue_base*>			_M_timers;
    forward_list<unique_ptr<__async_operation>>	_M_ops;

#ifdef _GLIBCXX_HAS_GTHREADS
    vector<thread::id>	_M_call_stack;
#else
    int _M_run_count = 0;
#endif
  };

  inline bool
  operator==(const io_context::executor_type& __a,
	     const io_context::executor_type& __b) noexcept
  {
    // https://github.com/chriskohlhoff/asio-tr2/issues/201
    using executor_type = io_context::executor_type;
    return std::addressof(executor_type(__a).context())
      == std::addressof(executor_type(__b).context());
  }

  inline bool
  operator!=(const io_context::executor_type& __a,
	     const io_context::executor_type& __b) noexcept
  { return !(__a == __b); }

  template<> struct is_executor<io_context::executor_type> : true_type {};

  /// @}

} // namespace v1
} // namespace net
} // namespace experimental
_GLIBCXX_END_NAMESPACE_VERSION
} // namespace std

#endif // C++14

#endif // _GLIBCXX_EXPERIMENTAL_IO_SERVICE