Skip to content

Commit

Permalink
Provide scheduler_ptr to _ScheduleFuncWithAutoInline
Browse files Browse the repository at this point in the history
The ambient_scheduler may not be the desired scheduler to schedule
functions on, but _ScheduleFuncWithAutoInline does not offer any
other option. Provide a means to select a desired scheduler and
fall back to the ambient scheduler if not specified.
  • Loading branch information
parkske committed Mar 13, 2021
1 parent bfe3487 commit baded66
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 7 deletions.
14 changes: 11 additions & 3 deletions Release/include/pplx/pplx.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,24 @@ class _TaskCollectionImpl
scheduler_ptr _GetScheduler() const { return _M_pScheduler; }

// Fire and forget
static void _RunTask(TaskProc_t _Proc, void* _Parameter, _TaskInliningMode _InliningMode)
static void _RunTask(TaskProc_t _Proc, void* _Parameter, _TaskInliningMode _InliningMode, scheduler_ptr _Scheduler)
{
if (_InliningMode == _ForceInline)
{
_Proc(_Parameter);
}
else
{
// Schedule the work on the ambient scheduler
get_ambient_scheduler()->schedule(_Proc, _Parameter);
if (_Scheduler.get())
{
// Schedule the work on the desired scheduler
_Scheduler->schedule(_Proc, _Parameter);
}
else
{
// Schedule the work on the ambient scheduler as a fallback
get_ambient_scheduler()->schedule(_Proc, _Parameter);
}
}
}

Expand Down
12 changes: 8 additions & 4 deletions Release/include/pplx/pplxtasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -622,9 +622,12 @@ struct _TaskProcThunk
/// <param name="_InliningMode">
/// The inlining scheduling policy for current functor.
/// </param>
static void _ScheduleFuncWithAutoInline(const std::function<void()>& _Func, _TaskInliningMode_t _InliningMode)
/// <param name="_Scheduler">
/// The intended Scheduler to run the task on.
/// </param>
static void _ScheduleFuncWithAutoInline(const std::function<void()>& _Func, _TaskInliningMode_t _InliningMode, scheduler_ptr _Scheduler)
{
_TaskCollection_t::_RunTask(&_TaskProcThunk::_Bridge, new _TaskProcThunk(_Func), _InliningMode);
_TaskCollection_t::_RunTask(&_TaskProcThunk::_Bridge, new _TaskProcThunk(_Func), _InliningMode, _Scheduler);
}

class _ContextCallback
Expand Down Expand Up @@ -2089,7 +2092,8 @@ struct _Task_impl_base
}
}
},
_PTaskHandle->_M_inliningMode);
_PTaskHandle->_M_inliningMode,
_M_TaskCollection._GetScheduler());
}
else
{
Expand Down Expand Up @@ -2514,7 +2518,7 @@ struct _Task_impl : public _Task_impl_base
if (_M_Continuations)
{
// Scheduling cancellation with automatic inlining.
_ScheduleFuncWithAutoInline([=]() { _RunTaskContinuations(); }, details::_DefaultAutoInline);
_ScheduleFuncWithAutoInline([=]() { _RunTaskContinuations(); }, details::_DefaultAutoInline, _M_TaskCollection._GetScheduler());
}
}
return true;
Expand Down
46 changes: 46 additions & 0 deletions Release/tests/functional/pplx/pplx_test/pplx_task_options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ class TaskOptionsTestScheduler : public pplx::scheduler_interface
{
public:
TaskOptionsTestScheduler() : m_numTasks(0), m_scheduler(get_scheduler()) {}
TaskOptionsTestScheduler(std::shared_ptr<pplx::scheduler_interface> scheduler) : m_numTasks(0), m_scheduler(std::move(scheduler)) {}

virtual void schedule(pplx::TaskProc_t proc, void* param)
{
Expand Down Expand Up @@ -159,6 +160,24 @@ SUITE(pplx_task_options_tests)
VERIFY_ARE_EQUAL(sched1.get_num_tasks(), 1);
VERIFY_ARE_EQUAL(sched2.get_num_tasks(), 2);
}

TEST(then_from_exception_custom_scheduler_test)
{
class custom_direct_executor : public pplx::scheduler_interface
{
public:
virtual void schedule(pplx::TaskProc_t proc, _In_ void* param) { proc(param); }
};

TaskOptionsTestScheduler sched(std::make_shared<custom_direct_executor>());
long n = 0;

auto t1 = pplx::create_task([&n]() { n++; throw std::runtime_error("exception"); }, sched);
t1.then([&n](pplx::task<void> task_result) { n++; try { task_result.get(); } catch (...){} }) // inherit sched
.wait();

VERIFY_ARE_EQUAL(sched.get_num_tasks(), n);
}

TEST(opand_nooptions_test)
{
Expand Down Expand Up @@ -260,6 +279,33 @@ SUITE(pplx_task_options_tests)
VERIFY_ARE_EQUAL(sched2.get_num_tasks(), 0);
}

TEST(whenall_then_from_exception_custom_scheduler_test)
{
class custom_direct_executor : public pplx::scheduler_interface
{
public:
virtual void schedule(pplx::TaskProc_t proc, _In_ void* param) { proc(param); }
};

TaskOptionsTestScheduler sched(std::make_shared<custom_direct_executor>());

std::vector<pplx::task<void>> taskVect;
const int task_count = 3;
long n = 0;
for (int i = 0; i < (task_count-1); i++)
{
taskVect.push_back(pplx::create_task([&n]() {n++;}, sched));
}
taskVect.push_back(pplx::create_task([&n]() { n++; throw std::runtime_error("exception");}, sched));

auto t3 = pplx::when_all(
begin(taskVect), end(taskVect), sched);
n++; // sched used within when_all
t3.then([&n](pplx::task<void> task_result) { n++; try { task_result.get(); } catch (...){} }, sched).wait();

VERIFY_ARE_EQUAL(sched.get_num_tasks(), n);
}

TEST(opor_nooptions_test)
{
TaskOptionsTestScheduler sched;
Expand Down

0 comments on commit baded66

Please sign in to comment.