libqi-api  2.8.7.4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
strand.hpp
Go to the documentation of this file.
1 #pragma once
2 /*
3 ** Copyright (C) 2012 Aldebaran Robotics
4 ** See COPYING for the license
5 */
6 
7 #ifndef _QI_STRAND_HPP_
8 #define _QI_STRAND_HPP_
9 
10 #include <deque>
11 #include <atomic>
12 #include <memory>
13 #include <ka/functional.hpp>
14 #include <ka/mutablestore.hpp>
15 #include <qi/assert.hpp>
16 #include <qi/config.hpp>
19 #include <boost/enable_shared_from_this.hpp>
20 #include <boost/shared_ptr.hpp>
21 #include <boost/function.hpp>
22 #include <boost/noncopyable.hpp>
23 #include <boost/type_traits/function_traits.hpp>
24 
25 # ifdef _MSC_VER
26 # pragma warning( push )
27 # pragma warning( disable: 4996 ) // TODO: Reactivate this warning once MSVC stops triggering a warning on overloading a deprecated function
28 # endif
29 
30 namespace qi
31 {
32 
33 // C++14 these can be lambdas, but we need perfect forwarding in the capture in schedulerFor below
34 namespace detail
35 {
    // Forward declarations: Strand::schedulerFor and Strand::unwrappedSchedulerFor name these
    // templates in their return types before the definitions at the end of this file.
36  template <typename F>
37  struct Stranded;
38 
39  template <typename F>
40  struct StrandedUnwrapped;
41 }
42 
43 // we use ExecutionContext's helpers in schedulerFor, we don't need to implement all the methods
44 class QI_API StrandPrivate : public ExecutionContext, public boost::enable_shared_from_this<StrandPrivate>
45 {
46 public:
47  enum class State;
48 
49  struct Callback;
50 
51  using Queue = std::deque<boost::shared_ptr<Callback>>;
52 
54  std::atomic<unsigned int> _curId;
55  std::atomic<unsigned int> _aliveCount;
56  bool _processing; // protected by mutex, no need for atomic
57  std::atomic<int> _processingThread;
58  boost::recursive_mutex _mutex;
59  boost::condition_variable_any _processFinished;
60  bool _dying;
62  class ScopedPromiseGroup;
63  std::shared_ptr<ScopedPromiseGroup> _deferredTasksFutures; // Shared to avoid including issues
64 
65  explicit StrandPrivate(qi::ExecutionContext& executor);
66  ~StrandPrivate();
67 
68  void join() QI_NOEXCEPT(true);
69 
70  // Schedules the callback for execution. If the trigger date `tp` is in the past, executes the
71  // callback immediately in the calling thread.
72  Future<void> asyncAtImpl(boost::function<void()> cb, qi::SteadyClockTimePoint tp, ExecutionOptions options = defaultExecutionOptions()) override;
73 
74  // Schedules the callback for execution. If delay is 0, executes the callback immediately in the
75  // calling thread.
76  Future<void> asyncDelayImpl(boost::function<void()> cb, qi::Duration delay, ExecutionOptions options = defaultExecutionOptions()) override;
77 
78  // Schedules the callback for deferred execution and returns immediately.
79  Future<void> deferImpl(boost::function<void()> cb, qi::Duration delay, ExecutionOptions options = defaultExecutionOptions());
80 
81  boost::shared_ptr<Callback> createCallback(boost::function<void()> cb, ExecutionOptions options);
82  void enqueue(boost::shared_ptr<Callback> cbStruct, ExecutionOptions options);
83 
84  void process();
85  void cancel(boost::shared_ptr<Callback> cbStruct);
86  bool isInThisContext() const override;
87 
88  void postImpl(boost::function<void()> callback, ExecutionOptions options) override
89  { QI_ASSERT(false); throw 0; }
90 
91  qi::Future<void> async(const boost::function<void()>& callback, qi::SteadyClockTimePoint tp) override
92  { QI_ASSERT(false); throw 0; }
93 
94  qi::Future<void> async(const boost::function<void()>& callback, qi::Duration delay) override
95  { QI_ASSERT(false); throw 0; }
96 
98 private:
99  void stopProcess(boost::recursive_mutex::scoped_lock& lock,
100  bool finished);
101 
102  bool joined = false;
103 
104  template<class Task>
105  auto track(Task&& task)
106  // TODO: C++ >= 14 : Remove the following line.
107  -> decltype(ka::scope_lock_proc(ka::fwd<Task>(task), ka::mutable_store(weak_from_this())))
108  {
109  return ka::scope_lock_proc(ka::fwd<Task>(task), ka::mutable_store(weak_from_this()));
110  }
111 
112 };
113 
114 
     // Non-copyable ExecutionContext whose implementation state is held in a shared
     // StrandPrivate (`_p`).
125 class QI_API Strand : public ExecutionContext, private boost::noncopyable
126 {
127 public:
128  using OptionalErrorMessage = boost::optional<std::string>;
129 
131  Strand();
     // NOTE(review): `executionContext` is presumably the context the strand's tasks run on -
     // the constructor body is not visible here, confirm in the implementation file.
133  Strand(qi::ExecutionContext& executionContext);
134 
136  ~Strand();
137 
149  void join() QI_NOEXCEPT(true);
150 
     // Deprecated overload selected by passing std::nothrow; returns an optional error message
     // instead of void.
169  QI_API_DEPRECATED_MSG(Use 'join()' instead)
170  OptionalErrorMessage join(std::nothrow_t) QI_NOEXCEPT(true);
171 
172  // DEPRECATED
173  QI_API_DEPRECATED_MSG(Use 'asyncAt' instead)
174  qi::Future<void> async(const boost::function<void()>& cb,
175  qi::SteadyClockTimePoint tp) override;
176  QI_API_DEPRECATED_MSG(Use 'asyncDelay' instead)
177  qi::Future<void> async(const boost::function<void()>& cb,
178  qi::Duration delay) override;
179  using ExecutionContext::async;
180 
     // Template for the deprecated schedulerFor overloads, expanded by QI_GEN (presumably once
     // per argument count). Each overload binds `func` to its arguments, wraps the result with
     // SchedulerHelper::_scheduler, and tracks `arg0` with `fallbackCb` as the fallback.
181 #define genCall(n, ATYPEDECL, ATYPES, ADECL, AUSE, comma) \
182  template <typename T, typename F, typename ARG0 comma ATYPEDECL> \
183  QI_API_DEPRECATED_MSG(Use generic 'schedulerFor' overload instead) boost::function<T> schedulerFor( \
184  const F& func, const ARG0& arg0 comma ADECL, \
185  const boost::function<void()>& fallbackCb = boost::function<void()>()) \
186  { \
187  boost::function<T> funcbind = qi::bind<T>(func, arg0 comma AUSE); \
188  return qi::trackWithFallback( \
189  fallbackCb, \
190  SchedulerHelper<boost::function_traits<T>::arity, T>::_scheduler( \
191  funcbind, this), \
192  arg0); \
193  }
194  QI_GEN(genCall)
195 #undef genCall
196  // END DEPRECATED
197 
202  bool isInThisContext() const override;
203 
204 
     // Returns a detail::Stranded holding a decayed copy of `func`, the strand implementation
     // `_p` (loaded atomically, stored by the wrapper as a weak pointer), `onFail` and `options`.
234  template <typename F>
235  auto schedulerFor(F&& func, boost::function<void()> onFail = {}, ExecutionOptions options = defaultExecutionOptions())
236  -> detail::Stranded<typename std::decay<F>::type>
237  {
238  return detail::Stranded<typename std::decay<F>::type>(std::forward<F>(func),
239  boost::atomic_load(&_p),
240  std::move(onFail),
241  options);
242  }
243 
     // Same as schedulerFor, but returns a detail::StrandedUnwrapped, which passes the result
     // of each call through tryUnwrap.
247  template <typename F>
248  auto unwrappedSchedulerFor(F&& func, boost::function<void()> onFail = {}, ExecutionOptions options = defaultExecutionOptions())
249  -> detail::StrandedUnwrapped<typename std::decay<F>::type>
250  {
251  return detail::StrandedUnwrapped<typename std::decay<F>::type>(std::forward<F>(func),
252  boost::atomic_load(&_p),
253  std::move(onFail),
254  options);
255  }
256 
     // `delay` defaults to zero and `options` to defaultExecutionOptions().
267  Future<void> defer(const boost::function<void()>& cb,
268  MicroSeconds delay = MicroSeconds::zero(),
269  ExecutionOptions options = defaultExecutionOptions());
270 
271 private:
     // Shared implementation; read with boost::atomic_load by the schedulerFor templates above.
272  boost::shared_ptr<StrandPrivate> _p;
273 
274  void postImpl(boost::function<void()> callback, ExecutionOptions options) override;
275 
276  qi::Future<void> asyncAtImpl(boost::function<void()> cb, qi::SteadyClockTimePoint tp, ExecutionOptions options) override;
277  qi::Future<void> asyncDelayImpl(boost::function<void()> cb, qi::Duration delay, ExecutionOptions options) override;
278 
279  // DEPRECATED
     // Helper for the deprecated schedulerFor overloads, specialised on the arity N of the
     // function type T. _scheduler returns a boost::function<T> that calls _asyncCall, and
     // _asyncCall binds the arguments and calls async through the ExecutionContext base.
280  template <int N, typename T>
281  struct SchedulerHelper;
282 #define typedefi(z, n, _) \
283  typedef typename boost::function_traits<T>::BOOST_PP_CAT( \
284  BOOST_PP_CAT(arg, BOOST_PP_INC(n)), _type) BOOST_PP_CAT(P, n);
285 #define placeholders(z, n, __) , BOOST_PP_CAT(_, BOOST_PP_INC(n))
286 #define genCall(n, ATYPEDECL, ATYPES, ADECL, AUSE, comma) \
287  template <typename T> \
288  struct SchedulerHelper<n, T> \
289  { \
290  BOOST_PP_REPEAT(n, typedefi, _); \
291  typedef typename boost::function_traits<T>::result_type R; \
292  static boost::function<T> _scheduler(const boost::function<T>& f, \
293  Strand* strand) \
294  { \
295  return qi::bind<T>(&_asyncCall, strand, \
296  f BOOST_PP_REPEAT(n, placeholders, _)); \
297  } \
298  static qi::Future<R> _asyncCall(Strand* strand, \
299  const boost::function<T>& func comma \
300  ADECL) \
301  { \
302  /* use qi::bind again since first arg may be a Trackable */ \
303  return ((qi::ExecutionContext*)strand) \
304  ->async(qi::bind<R()>(func comma AUSE)); \
305  } \
306  };
307  QI_GEN(genCall)
308 #undef genCall
309 #undef placeholders
310 #undef typedefi
311  // END DEPRECATED
312 };
313 
314 namespace detail
315 {
     // Calls `func` with `args` on the strand referenced by `weakStrand`.
     // If the weak pointer can be locked, the bound call is scheduled through the strand's
     // async with `options` and the resulting future is returned. Otherwise `onFail` is
     // invoked (when non-empty) and an error future carrying "strand is dead" is returned.
316  template <typename F, typename... Args>
317  static auto callInStrand(
318  F& func,
319  const boost::function<void()>& onFail,
320  boost::weak_ptr<StrandPrivate> weakStrand,
321  ExecutionOptions options,
322  Args... args)
323  -> decltype(weakStrand.lock()->async(std::bind(func, std::move(args)...), options)) // TODO: remove in C++14
324  {
325  if (auto strand = weakStrand.lock())
326  {
327  return strand->async(std::bind(func, std::move(args)...), options);
328  }
329  else
330  {
     // The strand is gone: notify the caller-supplied fallback, if any.
331  if (onFail)
332  onFail();
     // The error future's value type is the decayed result type of calling `func` with `args`.
333  return qi::makeFutureError<
334  typename std::decay<decltype(func(std::forward<Args>(args)...))>::type>("strand is dead");
335  }
336  }
337 
338  // C++14 these can be lambdas, but we need perfect forwarding in the capture in schedulerFor
339  // A callable object that, when called, defers the call of the given function to the strand.
     // It only keeps a weak pointer to the strand; every call is forwarded to callInStrand
     // together with the stored fallback and execution options.
340  template <typename F>
341  struct Stranded
342  {
343  static const bool is_async = true;
344 
345  F _func;
346  boost::weak_ptr<StrandPrivate> _strand;
347  boost::function<void()> _onFail;
348  ExecutionOptions _executionOptions; // passed to callInStrand on every call
349 
     // Sink constructor: takes every argument by value and moves it into the matching member.
350  Stranded(F f, boost::weak_ptr<StrandPrivate> strand, boost::function<void()> onFail, ExecutionOptions options)
351  : _func(std::move(f))
352  , _strand(std::move(strand))
353  , _onFail(std::move(onFail))
354  , _executionOptions(std::move(options))
355  {
356  }
357 
     // Const overload: `_func` is passed to callInStrand as a const object.
358  template <typename... Args>
359  auto operator()(Args&&... args) const
360  -> decltype(callInStrand(_func, _onFail, _strand, _executionOptions, std::forward<Args>(args)...))
361  {
362  return callInStrand(_func, _onFail, _strand, _executionOptions, std::forward<Args>(args)...);
363  }
364 
     // Non-const overload: same forwarding, with `_func` passed as a non-const object.
365  template <typename... Args>
366  auto operator()(Args&&... args)
367  -> decltype(callInStrand(_func, _onFail, _strand, _executionOptions, std::forward<Args>(args)...))
368  {
369  return callInStrand(_func, _onFail, _strand, _executionOptions, std::forward<Args>(args)...);
370  }
371  };
372 
373  // Like Stranded, but unwraps the result.
     // Holds a Stranded<F> and forwards every call to it, passing the returned value through
     // tryUnwrap.
374  template <typename F>
375  struct StrandedUnwrapped
376  {
377  static const bool is_async = true;
378 
379  private:
380  Stranded<F> _stranded;
381 
382  public:
     // Builds the inner Stranded<F> from the forwarded function, the weak strand pointer,
     // the fallback and the execution options.
383  template <typename FF>
384  StrandedUnwrapped(FF&& f, const boost::weak_ptr<StrandPrivate>& strand, const boost::function<void()>& onFail, ExecutionOptions options)
385  : _stranded(std::forward<FF>(f), strand, onFail, options)
386  {}
387 
     // Const overload: calls the const operator() of the inner Stranded.
388  template <typename... Args>
389  auto operator()(Args&&... args) const
390  -> decltype(tryUnwrap(_stranded(std::forward<Args>(args)...)))
391  {
392  return tryUnwrap(_stranded(std::forward<Args>(args)...));
393  }
394 
     // Non-const overload: calls the non-const operator() of the inner Stranded.
395  template <typename... Args>
396  auto operator()(Args&&... args)
397  -> decltype(tryUnwrap(_stranded(std::forward<Args>(args)...)))
398  {
399  return tryUnwrap(_stranded(std::forward<Args>(args)...));
400  }
401  };
402 } // detail
403 } // qi
404 
405 # ifdef _MSC_VER
406 # pragma warning( pop )
407 # endif
408 
409 # include <qi/async.hpp>
410 
411 #endif // _QI_STRAND_HPP_
boost::condition_variable_any _processFinished
Definition: strand.hpp:59
std::atomic< unsigned int > _aliveCount
Definition: strand.hpp:55
std::atomic< unsigned int > _curId
Definition: strand.hpp:54
auto asyncDelay(F &&callback, qi::Duration delay) -> decltype(detail::asyncMaybeActor(std::forward< F >(callback), delay))
Definition: async.hpp:46
DurationType< int64_t, boost::micro > MicroSeconds
Definition: clock.hpp:26
auto operator()(Args &&...args) -> decltype(tryUnwrap(_stranded(std::forward< Args >(args)...)))
Definition: strand.hpp:396
auto operator()(Args &&...args) const -> decltype(tryUnwrap(_stranded(std::forward< Args >(args)...)))
Definition: strand.hpp:389
BOOST_CONSTEXPR ExecutionOptions defaultExecutionOptions() BOOST_NOEXCEPT
#define QI_API
Definition: api.hpp:33
qi::Future< void > async(const boost::function< void()> &callback, qi::SteadyClockTimePoint tp) override
Definition: strand.hpp:91
auto track(F &&f, T &&toTrack) -> decltype(trackWithFallback(detail::throwPointerLockException, std::forward< F >(f), std::forward< T >(toTrack)))
Definition: trackable.hxx:420
SteadyClock::time_point SteadyClockTimePoint
Steady clock time point.
Definition: clock.hpp:211
std::deque< boost::shared_ptr< Callback >> Queue
Definition: strand.hpp:51
virtual qi::Future< void > async(const boost::function< void()> &callback, qi::SteadyClockTimePoint tp)=0
StrandedUnwrapped(FF &&f, const boost::weak_ptr< StrandPrivate > &strand, const boost::function< void()> &onFail, ExecutionOptions options)
Definition: strand.hpp:384
#define QI_ASSERT(expr__)
Definition: assert.hpp:27
boost::recursive_mutex _mutex
Definition: strand.hpp:58
qi::ExecutionContext & _executor
Definition: strand.hpp:53
boost::weak_ptr< StrandPrivate > _strand
Definition: strand.hpp:346
qi::Future< T > makeFutureError(const std::string &error)
Helper function to return a future with the error set.
Definition: future.hxx:466
static const bool is_async
Definition: strand.hpp:343
#define genCall(n, ATYPEDECL, ATYPES, ADECL, AUSE, comma)
Definition: strand.hpp:286
NanoSeconds Duration
Definition: clock.hpp:32
#define QI_NOEXCEPT(cond)
Specify that a function may throw or not. Do nothing if noexcept is not available.
Definition: macro.hpp:318
static const bool is_async
Definition: strand.hpp:377
#define QI_API_DEPRECATED_MSG(msg__)
Compiler flags to mark a function as deprecated. It will generate a compiler warning.
Definition: macro.hpp:55
auto async(F &&callback) -> decltype(asyncDelay(std::forward< F >(callback), qi::Duration(0)))
Definition: async.hpp:53
ExecutionOptions _executionOptions
Definition: strand.hpp:348
boost::optional< std::string > OptionalErrorMessage
Definition: strand.hpp:128
qi::Future< void > async(const boost::function< void()> &callback, qi::Duration delay) override
Definition: strand.hpp:94
T tryUnwrap(T anything)
auto asyncAt(F &&callback, qi::SteadyClockTimePoint timepoint) -> decltype(qi::getEventLoop() ->asyncAt(std::forward< F >(callback), timepoint))
Definition: async.hpp:39
boost::function< void()> _onFail
Definition: strand.hpp:347
std::atomic< int > _processingThread
Definition: strand.hpp:57
auto operator()(Args &&...args) const -> decltype(callInStrand(_func, _onFail, _strand, _executionOptions, std::forward< Args >(args)...))
Definition: strand.hpp:359
auto schedulerFor(F &&func, boost::function< void()> onFail={}, ExecutionOptions options=defaultExecutionOptions()) -> detail::Stranded< typename std::decay< F >::type >
Definition: strand.hpp:235
auto unwrappedSchedulerFor(F &&func, boost::function< void()> onFail={}, ExecutionOptions options=defaultExecutionOptions()) -> detail::StrandedUnwrapped< typename std::decay< F >::type >
Definition: strand.hpp:248
auto operator()(Args &&...args) -> decltype(callInStrand(_func, _onFail, _strand, _executionOptions, std::forward< Args >(args)...))
Definition: strand.hpp:366
Stranded(F f, boost::weak_ptr< StrandPrivate > strand, boost::function< void()> onFail, ExecutionOptions options)
Definition: strand.hpp:350
std::shared_ptr< ScopedPromiseGroup > _deferredTasksFutures
Definition: strand.hpp:62
std::enable_if< std::is_function< RF >::value, boost::function< RF > >::type bind(AF &&fun, Arg0 &&arg0, Args &&...args)
Definition: trackable.hxx:308
#define QI_GEN(f)
Definition: preproc.hpp:476
Represents execution behaviour options attached to a task that must be interpreted by an ExecutionContext...