libqi-api  2.8.7.4
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Pages
future.hxx
Go to the documentation of this file.
1 #pragma once
2 /*
3 ** Copyright (C) 2012 Aldebaran Robotics
4 ** See COPYING for the license
5 */
6 
7 #ifndef _QI_DETAIL_FUTURE_HXX_
8 #define _QI_DETAIL_FUTURE_HXX_
9 
10 #include <vector>
11 #include <utility> // pair
12 #include <boost/bind.hpp>
13 #include <qi/eventloop.hpp>
14 #include <qi/log.hpp>
15 #include <qi/strand.hpp>
17 
18 namespace qi {
19 
20 namespace detail {
21 
22  template <typename R, typename F>
24  {
25  static_assert(
26  std::is_convertible<typename std::result_of<F()>::type, R>::value,
27  "function return type is incompatible with promise");
28  p.setValue(f());
29  }
30 
31  template <typename F>
33  {
34  f();
35  p.setValue(nullptr);
36  }
37 
38  template <typename R, typename F>
40  {
41  try
42  {
43  setPromiseFromCall(p, std::forward<F>(f));
44  }
45  catch (std::exception& e)
46  {
47  p.setError(e.what());
48  }
49  catch (...)
50  {
51  p.setError("unknown exception");
52  }
53  }
54 } // namespace detail
55 
56  template <typename T>
57  template <typename R, typename AF>
59  {
60  return thenRImpl<R>(type, std::forward<AF>(func));
61  }
62 
63  template <typename T>
64  template <typename R, typename F>
65  inline Future<R> Future<T>::thenRImpl(FutureCallbackType callbackType, F&& continuation)
66  {
67  boost::weak_ptr<detail::FutureBaseTyped<T> > weakp(_p);
68  qi::Promise<R> promise([weakp](const qi::Promise<R>&){
69  if (auto futureb = weakp.lock())
70  Future<T>(futureb).cancel();
71  });
72 
73  using isAsync = detail::IsAsyncBind<F>;
74  if (callbackType == FutureCallbackType_Auto && isAsync::value)
75  callbackType = FutureCallbackType_Sync;
76 
77  auto adaptedContinuation = [promise, continuation](const Future<T>& future) mutable
78  {
80  promise, [&continuation, &future]() mutable { return continuation(future); });
81  };
82 
83  _p->connect(*this, adaptedContinuation, callbackType);
84  return promise.future();
85  }
86 
87  template <typename T>
88  template <typename R, typename AF>
90  {
91  return andThenRImpl<R>(type, std::forward(func));
92  }
93 
94  template <typename T>
95  template <typename R, typename F>
96  inline Future<R> Future<T>::andThenRImpl(FutureCallbackType callbackType, F&& continuation)
97  {
98  boost::weak_ptr<detail::FutureBaseTyped<T> > weakp(_p);
99  qi::Promise<R> promise([weakp](const qi::Promise<R>&){
100  if (auto futureb = weakp.lock())
101  Future<T>(futureb).cancel();
102  });
103 
104  using isAsync = detail::IsAsyncBind<F>;
105  if (callbackType == FutureCallbackType_Auto && isAsync::value)
106  callbackType = FutureCallbackType_Sync;
107 
108  auto adaptedContinuation = [promise, continuation](const Future<T>& future) mutable
109  {
110  if (future.isCanceled())
111  promise.setCanceled();
112  else if (future.hasError())
113  promise.setError(future.error());
114  else if (promise.isCancelRequested())
115  promise.setCanceled();
116  else
118  promise, [&continuation, &future]() mutable { return continuation(future.value()); });
119  };
120 
121  _p->connect(*this, adaptedContinuation, callbackType);
122  return promise.future();
123  }
124 
125  template <typename T>
127  const boost::function<void(const Future<T>&)>& cb)
128  {
129  connectWithStrand(*strand, cb);
130  }
131 
132  template <typename T>
134  const boost::function<void(const Future<T>&)>& cb)
135  {
136  _p->connect(
137  *this,
138  strand.schedulerFor(cb),
140  }
141 
142  template <typename T>
143  void Future<T>::_weakCancelCb(const boost::weak_ptr<detail::FutureBaseTyped<T> >& wfuture)
144  {
145  if (boost::shared_ptr<detail::FutureBaseTyped<T> > fbt = wfuture.lock())
146  {
147  Future<T> future(fbt);
148  future.cancel();
149  }
150  }
151 
152  template <typename T>
153  boost::function<void()> Future<T>::makeCanceler()
154  {
155  return boost::bind(&Future<T>::_weakCancelCb, boost::weak_ptr<detail::FutureBaseTyped<T> >(_p));
156  }
157 
158  namespace detail {
159 
160  template <typename T>
162  : _value()
163  , _async(FutureCallbackType_Auto)
164  {
165  }
166 
167  template <typename T>
169  {
170  boost::recursive_mutex::scoped_lock lock(mutex());
171  if (_onDestroyed && state() == FutureState_FinishedWithValue)
172  _onDestroyed(_value);
173  }
174 
175  template <typename T>
177  {
178  CancelCallback onCancel;
179  {
180  boost::recursive_mutex::scoped_lock lock(mutex());
181  if (isFinished())
182  return;
183  requestCancel();
184  std::swap(onCancel, _onCancel);
185  }
186  if (onCancel)
187  {
188  qi::Promise<T> prom(future);
189  onCancel(prom);
190  }
191  }
192 
193  template <typename T>
195  {
196  bool doCancel = false;
197  {
198  boost::recursive_mutex::scoped_lock lock(mutex());
199  _onCancel = onCancel;
200  doCancel = isCancelRequested();
201  }
202  qi::Future<T> fut = promise.future();
203  if (doCancel)
204  cancel(fut);
205  }
206 
207  template <typename T>
208  void FutureBaseTyped<T>::executeCallbacks(bool defaultAsync, const Callbacks& callbacks, qi::Future<T>& future)
209  {
210  for (const auto& callback : callbacks)
211  {
212  const bool async = [&]{
213  if (callback.callType != FutureCallbackType_Auto)
214  return callback.callType != FutureCallbackType_Sync;
215  else
216  return defaultAsync != FutureCallbackType_Sync;
217  }();
218 
219  if (async)
220  getEventLoop()->post(boost::bind(callback.callback, future));
221  else
222  try
223  {
224  callback.callback(future);
225  }
226  catch (const qi::PointerLockException&)
227  { // do nothing
228  }
229  catch (const std::exception& e)
230  {
231  qiLogError("qi.future") << "Exception caught in future callback " << e.what();
232  }
233  catch (...)
234  {
235  qiLogError("qi.future") << "Unknown exception caught in future callback";
236  }
237  }
238  }
239 
240  template <typename T>
241  template <typename F> // FunctionObject<R()> F (R unconstrained)
242  void FutureBaseTyped<T>::finish(qi::Future<T>& future, F&& finishTask)
243  {
244  bool async;
245  Callbacks onResult;
246  {
247  // report-ready + onResult() must be Atomic to avoid
248  // missing callbacks/double calls in case connect() is invoked at
249  // the same time
250  boost::recursive_mutex::scoped_lock lock(mutex());
251  if (!isRunning())
253  finishTask();
254 
255  async = (_async != FutureCallbackType_Sync ? true : false);
256  onResult = takeOutResultCallbacks();
257  clearCancelCallback();
258 
259  // wake the waiting threads up
260  notifyFinish();
261  }
262  // call the callbacks without the mutex
263  executeCallbacks(async, onResult, future);
264  }
265 
266  template <typename T>
268  {
269  finish(future, [this, &value] {
270  _value = value;
271  reportValue();
272  });
273  }
274 
275  template <typename T>
277  {
278  finish(future, [this] {
279  reportValue();
280  });
281  }
282 
283  template <typename T>
284  void FutureBaseTyped<T>::setError(qi::Future<T>& future, const std::string& message)
285  {
286  finish(future, [this, &message] {
287  reportError(message);
288  });
289  }
290 
291  template <typename T>
293  {
294  finish(future, [this] {
295  reportError("Promise broken (all promises are destroyed)");
296  });
297  }
298 
299  template <typename T>
301  {
302  finish(future, [this] {
303  reportCanceled();
304  });
305  }
306 
307  template <typename T>
308  void FutureBaseTyped<T>::setOnDestroyed(boost::function<void(ValueType)> f)
309  {
310  boost::recursive_mutex::scoped_lock lock(mutex());
311  _onDestroyed = f;
312  }
313 
314  template <typename T>
316  const boost::function<void(qi::Future<T>)>& callback,
317  FutureCallbackType type)
318  {
319  if (state() == FutureState_None)
321 
322  bool ready;
323  {
324  boost::recursive_mutex::scoped_lock lock(mutex());
325  ready = isFinished();
326  if (!ready)
327  _onResult.push_back(Callback(callback, type));
328  }
329 
330  // result already ready, notify the callback
331  if (ready)
332  {
333  const bool async = [&]{
334  if (type != FutureCallbackType_Auto)
335  return type != FutureCallbackType_Sync;
336  else
337  return _async != FutureCallbackType_Sync;
338  }();
339 
340  auto soCalledEventLoop = getEventLoop();
341  if (async && soCalledEventLoop)
342  { // if no event loop was found (for example when exiting), force sync callbacks
343  soCalledEventLoop->post(boost::bind(callback, future));
344  }
345  else
346  {
347  try
348  {
349  callback(future);
350  }
351  catch (const ::qi::PointerLockException&)
352  { /*do nothing*/
353  }
354  }
355  }
356  }
357 
358  template <typename T>
360  {
361  FutureState state = wait(msecs);
362  if (state == FutureState_None)
364  if (state == FutureState_Running)
366  if (state == FutureState_Canceled)
368  if (state == FutureState_FinishedWithError)
370  return _value;
371  }
372 
373  template <typename T>
375  {
376  Callbacks onResult;
377  using std::swap;
378  swap(onResult, _onResult);
379  return onResult;
380  }
381 
382  template <typename T>
383  void FutureBaseTyped<T>::clearCancelCallback()
384  {
385  _onCancel.clear();
386  }
387 
388  template <typename T>
390  qi::Future<T>& fut,
391  qi::Atomic<int>* count) {
392  if (!prom.future().isFinished() && !fut.hasError())
393  {
394  // An other future can trigger at the same time.
395  // Don't bother to lock, just catch the FutureAlreadySet exception
396  try
397  {
398  prom.setValue(fut);
399  }
400  catch(const FutureException&)
401  {}
402  }
403  if (! --*count)
404  {
405  // I'm the last
406  if (!prom.future().isFinished())
407  {
408  // same 'race' as above. between two setError, not between a value and
409  // an error.
410  try
411  {
412  prom.setValue(makeFutureError<T>("No future returned successfully."));
413  }
414  catch(const FutureException&)
415  {}
416  }
417  delete count;
418  }
419  }
420 
421  template <typename T>
422  class AddUnwrap<Future<T> >
423  {
424  public:
426  {
427  Future<Future<T> >* self = static_cast<Future<Future<T> >*>(this);
428 
429  Promise<T> promise(boost::bind(&AddUnwrap<Future<T> >::_cancel, _1,
430  boost::weak_ptr<FutureBaseTyped<Future<T> > >(self->_p)));
431 
432  self->connect(
433  boost::bind(&AddUnwrap<Future<T> >::_forward, _1, promise),
435 
436  return promise.future();
437  }
438 
439  private:
440  static void _forward(const Future<Future<T> >& future,
441  Promise<T>& promise)
442  {
443  if (future.isCanceled())
444  promise.setCanceled();
445  else if (future.hasError())
446  promise.setError(future.error());
447  else
448  adaptFuture(future.value(), promise);
449  }
450 
451  static void _cancel(Promise<T>& promise,
452  const boost::weak_ptr<FutureBaseTyped<Future<T> > >& wfuture)
453  {
454  if (boost::shared_ptr<FutureBaseTyped<Future<T> > > fbt =
455  wfuture.lock())
456  {
457  Future<Future<T> > future(fbt);
458  future.cancel();
459  }
460  }
461  };
462 
463  } // namespace detail
464 
465  template <typename T>
466  qi::Future<T> makeFutureError(const std::string &error) {
467  qi::Promise<T> prom;
468  prom.setError(error);
469  return prom.future();
470  }
471 
472  namespace detail
473  {
474  template<typename FT, typename PT, typename CONV>
475  void futureAdapter(const Future<FT>& f, Promise<PT> p, CONV converter)
476  {
477  if (f.hasError())
478  p.setError(f.error());
479  else if (f.isCanceled())
480  p.setCanceled();
481  else
482  {
483  try {
484  converter(f.value(), p.value());
485  }
486  catch (const std::exception& e)
487  {
488  p.setError(std::string("futureAdapter conversion error: ") + e.what());
489  return;
490  }
491  p.trigger();
492  }
493  }
494 
495  template<typename FT>
496  void futureCancelAdapter(boost::weak_ptr<FutureBaseTyped<FT> > wf)
497  {
498  if (boost::shared_ptr<FutureBaseTyped<FT> > f = wf.lock())
499  Future<FT>(f).cancel();
500  }
501  }
502 
503  template <>
504  struct FutureValueConverter<void, void>
505  {
506  void operator()(void* in, void* out)
507  {
508  }
509  };
510 
511  template <typename T>
512  struct FutureValueConverter<T, void>
513  {
514  void operator()(const T& in, void* out)
515  {
516  }
517  };
518 
519  template <typename T>
520  struct FutureValueConverter<void, T>
521  {
522  void operator()(void* in, const T& out)
523  {
524  }
525  };
526 
527  template<typename R>
529  {
530  p.setup(boost::bind(&detail::futureCancelAdapter<AnyReference>,
531  boost::weak_ptr<detail::FutureBaseTyped<AnyReference> >(f._p)));
532  f.connect(boost::function<void(const qi::Future<AnyReference>&)>(
533  boost::bind(&detail::futureAdapter<R>, _1, p)));
534  }
535 
536  template<typename FT, typename PT>
538  {
539  if (option == AdaptFutureOption_ForwardCancel)
540  p.setup(boost::bind(&detail::futureCancelAdapter<FT>,
541  boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p)));
542  const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, FutureValueConverter<FT, PT> >, _1, p,
544  }
545 
546  template<typename FT, typename PT, typename CONV>
547  void adaptFuture(const Future<FT>& f, Promise<PT>& p, CONV converter, AdaptFutureOption option)
548  {
549  if (option == AdaptFutureOption_ForwardCancel)
550  p.setup(boost::bind(&detail::futureCancelAdapter<FT>,
551  boost::weak_ptr<detail::FutureBaseTyped<FT> >(f._p)));
552  const_cast<Future<FT>&>(f).connect(boost::bind(detail::futureAdapter<FT, PT, CONV>, _1, p, converter));
553  }
554 
555  template <typename T>
557  {
558  return future.andThen([](const typename Future<T>::ValueType& value)
559  { // convert the result to qi::AnyValue
560  return AnyValue::from(value);
561  });
562  }
563 
564  template <>
566  {
567  return future.andThen([](void *) {
568  // create a void AnyValue
569  return AnyValue(typeOf<void>());
570  });
571  }
572 
573  namespace detail
574  {
575 
576  template <typename T>
578  {
579  void operator,(const T& val)
580  {
581  future = Future<T>(val);
582  }
583 
584  void operator,(const Future<T>& val)
585  {
586  future = val;
587  }
588 
590  {
591  return *this;
592  }
593 
595  };
596 
597  template <>
598  struct FutureWrapper<void>
599  {
600  // initialize the future as operator, wont be called if the function returns void
601  // if it returns a future, then this value will be overwritten anyway, so no problem
603  : future(0)
604  {}
605 
606  void operator,(const Future<void>& val)
607  {
608  future = val;
609  }
610 
612  {
613  return *this;
614  }
615 
617  };
618  } // detail
619 } // qi
620 
622 
623 #endif // _QI_DETAIL_FUTURE_HXX_
void setError(qi::Future< T > &future, const std::string &message)
Definition: future.hxx:284
AdaptFutureOption
Definition: future_fwd.hpp:129
Future< AnyValue > toAnyValueFuture(Future< T > future)
Definition: future.hxx:556
void setup(boost::function< void(qi::Promise< T > &)> cancelCallback, FutureCallbackType async=FutureCallbackType_Auto)
Definition: future_fwd.hpp:915
void connect(const AF &fun, FutureCallbackType type=FutureCallbackType_Auto)
Definition: future_fwd.hpp:545
FutureWrapper< T > & operator()()
Definition: future.hxx:589
The future has been canceled.
Definition: future_fwd.hpp:85
void futureAdapter(const Future< FT > &f, Promise< PT > p, CONV converter)
Definition: future.hxx:475
void operator()(const T &in, void *out)
Definition: future.hxx:514
void operator,(const T &val)
Definition: future.hxx:579
void operator()(void *in, void *out)
Definition: future.hxx:506
void setCanceled(qi::Future< T > &future)
Definition: future.hxx:300
void setValue(qi::Future< T > &future, const ValueType &value)
Definition: future.hxx:267
void setError(const std::string &msg)
Definition: future_fwd.hpp:862
static AnyValue from(const T &r)
Definition: anyvalue.hpp:94
ValueType & value()
Definition: future_fwd.hpp:887
void set(qi::Future< T > &future)
Definition: future.hxx:276
bool isCanceled() const
Definition: future_fwd.hpp:339
boost::function< void()> makeCanceler()
Get a functor that will cancel the future.
Definition: future.hxx:153
void trigger()
Definition: future_fwd.hpp:890
Specialize this struct to provide conversion between future values.
FutureWrapper< void > & operator()()
Definition: future.hxx:611
qi::Future< T > makeFutureError(const std::string &error)
Helper function to return a future with the error set.
Definition: future.hxx:466
void connect(qi::Future< T > future, const boost::function< void(qi::Future< T >)> &callback, FutureCallbackType type)
Definition: future.hxx:315
void post(const boost::function< void()> &callback, uint64_t usDelay)
Similar to async() but without cancelation or notification.
Definition: eventloop.hpp:155
qi::Future< T > future
Definition: future.hxx:594
Future< T > future() const
Get a future linked to this promise. Can be called multiple times.
Definition: future_fwd.hpp:881
Future< R > andThenR(FutureCallbackType type, AF &&func)
Same as thenR(), but the callback is called only if this future finishes with a value.
Definition: future.hxx:89
boost::shared_ptr< detail::FutureBaseTyped< T > > _p
Definition: future_fwd.hpp:598
#define qiLogError(...)
Log in error mode.
Definition: log.hpp:120
the future is not associated to a promise
Definition: future_fwd.hpp:146
const ValueType & value(int msecs) const
Definition: future.hxx:359
FutureState
Definition: future_fwd.hpp:82
typename FutureType< T >::type ValueType
Definition: future_fwd.hpp:988
void setPromiseFromCallWithExceptionSupport(Promise< R > &p, F &&f)
Definition: future.hxx:39
qi::Future< void > future
Definition: future.hxx:616
Future is not tied to a promise.
Definition: future_fwd.hpp:83
Operation pending.
Definition: future_fwd.hpp:84
void setPromiseFromCall(Promise< R > &p, F &&f)
Definition: future.hxx:23
void swap(::qi::AnyFunction &a,::qi::AnyFunction &b)
Definition: anyfunction.hxx:92
void setOnDestroyed(boost::function< void(ValueType)> f)
Definition: future.hxx:308
void operator,(const Future< void > &val)
Definition: future.hxx:606
auto async(F &&callback) -> decltype(asyncDelay(std::forward< F >(callback), qi::Duration(0)))
Definition: async.hpp:53
typename detail::FutureType< T >::type ValueType
Definition: future_fwd.hpp:187
void cancel()
Definition: future_fwd.hpp:383
EventLoop * getEventLoop()
Returns the global eventloop, created on demand on first call.
The operation is finished with an error.
Definition: future_fwd.hpp:86
Future< R > thenR(FutureCallbackType type, AF &&func)
Execute a callback when the future is finished.
Definition: future.hxx:58
void waitForFirstHelper(qi::Promise< qi::Future< T > > &prom, qi::Future< T > &fut, qi::Atomic< int > *count)
Definition: future.hxx:389
auto andThen(FutureCallbackType type, F &&func) -> Future< typename std::decay< typename std::result_of< F(ValueType)>::type >::type >
Same as then(), but the callback is called only if this future finishes with a value.
Definition: future_fwd.hpp:506
void setCanceled()
Definition: future_fwd.hpp:869
auto schedulerFor(F &&func, boost::function< void()> onFail={}, ExecutionOptions options=defaultExecutionOptions()) -> detail::Stranded< typename std::decay< F >::type >
Definition: strand.hpp:235
boost::function< void(Promise< T > &)> CancelCallback
Definition: future_fwd.hpp:987
void cancel(qi::Future< T > &future)
Definition: future.hxx:176
void futureCancelAdapter(boost::weak_ptr< FutureBaseTyped< FT > > wf)
Definition: future.hxx:496
void operator,(const Future< T > &val)
Definition: future.hxx:584
const std::string & error(int msecs=FutureTimeout_Infinite) const
Definition: future_fwd.hpp:367
const ValueType & value(int msecs=FutureTimeout_Infinite) const
Return the value associated to a Future.
Definition: future_fwd.hpp:249
Convenient log macro.
FutureCallbackType
Definition: future_fwd.hpp:104
void setBroken(qi::Future< T > &future)
Definition: future.hxx:292
void setValue(const ValueType &value)
Definition: future_fwd.hpp:855
void setOnCancel(const qi::Promise< T > &promise, CancelCallback onCancel)
Definition: future.hxx:194
void adaptFuture(const Future< FT > &f, Promise< PT > &p, AdaptFutureOption option)
Feed a promise from a future of possibly different type.
Definition: future.hxx:537
void connectWithStrand(qi::Strand *strand, const boost::function< void(const Future< T > &)> &cb)
Definition: future.hxx:126
The operation is finished with a value.
Definition: future_fwd.hpp:87
std::enable_if< std::is_function< RF >::value, boost::function< RF > >::type bind(AF &&fun, Arg0 &&arg0, Args &&...args)
Definition: trackable.hxx:308
void operator()(void *in, const T &out)
Definition: future.hxx:522
void adaptFutureUnwrap(Future< AnyReference > &f, Promise< R > &p)
Feed a promise from a generic future which may be unwrapped if it contains itself a future...
Definition: future.hxx:528
bool hasError(int msecs=FutureTimeout_Infinite) const
Definition: future_fwd.hpp:348