7 #ifndef _QI_STRAND_HPP_
8 #define _QI_STRAND_HPP_
13 #include <ka/functional.hpp>
14 #include <ka/mutablestore.hpp>
16 #include <qi/config.hpp>
19 #include <boost/enable_shared_from_this.hpp>
20 #include <boost/shared_ptr.hpp>
21 #include <boost/function.hpp>
22 #include <boost/noncopyable.hpp>
23 #include <boost/type_traits/function_traits.hpp>
26 # pragma warning( push )
27 # pragma warning( disable: 4996 ) // TODO: Reactivate this warning once msvc stop triggerring a warning on overloading a deprecated function
51 using Queue = std::deque<boost::shared_ptr<Callback>>;
62 class ScopedPromiseGroup;
81 boost::shared_ptr<Callback> createCallback(boost::function<
void()> cb,
ExecutionOptions options);
82 void enqueue(boost::shared_ptr<Callback> cbStruct,
ExecutionOptions options);
85 void cancel(boost::shared_ptr<Callback> cbStruct);
86 bool isInThisContext() const override;
99 void stopProcess(boost::recursive_mutex::scoped_lock& lock,
105 auto track(Task&& task)
107 -> decltype(ka::scope_lock_proc(ka::fwd<Task>(task), ka::mutable_store(weak_from_this())))
109 return ka::scope_lock_proc(ka::fwd<Task>(task), ka::mutable_store(weak_from_this()));
174 qi::
Future<
void>
async(const boost::function<
void()>& cb,
177 qi::
Future<
void>
async(const boost::function<
void()>& cb,
181 #define genCall(n, ATYPEDECL, ATYPES, ADECL, AUSE, comma) \
182 template <typename T, typename F, typename ARG0 comma ATYPEDECL> \
183 QI_API_DEPRECATED_MSG(Use generic 'schedulerFor' overload instead) boost::function<T> schedulerFor( \
184 const F& func, const ARG0& arg0 comma ADECL, \
185 const boost::function<void()>& fallbackCb = boost::function<void()>()) \
187 boost::function<T> funcbind = qi::bind<T>(func, arg0 comma AUSE); \
188 return qi::trackWithFallback( \
190 SchedulerHelper<boost::function_traits<T>::arity, T>::_scheduler( \
202 bool isInThisContext()
const override;
234 template <
typename F>
239 boost::atomic_load(&_p),
247 template <
typename F>
252 boost::atomic_load(&_p),
267 Future<void> defer(
const boost::function<
void()>& cb,
272 boost::shared_ptr<StrandPrivate> _p;
274 void postImpl(boost::function<
void()> callback, ExecutionOptions options)
override;
280 template <
int N,
typename T>
281 struct SchedulerHelper;
282 #define typedefi(z, n, _) \
283 typedef typename boost::function_traits<T>::BOOST_PP_CAT( \
284 BOOST_PP_CAT(arg, BOOST_PP_INC(n)), _type) BOOST_PP_CAT(P, n);
285 #define placeholders(z, n, __) , BOOST_PP_CAT(_, BOOST_PP_INC(n))
286 #define genCall(n, ATYPEDECL, ATYPES, ADECL, AUSE, comma) \
287 template <typename T> \
288 struct SchedulerHelper<n, T> \
290 BOOST_PP_REPEAT(n, typedefi, _); \
291 typedef typename boost::function_traits<T>::result_type R; \
292 static boost::function<T> _scheduler(const boost::function<T>& f, \
295 return qi::bind<T>(&_asyncCall, strand, \
296 f BOOST_PP_REPEAT(n, placeholders, _)); \
298 static qi::Future<R> _asyncCall(Strand* strand, \
299 const boost::function<T>& func comma \
303 return ((qi::ExecutionContext*)strand) \
304 ->async(qi::bind<R()>(func comma AUSE)); \
316 template <
typename F,
typename... Args>
317 static auto callInStrand(
319 const boost::function<
void()>& onFail,
320 boost::weak_ptr<StrandPrivate> weakStrand,
321 ExecutionOptions options,
323 -> decltype(weakStrand.lock()->async(
std::bind(func, std::move(args)...), options))
325 if (
auto strand = weakStrand.lock())
327 return strand->async(
std::bind(func, std::move(args)...), options);
334 typename std::decay<decltype(func(std::forward<Args>(args)...))>::type>(
"strand is dead");
340 template <
typename F>
351 :
_func(std::move(f))
358 template <
typename... Args>
362 return callInStrand(_func, _onFail, _strand, _executionOptions, std::forward<Args>(args)...);
365 template <
typename... Args>
374 template <
typename F>
375 struct StrandedUnwrapped
383 template <
typename FF>
385 : _stranded(std::forward<FF>(f), strand, onFail, options)
388 template <
typename... Args>
390 -> decltype(
tryUnwrap(_stranded(std::forward<Args>(args)...)))
392 return tryUnwrap(_stranded(std::forward<Args>(args)...));
395 template <
typename... Args>
397 -> decltype(
tryUnwrap(_stranded(std::forward<Args>(args)...)))
399 return tryUnwrap(_stranded(std::forward<Args>(args)...));
406 # pragma warning( pop )
411 #endif // _QI_STRAND_HPP_
boost::condition_variable_any _processFinished
std::atomic< unsigned int > _aliveCount
std::atomic< unsigned int > _curId
auto asyncDelay(F &&callback, qi::Duration delay) -> decltype(detail::asyncMaybeActor(std::forward< F >(callback), delay))
DurationType< int64_t, boost::micro > MicroSeconds
auto operator()(Args &&...args) -> decltype(tryUnwrap(_stranded(std::forward< Args >(args)...)))
auto operator()(Args &&...args) const -> decltype(tryUnwrap(_stranded(std::forward< Args >(args)...)))
BOOST_CONSTEXPR ExecutionOptions defaultExecutionOptions() BOOST_NOEXCEPT
qi::Future< void > async(const boost::function< void()> &callback, qi::SteadyClockTimePoint tp) override
auto track(F &&f, T &&toTrack) -> decltype(trackWithFallback(detail::throwPointerLockException, std::forward< F >(f), std::forward< T >(toTrack)))
SteadyClock::time_point SteadyClockTimePoint
Steady clock time point.
std::deque< boost::shared_ptr< Callback >> Queue
virtual qi::Future< void > async(const boost::function< void()> &callback, qi::SteadyClockTimePoint tp)=0
StrandedUnwrapped(FF &&f, const boost::weak_ptr< StrandPrivate > &strand, const boost::function< void()> &onFail, ExecutionOptions options)
#define QI_ASSERT(expr__)
boost::recursive_mutex _mutex
qi::ExecutionContext & _executor
boost::weak_ptr< StrandPrivate > _strand
qi::Future< T > makeFutureError(const std::string &error)
Helper function to return a future with the error set.
static const bool is_async
#define genCall(n, ATYPEDECL, ATYPES, ADECL, AUSE, comma)
#define QI_NOEXCEPT(cond)
Specify that a function may throw or not. Do nothing if noexcept is not available.
static const bool is_async
#define QI_API_DEPRECATED_MSG(msg__)
Compiler flags to mark a function as deprecated. It will generate a compiler warning.
auto async(F &&callback) -> decltype(asyncDelay(std::forward< F >(callback), qi::Duration(0)))
ExecutionOptions _executionOptions
boost::optional< std::string > OptionalErrorMessage
qi::Future< void > async(const boost::function< void()> &callback, qi::Duration delay) override
auto asyncAt(F &&callback, qi::SteadyClockTimePoint timepoint) -> decltype(qi::getEventLoop() ->asyncAt(std::forward< F >(callback), timepoint))
boost::function< void()> _onFail
std::atomic< int > _processingThread
auto operator()(Args &&...args) const -> decltype(callInStrand(_func, _onFail, _strand, _executionOptions, std::forward< Args >(args)...))
auto schedulerFor(F &&func, boost::function< void()> onFail={}, ExecutionOptions options=defaultExecutionOptions()) -> detail::Stranded< typename std::decay< F >::type >
auto unwrappedSchedulerFor(F &&func, boost::function< void()> onFail={}, ExecutionOptions options=defaultExecutionOptions()) -> detail::StrandedUnwrapped< typename std::decay< F >::type >
auto operator()(Args &&...args) -> decltype(callInStrand(_func, _onFail, _strand, _executionOptions, std::forward< Args >(args)...))
Stranded(F f, boost::weak_ptr< StrandPrivate > strand, boost::function< void()> onFail, ExecutionOptions options)
std::shared_ptr< ScopedPromiseGroup > _deferredTasksFutures
std::enable_if< std::is_function< RF >::value, boost::function< RF > >::type bind(AF &&fun, Arg0 &&arg0, Args &&...args)
Represent execution behaviour options attached to a task that must be interpreted by an ExecutionCont...