xref: /netbsd-src/external/gpl3/gcc.old/dist/libstdc++-v3/include/experimental/io_context (revision 4c3eb207d36f67d31994830c0a694161fc1ca39b)
1627f7eb2Smrg// <experimental/io_service> -*- C++ -*-
2627f7eb2Smrg
3*4c3eb207Smrg// Copyright (C) 2015-2020 Free Software Foundation, Inc.
4627f7eb2Smrg//
5627f7eb2Smrg// This file is part of the GNU ISO C++ Library.  This library is free
6627f7eb2Smrg// software; you can redistribute it and/or modify it under the
7627f7eb2Smrg// terms of the GNU General Public License as published by the
8627f7eb2Smrg// Free Software Foundation; either version 3, or (at your option)
9627f7eb2Smrg// any later version.
10627f7eb2Smrg
11627f7eb2Smrg// This library is distributed in the hope that it will be useful,
12627f7eb2Smrg// but WITHOUT ANY WARRANTY; without even the implied warranty of
13627f7eb2Smrg// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14627f7eb2Smrg// GNU General Public License for more details.
15627f7eb2Smrg
16627f7eb2Smrg// Under Section 7 of GPL version 3, you are granted additional
17627f7eb2Smrg// permissions described in the GCC Runtime Library Exception, version
18627f7eb2Smrg// 3.1, as published by the Free Software Foundation.
19627f7eb2Smrg
20627f7eb2Smrg// You should have received a copy of the GNU General Public License and
21627f7eb2Smrg// a copy of the GCC Runtime Library Exception along with this program;
22627f7eb2Smrg// see the files COPYING3 and COPYING.RUNTIME respectively.  If not, see
23627f7eb2Smrg// <http://www.gnu.org/licenses/>.
24627f7eb2Smrg
25*4c3eb207Smrg/** @file experimental/io_context
26627f7eb2Smrg *  This is a TS C++ Library header.
27*4c3eb207Smrg *  @ingroup networking-ts
28627f7eb2Smrg */
29627f7eb2Smrg
30627f7eb2Smrg#ifndef _GLIBCXX_EXPERIMENTAL_IO_SERVICE
31627f7eb2Smrg#define _GLIBCXX_EXPERIMENTAL_IO_SERVICE 1
32627f7eb2Smrg
33627f7eb2Smrg#pragma GCC system_header
34627f7eb2Smrg
35627f7eb2Smrg#if __cplusplus >= 201402L
36627f7eb2Smrg
37627f7eb2Smrg#include <atomic>
38627f7eb2Smrg#include <chrono>
39627f7eb2Smrg#include <forward_list>
40627f7eb2Smrg#include <functional>
41627f7eb2Smrg#include <system_error>
42627f7eb2Smrg#include <thread>
43627f7eb2Smrg#include <experimental/netfwd>
44627f7eb2Smrg#include <experimental/executor>
45627f7eb2Smrg#if _GLIBCXX_HAVE_UNISTD_H
46627f7eb2Smrg# include <unistd.h>
47627f7eb2Smrg#endif
48627f7eb2Smrg#ifdef _GLIBCXX_HAVE_POLL_H
49627f7eb2Smrg# include <poll.h>
50627f7eb2Smrg#endif
51627f7eb2Smrg#ifdef _GLIBCXX_HAVE_FCNTL_H
52627f7eb2Smrg# include <fcntl.h>
53627f7eb2Smrg#endif
54627f7eb2Smrg
55627f7eb2Smrgnamespace std _GLIBCXX_VISIBILITY(default)
56627f7eb2Smrg{
57627f7eb2Smrg_GLIBCXX_BEGIN_NAMESPACE_VERSION
58627f7eb2Smrgnamespace experimental
59627f7eb2Smrg{
60627f7eb2Smrgnamespace net
61627f7eb2Smrg{
62627f7eb2Smrginline namespace v1
63627f7eb2Smrg{
64627f7eb2Smrg
65*4c3eb207Smrg  /** @addtogroup networking-ts
66627f7eb2Smrg   *  @{
67627f7eb2Smrg   */
68627f7eb2Smrg
69627f7eb2Smrg  class __socket_impl;
70627f7eb2Smrg
71627f7eb2Smrg  /// An ExecutionContext for I/O operations.
72627f7eb2Smrg  class io_context : public execution_context
73627f7eb2Smrg  {
74627f7eb2Smrg  public:
75627f7eb2Smrg    // types:
76627f7eb2Smrg
77627f7eb2Smrg    /// An executor for an io_context.
78627f7eb2Smrg    class executor_type
79627f7eb2Smrg    {
80627f7eb2Smrg    public:
81627f7eb2Smrg      // construct / copy / destroy:
82627f7eb2Smrg
83627f7eb2Smrg      executor_type(const executor_type& __other) noexcept = default;
84627f7eb2Smrg      executor_type(executor_type&& __other) noexcept = default;
85627f7eb2Smrg
86627f7eb2Smrg      executor_type& operator=(const executor_type& __other) noexcept = default;
87627f7eb2Smrg      executor_type& operator=(executor_type&& __other) noexcept = default;
88627f7eb2Smrg
89627f7eb2Smrg      // executor operations:
90627f7eb2Smrg
91627f7eb2Smrg      bool running_in_this_thread() const noexcept
92627f7eb2Smrg      {
93627f7eb2Smrg	lock_guard<mutex> __lock(_M_ctx->_M_mtx);
94627f7eb2Smrg	auto __end = _M_ctx->_M_call_stack.end();
95627f7eb2Smrg	return std::find(_M_ctx->_M_call_stack.begin(), __end,
96627f7eb2Smrg			 this_thread::get_id()) != __end;
97627f7eb2Smrg      }
98627f7eb2Smrg
99627f7eb2Smrg      io_context& context() const noexcept { return *_M_ctx; }
100627f7eb2Smrg
101627f7eb2Smrg      void on_work_started() const noexcept { ++_M_ctx->_M_work_count; }
102627f7eb2Smrg      void on_work_finished() const noexcept { --_M_ctx->_M_work_count; }
103627f7eb2Smrg
104627f7eb2Smrg      template<typename _Func, typename _ProtoAllocator>
105627f7eb2Smrg	void
106627f7eb2Smrg	dispatch(_Func&& __f, const _ProtoAllocator& __a) const
107627f7eb2Smrg	{
108627f7eb2Smrg	  if (running_in_this_thread())
109627f7eb2Smrg	    decay_t<_Func>{std::forward<_Func>(__f)}();
110627f7eb2Smrg	  else
111627f7eb2Smrg	    post(std::forward<_Func>(__f), __a);
112627f7eb2Smrg	}
113627f7eb2Smrg
114627f7eb2Smrg      template<typename _Func, typename _ProtoAllocator>
115627f7eb2Smrg	void
116627f7eb2Smrg	post(_Func&& __f, const _ProtoAllocator& __a) const
117627f7eb2Smrg	{
118627f7eb2Smrg	  lock_guard<mutex> __lock(_M_ctx->_M_mtx);
119627f7eb2Smrg	  // TODO (re-use functionality in system_context)
120627f7eb2Smrg	  _M_ctx->_M_reactor._M_notify();
121627f7eb2Smrg	}
122627f7eb2Smrg
123627f7eb2Smrg      template<typename _Func, typename _ProtoAllocator>
124627f7eb2Smrg	void
125627f7eb2Smrg	defer(_Func&& __f, const _ProtoAllocator& __a) const
126627f7eb2Smrg	{ post(std::forward<_Func>(__f), __a); }
127627f7eb2Smrg
128627f7eb2Smrg    private:
129627f7eb2Smrg      friend io_context;
130627f7eb2Smrg
131627f7eb2Smrg      explicit
132627f7eb2Smrg      executor_type(io_context& __ctx) : _M_ctx(std::addressof(__ctx)) { }
133627f7eb2Smrg
134627f7eb2Smrg      io_context* _M_ctx;
135627f7eb2Smrg    };
136627f7eb2Smrg
137627f7eb2Smrg    using count_type =  size_t;
138627f7eb2Smrg
139627f7eb2Smrg    // construct / copy / destroy:
140627f7eb2Smrg
141627f7eb2Smrg    io_context() : _M_work_count(0) { }
142627f7eb2Smrg
143627f7eb2Smrg    explicit
144627f7eb2Smrg    io_context(int __concurrency_hint) : _M_work_count(0) { }
145627f7eb2Smrg
146627f7eb2Smrg    io_context(const io_context&) = delete;
147627f7eb2Smrg    io_context& operator=(const io_context&) = delete;
148627f7eb2Smrg
149627f7eb2Smrg    // io_context operations:
150627f7eb2Smrg
151627f7eb2Smrg    executor_type get_executor() noexcept { return executor_type(*this); }
152627f7eb2Smrg
153627f7eb2Smrg    count_type
154627f7eb2Smrg    run()
155627f7eb2Smrg    {
156627f7eb2Smrg      count_type __n = 0;
157627f7eb2Smrg      while (run_one())
158627f7eb2Smrg	if (__n != numeric_limits<count_type>::max())
159627f7eb2Smrg	  ++__n;
160627f7eb2Smrg      return __n;
161627f7eb2Smrg    }
162627f7eb2Smrg
163627f7eb2Smrg    template<typename _Rep, typename _Period>
164627f7eb2Smrg      count_type
165627f7eb2Smrg      run_for(const chrono::duration<_Rep, _Period>& __rel_time)
166627f7eb2Smrg      { return run_until(chrono::steady_clock::now() + __rel_time); }
167627f7eb2Smrg
168627f7eb2Smrg    template<typename _Clock, typename _Duration>
169627f7eb2Smrg      count_type
170627f7eb2Smrg      run_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
171627f7eb2Smrg      {
172627f7eb2Smrg	count_type __n = 0;
173627f7eb2Smrg	while (run_one_until(__abs_time))
174627f7eb2Smrg	  if (__n != numeric_limits<count_type>::max())
175627f7eb2Smrg	    ++__n;
176627f7eb2Smrg	return __n;
177627f7eb2Smrg      }
178627f7eb2Smrg
179627f7eb2Smrg    count_type
180627f7eb2Smrg    run_one()
181627f7eb2Smrg    { return _M_do_one(chrono::milliseconds{-1}); }
182627f7eb2Smrg
183627f7eb2Smrg    template<typename _Rep, typename _Period>
184627f7eb2Smrg      count_type
185627f7eb2Smrg      run_one_for(const chrono::duration<_Rep, _Period>& __rel_time)
186627f7eb2Smrg      { return run_one_until(chrono::steady_clock::now() + __rel_time); }
187627f7eb2Smrg
188627f7eb2Smrg    template<typename _Clock, typename _Duration>
189627f7eb2Smrg      count_type
190627f7eb2Smrg      run_one_until(const chrono::time_point<_Clock, _Duration>& __abs_time)
191627f7eb2Smrg      {
192627f7eb2Smrg	auto __now = _Clock::now();
193627f7eb2Smrg	while (__now < __abs_time)
194627f7eb2Smrg	  {
195627f7eb2Smrg	    using namespace std::chrono;
196627f7eb2Smrg	    auto __ms = duration_cast<milliseconds>(__abs_time - __now);
197627f7eb2Smrg	    if (_M_do_one(__ms))
198627f7eb2Smrg	      return 1;
199627f7eb2Smrg	    __now = _Clock::now();
200627f7eb2Smrg	  }
201627f7eb2Smrg	return 0;
202627f7eb2Smrg      }
203627f7eb2Smrg
204627f7eb2Smrg    count_type
205627f7eb2Smrg    poll()
206627f7eb2Smrg    {
207627f7eb2Smrg      count_type __n = 0;
208627f7eb2Smrg      while (poll_one())
209627f7eb2Smrg	if (__n != numeric_limits<count_type>::max())
210627f7eb2Smrg	  ++__n;
211627f7eb2Smrg      return __n;
212627f7eb2Smrg    }
213627f7eb2Smrg
214627f7eb2Smrg    count_type
215627f7eb2Smrg    poll_one()
216627f7eb2Smrg    { return _M_do_one(chrono::milliseconds{0}); }
217627f7eb2Smrg
218627f7eb2Smrg    void stop()
219627f7eb2Smrg    {
220627f7eb2Smrg      lock_guard<mutex> __lock(_M_mtx);
221627f7eb2Smrg      _M_stopped = true;
222627f7eb2Smrg      _M_reactor._M_notify();
223627f7eb2Smrg    }
224627f7eb2Smrg
225627f7eb2Smrg    bool stopped() const noexcept
226627f7eb2Smrg    {
227627f7eb2Smrg      lock_guard<mutex> __lock(_M_mtx);
228627f7eb2Smrg      return _M_stopped;
229627f7eb2Smrg    }
230627f7eb2Smrg
231627f7eb2Smrg    void restart()
232627f7eb2Smrg    {
233627f7eb2Smrg      _M_stopped = false;
234627f7eb2Smrg    }
235627f7eb2Smrg
236627f7eb2Smrg  private:
237627f7eb2Smrg
238627f7eb2Smrg    template<typename _Clock, typename _WaitTraits>
239627f7eb2Smrg      friend class basic_waitable_timer;
240627f7eb2Smrg
241627f7eb2Smrg    friend __socket_impl;
242627f7eb2Smrg
243627f7eb2Smrg    template<typename _Protocol>
244627f7eb2Smrg      friend class __basic_socket_impl;
245627f7eb2Smrg
246627f7eb2Smrg    template<typename _Protocol>
247627f7eb2Smrg      friend class basic_socket;
248627f7eb2Smrg
249627f7eb2Smrg    template<typename _Protocol>
250627f7eb2Smrg      friend class basic_datagram_socket;
251627f7eb2Smrg
252627f7eb2Smrg    template<typename _Protocol>
253627f7eb2Smrg      friend class basic_stream_socket;
254627f7eb2Smrg
255627f7eb2Smrg    template<typename _Protocol>
256627f7eb2Smrg      friend class basic_socket_acceptor;
257627f7eb2Smrg
258627f7eb2Smrg    count_type
259627f7eb2Smrg    _M_outstanding_work() const
260627f7eb2Smrg    { return _M_work_count + !_M_ops.empty(); }
261627f7eb2Smrg
262627f7eb2Smrg    struct __timer_queue_base : execution_context::service
263627f7eb2Smrg    {
264627f7eb2Smrg      // return milliseconds until next timer expires, or milliseconds::max()
265627f7eb2Smrg      virtual chrono::milliseconds _M_next() const = 0;
266627f7eb2Smrg      virtual bool run_one() = 0;
267627f7eb2Smrg
268627f7eb2Smrg    protected:
269627f7eb2Smrg      explicit
270627f7eb2Smrg      __timer_queue_base(execution_context& __ctx) : service(__ctx)
271627f7eb2Smrg      {
272627f7eb2Smrg	auto& __ioc = static_cast<io_context&>(__ctx);
273627f7eb2Smrg	lock_guard<mutex> __lock(__ioc._M_mtx);
274627f7eb2Smrg	__ioc._M_timers.push_back(this);
275627f7eb2Smrg      }
276627f7eb2Smrg
277627f7eb2Smrg      mutable mutex	_M_qmtx;
278627f7eb2Smrg    };
279627f7eb2Smrg
280627f7eb2Smrg    template<typename _Timer, typename _Key = typename _Timer::_Key>
281627f7eb2Smrg      struct __timer_queue : __timer_queue_base
282627f7eb2Smrg      {
283627f7eb2Smrg	using key_type = __timer_queue;
284627f7eb2Smrg
285627f7eb2Smrg	explicit
286627f7eb2Smrg	__timer_queue(execution_context& __ctx) : __timer_queue_base(__ctx)
287627f7eb2Smrg	{ }
288627f7eb2Smrg
289627f7eb2Smrg	void shutdown() noexcept { }
290627f7eb2Smrg
291627f7eb2Smrg	io_context& context() noexcept
292627f7eb2Smrg	{ return static_cast<io_context&>(service::context()); }
293627f7eb2Smrg
294627f7eb2Smrg	// Start an asynchronous wait.
295627f7eb2Smrg	void
296627f7eb2Smrg	push(const _Timer& __t, function<void(error_code)> __h)
297627f7eb2Smrg	{
298627f7eb2Smrg	  context().get_executor().on_work_started();
299627f7eb2Smrg	  lock_guard<mutex> __lock(_M_qmtx);
300627f7eb2Smrg	  _M_queue.emplace(__t, _M_next_id++, std::move(__h));
301627f7eb2Smrg	  // no need to notify reactor unless this timer went to the front?
302627f7eb2Smrg	}
303627f7eb2Smrg
304627f7eb2Smrg	// Cancel all outstanding waits for __t
305627f7eb2Smrg	size_t
306627f7eb2Smrg	cancel(const _Timer& __t)
307627f7eb2Smrg	{
308627f7eb2Smrg	  lock_guard<mutex> __lock(_M_qmtx);
309627f7eb2Smrg	  size_t __count = 0;
310627f7eb2Smrg	  auto __last = _M_queue.end();
311627f7eb2Smrg	  for (auto __it = _M_queue.begin(), __end = __last; __it != __end;
312627f7eb2Smrg	      ++__it)
313627f7eb2Smrg	    {
314627f7eb2Smrg	      if (__it->_M_key == __t._M_key.get())
315627f7eb2Smrg		{
316627f7eb2Smrg		  __it->cancel();
317627f7eb2Smrg		  __last = __it;
318627f7eb2Smrg		  ++__count;
319627f7eb2Smrg		}
320627f7eb2Smrg	    }
321627f7eb2Smrg	  if (__count)
322627f7eb2Smrg	    _M_queue._M_sort_to(__last);
323627f7eb2Smrg	  return __count;
324627f7eb2Smrg	}
325627f7eb2Smrg
326627f7eb2Smrg	// Cancel oldest outstanding wait for __t
327627f7eb2Smrg	bool
328627f7eb2Smrg	cancel_one(const _Timer& __t)
329627f7eb2Smrg	{
330627f7eb2Smrg	  lock_guard<mutex> __lock(_M_qmtx);
331627f7eb2Smrg	  const auto __end = _M_queue.end();
332627f7eb2Smrg	  auto __oldest = __end;
333627f7eb2Smrg	  for (auto __it = _M_queue.begin(); __it != __end; ++__it)
334627f7eb2Smrg	    if (__it->_M_key == __t._M_key.get())
335627f7eb2Smrg	      if (__oldest == __end || __it->_M_id < __oldest->_M_id)
336627f7eb2Smrg		__oldest = __it;
337627f7eb2Smrg	  if (__oldest == __end)
338627f7eb2Smrg	    return false;
339627f7eb2Smrg	  __oldest->cancel();
340627f7eb2Smrg	  _M_queue._M_sort_to(__oldest);
341627f7eb2Smrg	  return true;
342627f7eb2Smrg	}
343627f7eb2Smrg
344627f7eb2Smrg	chrono::milliseconds
345627f7eb2Smrg	_M_next() const override
346627f7eb2Smrg	{
347627f7eb2Smrg	  typename _Timer::time_point __exp;
348627f7eb2Smrg	  {
349627f7eb2Smrg	    lock_guard<mutex> __lock(_M_qmtx);
350627f7eb2Smrg	    if (_M_queue.empty())
351627f7eb2Smrg	      return chrono::milliseconds::max();  // no pending timers
352627f7eb2Smrg	    if (_M_queue.top()._M_key == nullptr)
353627f7eb2Smrg	      return chrono::milliseconds::zero(); // cancelled, run now
354627f7eb2Smrg	    __exp = _M_queue.top()._M_expiry;
355627f7eb2Smrg	  }
356627f7eb2Smrg	  auto __dur = _Timer::traits_type::to_wait_duration(__exp);
357627f7eb2Smrg	  if (__dur < __dur.zero())
358627f7eb2Smrg	    __dur = __dur.zero();
359627f7eb2Smrg	  return chrono::duration_cast<chrono::milliseconds>(__dur);
360627f7eb2Smrg	}
361627f7eb2Smrg
362627f7eb2Smrg      private:
363627f7eb2Smrg
364627f7eb2Smrg	bool run_one() override
365627f7eb2Smrg	{
366627f7eb2Smrg	  auto __now = _Timer::clock_type::now();
367627f7eb2Smrg	  function<void(error_code)> __h;
368627f7eb2Smrg	  error_code __ec;
369627f7eb2Smrg	  {
370627f7eb2Smrg	    lock_guard<mutex> __lock(_M_qmtx);
371627f7eb2Smrg
372627f7eb2Smrg	    if (_M_queue.top()._M_key == nullptr) // cancelled
373627f7eb2Smrg	      {
374627f7eb2Smrg		__h = std::move(_M_queue.top()._M_h);
375627f7eb2Smrg		__ec = std::make_error_code(errc::operation_canceled);
376627f7eb2Smrg		_M_queue.pop();
377627f7eb2Smrg	      }
378627f7eb2Smrg	    else if (_M_queue.top()._M_expiry <= _Timer::clock_type::now())
379627f7eb2Smrg	      {
380627f7eb2Smrg		__h = std::move(_M_queue.top()._M_h);
381627f7eb2Smrg		_M_queue.pop();
382627f7eb2Smrg	      }
383627f7eb2Smrg	  }
384627f7eb2Smrg	  if (__h)
385627f7eb2Smrg	    {
386627f7eb2Smrg	      __h(__ec);
387627f7eb2Smrg	      context().get_executor().on_work_finished();
388627f7eb2Smrg	      return true;
389627f7eb2Smrg	    }
390627f7eb2Smrg	  return false;
391627f7eb2Smrg	}
392627f7eb2Smrg
393627f7eb2Smrg	using __timer_id_type = uint64_t;
394627f7eb2Smrg
395627f7eb2Smrg	struct __pending_timer
396627f7eb2Smrg	{
397627f7eb2Smrg	  __pending_timer(const _Timer& __t, uint64_t __id,
398627f7eb2Smrg			  function<void(error_code)> __h)
399627f7eb2Smrg	  : _M_expiry(__t.expiry()), _M_key(__t._M_key.get()), _M_id(__id),
400627f7eb2Smrg	    _M_h(std::move(__h))
401627f7eb2Smrg	  { }
402627f7eb2Smrg
403627f7eb2Smrg	  typename _Timer::time_point _M_expiry;
404627f7eb2Smrg	  _Key* _M_key;
405627f7eb2Smrg	  __timer_id_type _M_id;
406627f7eb2Smrg	  function<void(error_code)> _M_h;
407627f7eb2Smrg
408627f7eb2Smrg	  void cancel() { _M_expiry = _M_expiry.min(); _M_key = nullptr; }
409627f7eb2Smrg
410627f7eb2Smrg	  bool
411627f7eb2Smrg	  operator<(const __pending_timer& __rhs) const
412627f7eb2Smrg	  { return _M_expiry < __rhs._M_expiry; }
413627f7eb2Smrg	};
414627f7eb2Smrg
415627f7eb2Smrg	struct __queue : priority_queue<__pending_timer>
416627f7eb2Smrg	{
417627f7eb2Smrg	  using iterator =
418627f7eb2Smrg	    typename priority_queue<__pending_timer>::container_type::iterator;
419627f7eb2Smrg
420627f7eb2Smrg	  // expose begin/end/erase for direct access to underlying container
421627f7eb2Smrg	  iterator begin() { return this->c.begin(); }
422627f7eb2Smrg	  iterator end() { return this->c.end(); }
423627f7eb2Smrg	  iterator erase(iterator __it) { return this->c.erase(__it); }
424627f7eb2Smrg
425627f7eb2Smrg	  void
426627f7eb2Smrg	  _M_sort_to(iterator __it)
427627f7eb2Smrg	  { std::stable_sort(this->c.begin(), ++__it); }
428627f7eb2Smrg	};
429627f7eb2Smrg
430627f7eb2Smrg	__queue	_M_queue;
431627f7eb2Smrg	__timer_id_type _M_next_id = 0;
432627f7eb2Smrg      };
433627f7eb2Smrg
434627f7eb2Smrg    template<typename _Timer, typename _CompletionHandler>
435627f7eb2Smrg      void
436627f7eb2Smrg      async_wait(const _Timer& __timer, _CompletionHandler&& __h)
437627f7eb2Smrg      {
438627f7eb2Smrg	auto& __queue = use_service<__timer_queue<_Timer>>(*this);
439627f7eb2Smrg	__queue.push(__timer, std::move(__h));
440627f7eb2Smrg	_M_reactor._M_notify();
441627f7eb2Smrg      }
442627f7eb2Smrg
443627f7eb2Smrg    // Cancel all wait operations initiated by __timer.
444627f7eb2Smrg    template<typename _Timer>
445627f7eb2Smrg      size_t
446627f7eb2Smrg      cancel(const _Timer& __timer)
447627f7eb2Smrg      {
448627f7eb2Smrg	if (!has_service<__timer_queue<_Timer>>(*this))
449627f7eb2Smrg	  return 0;
450627f7eb2Smrg
451627f7eb2Smrg	auto __c = use_service<__timer_queue<_Timer>>(*this).cancel(__timer);
452627f7eb2Smrg	if (__c != 0)
453627f7eb2Smrg	  _M_reactor._M_notify();
454627f7eb2Smrg	return __c;
455627f7eb2Smrg      }
456627f7eb2Smrg
457627f7eb2Smrg    // Cancel the oldest wait operation initiated by __timer.
458627f7eb2Smrg    template<typename _Timer>
459627f7eb2Smrg      size_t
460627f7eb2Smrg      cancel_one(const _Timer& __timer)
461627f7eb2Smrg      {
462627f7eb2Smrg	if (!has_service<__timer_queue<_Timer>>(*this))
463627f7eb2Smrg	  return 0;
464627f7eb2Smrg
465627f7eb2Smrg	if (use_service<__timer_queue<_Timer>>(*this).cancel_one(__timer))
466627f7eb2Smrg	  {
467627f7eb2Smrg	    _M_reactor._M_notify();
468627f7eb2Smrg	    return 1;
469627f7eb2Smrg	  }
470627f7eb2Smrg	return 0;
471627f7eb2Smrg      }
472627f7eb2Smrg
473627f7eb2Smrg    template<typename _Op>
474627f7eb2Smrg      void
475627f7eb2Smrg      async_wait(int __fd, int __w, _Op&& __op)
476627f7eb2Smrg      {
477627f7eb2Smrg	lock_guard<mutex> __lock(_M_mtx);
478627f7eb2Smrg	// TODO need push_back, use std::list not std::forward_list
479627f7eb2Smrg	auto __tail = _M_ops.before_begin(), __it = _M_ops.begin();
480627f7eb2Smrg	while (__it != _M_ops.end())
481627f7eb2Smrg	  {
482627f7eb2Smrg	    ++__it;
483627f7eb2Smrg	    ++__tail;
484627f7eb2Smrg	  }
485627f7eb2Smrg	using __type = __async_operation_impl<_Op>;
486627f7eb2Smrg	_M_ops.emplace_after(__tail,
487627f7eb2Smrg			     make_unique<__type>(std::move(__op), __fd, __w));
488627f7eb2Smrg	_M_reactor._M_fd_interest(__fd, __w);
489627f7eb2Smrg      }
490627f7eb2Smrg
491627f7eb2Smrg    void _M_add_fd(int __fd) { _M_reactor._M_add_fd(__fd); }
492627f7eb2Smrg    void _M_remove_fd(int __fd) { _M_reactor._M_remove_fd(__fd); }
493627f7eb2Smrg
494627f7eb2Smrg    void cancel(int __fd, error_code&)
495627f7eb2Smrg    {
496627f7eb2Smrg      lock_guard<mutex> __lock(_M_mtx);
497627f7eb2Smrg      const auto __end = _M_ops.end();
498627f7eb2Smrg      auto __it = _M_ops.begin();
499627f7eb2Smrg      auto __prev = _M_ops.before_begin();
500627f7eb2Smrg      while (__it != __end && (*__it)->_M_is_cancelled())
501627f7eb2Smrg	{
502627f7eb2Smrg	  ++__it;
503627f7eb2Smrg	  ++__prev;
504627f7eb2Smrg	}
505627f7eb2Smrg      auto __cancelled = __prev;
506627f7eb2Smrg      while (__it != __end)
507627f7eb2Smrg	{
508627f7eb2Smrg	  if ((*__it)->_M_fd == __fd)
509627f7eb2Smrg	    {
510627f7eb2Smrg	      (*__it)->cancel();
511627f7eb2Smrg	      ++__it;
512627f7eb2Smrg	      _M_ops.splice_after(__cancelled, _M_ops, __prev);
513627f7eb2Smrg	      ++__cancelled;
514627f7eb2Smrg	    }
515627f7eb2Smrg	  else
516627f7eb2Smrg	    {
517627f7eb2Smrg	      ++__it;
518627f7eb2Smrg	      ++__prev;
519627f7eb2Smrg	    }
520627f7eb2Smrg	}
521627f7eb2Smrg      _M_reactor._M_not_interested(__fd);
522627f7eb2Smrg    }
523627f7eb2Smrg
524627f7eb2Smrg    struct __async_operation
525627f7eb2Smrg    {
526627f7eb2Smrg      __async_operation(int __fd, int __ev) : _M_fd(__fd), _M_ev(__ev) { }
527627f7eb2Smrg
528627f7eb2Smrg      virtual ~__async_operation() = default;
529627f7eb2Smrg
530627f7eb2Smrg      int _M_fd;
531627f7eb2Smrg      short _M_ev;
532627f7eb2Smrg
533627f7eb2Smrg      void cancel() { _M_fd = -1; }
534627f7eb2Smrg      bool _M_is_cancelled() const { return _M_fd == -1; }
535627f7eb2Smrg      virtual void run(io_context&) = 0;
536627f7eb2Smrg    };
537627f7eb2Smrg
538627f7eb2Smrg    template<typename _Op>
539627f7eb2Smrg      struct __async_operation_impl : __async_operation
540627f7eb2Smrg      {
541627f7eb2Smrg	__async_operation_impl(_Op&& __op, int __fd, int __ev)
542627f7eb2Smrg	: __async_operation{__fd, __ev}, _M_op(std::move(__op)) { }
543627f7eb2Smrg
544627f7eb2Smrg	_Op _M_op;
545627f7eb2Smrg
546627f7eb2Smrg	void run(io_context& __ctx)
547627f7eb2Smrg	{
548627f7eb2Smrg	  if (_M_is_cancelled())
549627f7eb2Smrg	    _M_op(std::make_error_code(errc::operation_canceled));
550627f7eb2Smrg	  else
551627f7eb2Smrg	    _M_op(error_code{});
552627f7eb2Smrg	}
553627f7eb2Smrg      };
554627f7eb2Smrg
555627f7eb2Smrg    atomic<count_type>		_M_work_count;
556627f7eb2Smrg    mutable mutex		_M_mtx;
557627f7eb2Smrg    queue<function<void()>>	_M_op;
558627f7eb2Smrg    bool			_M_stopped = false;
559627f7eb2Smrg
560627f7eb2Smrg    struct __monitor
561627f7eb2Smrg    {
562627f7eb2Smrg      __monitor(io_context& __c) : _M_ctx(__c)
563627f7eb2Smrg      {
564627f7eb2Smrg	lock_guard<mutex> __lock(_M_ctx._M_mtx);
565627f7eb2Smrg	_M_ctx._M_call_stack.push_back(this_thread::get_id());
566627f7eb2Smrg      }
567627f7eb2Smrg
568627f7eb2Smrg      ~__monitor()
569627f7eb2Smrg      {
570627f7eb2Smrg	lock_guard<mutex> __lock(_M_ctx._M_mtx);
571627f7eb2Smrg	_M_ctx._M_call_stack.pop_back();
572627f7eb2Smrg	if (_M_ctx._M_outstanding_work() == 0)
573627f7eb2Smrg	  {
574627f7eb2Smrg	    _M_ctx._M_stopped = true;
575627f7eb2Smrg	    _M_ctx._M_reactor._M_notify();
576627f7eb2Smrg	  }
577627f7eb2Smrg      }
578627f7eb2Smrg
579627f7eb2Smrg      __monitor(__monitor&&) = delete;
580627f7eb2Smrg
581627f7eb2Smrg      io_context& _M_ctx;
582627f7eb2Smrg    };
583627f7eb2Smrg
584627f7eb2Smrg    bool
585627f7eb2Smrg    _M_do_one(chrono::milliseconds __timeout)
586627f7eb2Smrg    {
587627f7eb2Smrg      const bool __block = __timeout != chrono::milliseconds::zero();
588627f7eb2Smrg
589627f7eb2Smrg      __reactor::__fdvec __fds;
590627f7eb2Smrg
591627f7eb2Smrg      __monitor __mon{*this};
592627f7eb2Smrg
593627f7eb2Smrg      __timer_queue_base* __timerq = nullptr;
594627f7eb2Smrg      unique_ptr<__async_operation> __async_op;
595627f7eb2Smrg
596627f7eb2Smrg      while (true)
597627f7eb2Smrg	{
598627f7eb2Smrg	  if (__timerq)
599627f7eb2Smrg	    {
600627f7eb2Smrg	      if (__timerq->run_one())
601627f7eb2Smrg		return true;
602627f7eb2Smrg	      else
603627f7eb2Smrg		__timerq = nullptr;
604627f7eb2Smrg	    }
605627f7eb2Smrg
606627f7eb2Smrg	  if (__async_op)
607627f7eb2Smrg	    {
608627f7eb2Smrg	      __async_op->run(*this);
609627f7eb2Smrg	      // TODO need to unregister __async_op
610627f7eb2Smrg	      return true;
611627f7eb2Smrg	    }
612627f7eb2Smrg
613627f7eb2Smrg	  chrono::milliseconds __ms{0};
614627f7eb2Smrg
615627f7eb2Smrg	  {
616627f7eb2Smrg	    lock_guard<mutex> __lock(_M_mtx);
617627f7eb2Smrg
618627f7eb2Smrg	    if (_M_stopped)
619627f7eb2Smrg	      return false;
620627f7eb2Smrg
621627f7eb2Smrg	    // find first timer with something to do
622627f7eb2Smrg	    for (auto __q : _M_timers)
623627f7eb2Smrg	      {
624627f7eb2Smrg		auto __next = __q->_M_next();
625627f7eb2Smrg		if (__next == __next.zero())  // ready to run immediately
626627f7eb2Smrg		  {
627627f7eb2Smrg		    __timerq = __q;
628627f7eb2Smrg		    __ms = __next;
629627f7eb2Smrg		    break;
630627f7eb2Smrg		  }
631627f7eb2Smrg		else if (__next != __next.max() && __block
632627f7eb2Smrg		    && (__next < __ms || __timerq == nullptr))
633627f7eb2Smrg		  {
634627f7eb2Smrg		    __timerq = __q;
635627f7eb2Smrg		    __ms = __next;
636627f7eb2Smrg		  }
637627f7eb2Smrg	      }
638627f7eb2Smrg
639627f7eb2Smrg	    if (__timerq && __ms == __ms.zero())
640627f7eb2Smrg	      continue;  // restart loop to run a timer immediately
641627f7eb2Smrg
642627f7eb2Smrg	    if (!_M_ops.empty() && _M_ops.front()->_M_is_cancelled())
643627f7eb2Smrg	      {
644627f7eb2Smrg		_M_ops.front().swap(__async_op);
645627f7eb2Smrg		_M_ops.pop_front();
646627f7eb2Smrg		continue;
647627f7eb2Smrg	      }
648627f7eb2Smrg
649627f7eb2Smrg	    // TODO run any posted items
650627f7eb2Smrg
651627f7eb2Smrg	    if (__block)
652627f7eb2Smrg	      {
653627f7eb2Smrg		if (__timerq == nullptr)
654627f7eb2Smrg		  __ms = __timeout;
655627f7eb2Smrg		else if (__ms.zero() <= __timeout && __timeout < __ms)
656627f7eb2Smrg		  __ms = __timeout;
657627f7eb2Smrg		else if (__ms.count() > numeric_limits<int>::max())
658627f7eb2Smrg		  __ms = chrono::milliseconds{numeric_limits<int>::max()};
659627f7eb2Smrg	      }
660627f7eb2Smrg	    // else __ms == 0 and poll() will return immediately
661627f7eb2Smrg
662627f7eb2Smrg	  }
663627f7eb2Smrg
664627f7eb2Smrg	  auto __res = _M_reactor.wait(__fds, __ms);
665627f7eb2Smrg
666627f7eb2Smrg	  if (__res == __reactor::_S_retry)
667627f7eb2Smrg	    continue;
668627f7eb2Smrg
669627f7eb2Smrg	  if (__res == __reactor::_S_timeout)
670*4c3eb207Smrg	    {
671627f7eb2Smrg	      if (__timerq == nullptr)
672627f7eb2Smrg		return false;
673627f7eb2Smrg	      else
674627f7eb2Smrg		continue;  // timed out, so restart loop and process the timer
675*4c3eb207Smrg	    }
676627f7eb2Smrg
677627f7eb2Smrg	  __timerq = nullptr;
678627f7eb2Smrg
679627f7eb2Smrg	  if (__fds.empty()) // nothing to do
680627f7eb2Smrg	    return false;
681627f7eb2Smrg
682627f7eb2Smrg	  lock_guard<mutex> __lock(_M_mtx);
683627f7eb2Smrg	  for (auto __it = _M_ops.begin(), __end = _M_ops.end(),
684627f7eb2Smrg	      __prev = _M_ops.before_begin(); __it != __end; ++__it, ++__prev)
685627f7eb2Smrg	    {
686627f7eb2Smrg	      auto& __op = **__it;
687627f7eb2Smrg	      auto __pos = std::lower_bound(__fds.begin(), __fds.end(),
688627f7eb2Smrg		  __op._M_fd,
689627f7eb2Smrg		  [](const auto& __p, int __fd) { return __p.fd < __fd; });
690627f7eb2Smrg	      if (__pos != __fds.end() && __pos->fd == __op._M_fd
691627f7eb2Smrg		  && __pos->revents & __op._M_ev)
692627f7eb2Smrg		{
693627f7eb2Smrg		  __it->swap(__async_op);
694627f7eb2Smrg		  _M_ops.erase_after(__prev);
695627f7eb2Smrg		  break;  // restart loop and run op
696627f7eb2Smrg		}
697627f7eb2Smrg	    }
698627f7eb2Smrg	}
699627f7eb2Smrg    }
700627f7eb2Smrg
701627f7eb2Smrg    struct __reactor
702627f7eb2Smrg    {
703627f7eb2Smrg      __reactor() : _M_fds(1)
704627f7eb2Smrg      {
705627f7eb2Smrg	int __pipe[2];
706627f7eb2Smrg	if (::pipe(__pipe) == -1)
707627f7eb2Smrg	  __throw_system_error(errno);
708627f7eb2Smrg	if (::fcntl(__pipe[0], F_SETFL, O_NONBLOCK) == -1
709627f7eb2Smrg	    || ::fcntl(__pipe[1], F_SETFL, O_NONBLOCK) == -1)
710627f7eb2Smrg	  {
711627f7eb2Smrg	    int __e = errno;
712627f7eb2Smrg	    ::close(__pipe[0]);
713627f7eb2Smrg	    ::close(__pipe[1]);
714627f7eb2Smrg	    __throw_system_error(__e);
715627f7eb2Smrg	  }
716627f7eb2Smrg	_M_fds.back().events	= POLLIN;
717627f7eb2Smrg	_M_fds.back().fd	= __pipe[0];
718627f7eb2Smrg	_M_notify_wr		= __pipe[1];
719627f7eb2Smrg      }
720627f7eb2Smrg
721627f7eb2Smrg      ~__reactor()
722627f7eb2Smrg      {
723627f7eb2Smrg	::close(_M_fds.back().fd);
724627f7eb2Smrg	::close(_M_notify_wr);
725627f7eb2Smrg      }
726627f7eb2Smrg
727627f7eb2Smrg      // write a notification byte to the pipe (ignoring errors)
728627f7eb2Smrg      void _M_notify()
729627f7eb2Smrg      {
730627f7eb2Smrg	int __n;
731627f7eb2Smrg	do {
732627f7eb2Smrg	  __n = ::write(_M_notify_wr, "", 1);
733627f7eb2Smrg	} while (__n == -1 && errno == EINTR);
734627f7eb2Smrg      }
735627f7eb2Smrg
736627f7eb2Smrg      // read all notification bytes from the pipe
737627f7eb2Smrg      void _M_on_notify()
738627f7eb2Smrg      {
739627f7eb2Smrg	// Drain the pipe.
740627f7eb2Smrg	char __buf[64];
741627f7eb2Smrg	ssize_t __n;
742627f7eb2Smrg	do {
743627f7eb2Smrg	  __n = ::read(_M_fds.back().fd, __buf, sizeof(__buf));
744627f7eb2Smrg	} while (__n != -1 || errno == EINTR);
745627f7eb2Smrg      }
746627f7eb2Smrg
747627f7eb2Smrg      void
748627f7eb2Smrg      _M_add_fd(int __fd)
749627f7eb2Smrg      {
750627f7eb2Smrg	auto __pos = _M_lower_bound(__fd);
751627f7eb2Smrg	if (__pos->fd == __fd)
752627f7eb2Smrg	  __throw_system_error((int)errc::invalid_argument);
753627f7eb2Smrg	_M_fds.insert(__pos, __fdvec::value_type{})->fd = __fd;
754627f7eb2Smrg	_M_notify();
755627f7eb2Smrg      }
756627f7eb2Smrg
757627f7eb2Smrg      void
758627f7eb2Smrg      _M_remove_fd(int __fd)
759627f7eb2Smrg      {
760627f7eb2Smrg	auto __pos = _M_lower_bound(__fd);
761627f7eb2Smrg	if (__pos->fd == __fd)
762627f7eb2Smrg	  _M_fds.erase(__pos);
763627f7eb2Smrg	// else bug!
764627f7eb2Smrg	_M_notify();
765627f7eb2Smrg      }
766627f7eb2Smrg
767627f7eb2Smrg      void
768627f7eb2Smrg      _M_fd_interest(int __fd, int __w)
769627f7eb2Smrg      {
770627f7eb2Smrg	auto __pos = _M_lower_bound(__fd);
771627f7eb2Smrg	if (__pos->fd == __fd)
772627f7eb2Smrg	  __pos->events |= __w;
773627f7eb2Smrg	// else bug!
774627f7eb2Smrg	_M_notify();
775627f7eb2Smrg      }
776627f7eb2Smrg
777627f7eb2Smrg      void
778627f7eb2Smrg      _M_not_interested(int __fd)
779627f7eb2Smrg      {
780627f7eb2Smrg	auto __pos = _M_lower_bound(__fd);
781627f7eb2Smrg	if (__pos->fd == __fd)
782627f7eb2Smrg	  __pos->events = 0;
783627f7eb2Smrg	_M_notify();
784627f7eb2Smrg      }
785627f7eb2Smrg
786627f7eb2Smrg# ifdef _GLIBCXX_HAVE_POLL_H
787627f7eb2Smrg      using __fdvec = vector<::pollfd>;
788627f7eb2Smrg
789627f7eb2Smrg      // Find first element p such that !(p.fd < __fd)
790627f7eb2Smrg      // N.B. always returns a dereferencable iterator.
791627f7eb2Smrg      __fdvec::iterator
792627f7eb2Smrg      _M_lower_bound(int __fd)
793627f7eb2Smrg      {
794627f7eb2Smrg	return std::lower_bound(_M_fds.begin(), _M_fds.end() - 1,
795627f7eb2Smrg	    __fd, [](const auto& __p, int __fd) { return __p.fd < __fd; });
796627f7eb2Smrg      }
797627f7eb2Smrg
798627f7eb2Smrg      enum __status { _S_retry, _S_timeout, _S_ok, _S_error };
799627f7eb2Smrg
800627f7eb2Smrg      __status
801627f7eb2Smrg      wait(__fdvec& __fds, chrono::milliseconds __timeout)
802627f7eb2Smrg      {
803627f7eb2Smrg	// XXX not thread-safe!
804627f7eb2Smrg	__fds = _M_fds;  // take snapshot to pass to poll()
805627f7eb2Smrg
806627f7eb2Smrg	int __res = ::poll(__fds.data(), __fds.size(), __timeout.count());
807627f7eb2Smrg
808627f7eb2Smrg	if (__res == -1)
809627f7eb2Smrg	  {
810627f7eb2Smrg	    __fds.clear();
811627f7eb2Smrg	    if (errno == EINTR)
812627f7eb2Smrg	      return _S_retry;
813627f7eb2Smrg	    return _S_error; // XXX ???
814627f7eb2Smrg	  }
815627f7eb2Smrg	else if (__res == 0)
816627f7eb2Smrg	  {
817627f7eb2Smrg	    __fds.clear();
818627f7eb2Smrg	    return _S_timeout;
819627f7eb2Smrg	  }
820627f7eb2Smrg	else if (__fds.back().revents != 0) // something changed, restart
821627f7eb2Smrg	  {
822627f7eb2Smrg	    __fds.clear();
823627f7eb2Smrg	    _M_on_notify();
824627f7eb2Smrg	    return _S_retry;
825627f7eb2Smrg	  }
826627f7eb2Smrg
827627f7eb2Smrg	auto __part = std::stable_partition(__fds.begin(), __fds.end() - 1,
828627f7eb2Smrg	      [](const __fdvec::value_type& __p) { return __p.revents != 0; });
829627f7eb2Smrg	__fds.erase(__part, __fds.end());
830627f7eb2Smrg
831627f7eb2Smrg	return _S_ok;
832627f7eb2Smrg      }
833627f7eb2Smrg
834627f7eb2Smrg      __fdvec _M_fds;	// _M_fds.back() is the read end of the self-pipe
835627f7eb2Smrg#endif
836627f7eb2Smrg      int _M_notify_wr;	// write end of the self-pipe
837627f7eb2Smrg    };
838627f7eb2Smrg
839627f7eb2Smrg    __reactor _M_reactor;
840627f7eb2Smrg
841627f7eb2Smrg    vector<__timer_queue_base*>			_M_timers;
842627f7eb2Smrg    forward_list<unique_ptr<__async_operation>>	_M_ops;
843627f7eb2Smrg
844627f7eb2Smrg    vector<thread::id>	_M_call_stack;
845627f7eb2Smrg  };
846627f7eb2Smrg
847627f7eb2Smrg  inline bool
848627f7eb2Smrg  operator==(const io_context::executor_type& __a,
849627f7eb2Smrg	     const io_context::executor_type& __b) noexcept
850627f7eb2Smrg  {
851627f7eb2Smrg    // https://github.com/chriskohlhoff/asio-tr2/issues/201
852627f7eb2Smrg    using executor_type = io_context::executor_type;
853627f7eb2Smrg    return std::addressof(executor_type(__a).context())
854627f7eb2Smrg      == std::addressof(executor_type(__b).context());
855627f7eb2Smrg  }
856627f7eb2Smrg
857627f7eb2Smrg  inline bool
858627f7eb2Smrg  operator!=(const io_context::executor_type& __a,
859627f7eb2Smrg	     const io_context::executor_type& __b) noexcept
860627f7eb2Smrg  { return !(__a == __b); }
861627f7eb2Smrg
862627f7eb2Smrg  template<> struct is_executor<io_context::executor_type> : true_type {};
863627f7eb2Smrg
864627f7eb2Smrg  /// @}
865627f7eb2Smrg
866627f7eb2Smrg} // namespace v1
867627f7eb2Smrg} // namespace net
868627f7eb2Smrg} // namespace experimental
869627f7eb2Smrg_GLIBCXX_END_NAMESPACE_VERSION
870627f7eb2Smrg} // namespace std
871627f7eb2Smrg
872627f7eb2Smrg#endif // C++14
873627f7eb2Smrg
874627f7eb2Smrg#endif // _GLIBCXX_EXPERIMENTAL_IO_SERVICE
875