Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Proactor Implementation of EventDispatcher #4060

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion dds/DCPS/EventDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ namespace DCPS {
* an EventDispatcher. Derived classes are required to implement handle_event
* which will be called when the event is dispatched by the event dispatcher
*/
struct OpenDDS_Dcps_Export EventBase : virtual RcObject {
struct OpenDDS_Dcps_Export EventBase : public virtual RcObject {
virtual ~EventBase();

/// Called when the event is dispatched by an EventDispatcher
Expand Down
130 changes: 130 additions & 0 deletions dds/DCPS/ProactorEventDispatcher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
*
*
* Distributed under the OpenDDS License.
* See: http://www.opendds.org/license.html
*/

#include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/

#include "ProactorEventDispatcher.h"
#ifdef ACE_HAS_AIO_CALLS
# include <ace/POSIX_CB_Proactor.h>
#endif
#include "Service_Participant.h"

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL

namespace OpenDDS {
namespace DCPS {

ProactorEventDispatcher::ProactorEventDispatcher(size_t count)
: cv_(mutex_)
, active_threads_(0)
#ifdef ACE_HAS_AIO_CALLS
, proactor_(new ACE_Proactor(new ACE_POSIX_AIOCB_Proactor()))
#else
, proactor_(new ACE_Proactor())
#endif
{
proactor_->number_of_threads(count);
pool_ = make_rch<ThreadPool>(count, run, this);
ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
while (active_threads_ != count) {
cv_.wait(TheServiceParticipant->get_thread_status_manager());
}
}

ProactorEventDispatcher::~ProactorEventDispatcher()
{
shutdown();
}

void ProactorEventDispatcher::shutdown(bool)
{
Proactor_rap local;
{
ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
local = proactor_;
proactor_.reset();
}
if (local) {
local->proactor_end_event_loop();
ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
while (active_threads_ != 0) {
cv_.wait(TheServiceParticipant->get_thread_status_manager());
}
}
}

bool ProactorEventDispatcher::dispatch(EventBase_rch event)
{
return schedule(event, MonotonicTimePoint::zero_value) >= 0;
}

long ProactorEventDispatcher::schedule(EventBase_rch event, const MonotonicTimePoint& expiration)
{
ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
if (!proactor_) {
return -1;
}
event->_add_ref();
const TimeDuration delta = expiration == MonotonicTimePoint::zero_value ? TimeDuration::zero_value : expiration - MonotonicTimePoint::now();
const DDS::Duration_t dds_delta = delta.to_dds_duration();
const long result = proactor_->schedule_timer(*this, event.get(), ACE_Time_Value(dds_delta.sec, dds_delta.nanosec / 1000));
if (result < 0) {
event->_remove_ref();
}
return result;
}

size_t ProactorEventDispatcher::cancel(long id)
{
ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
if (!proactor_) {
return 0;
}
const void* arg = 0;
const size_t result = proactor_->cancel_timer(id, &arg);
if (result) {
EventBase* ptr = static_cast<EventBase*>(const_cast<void*>(arg));
if (ptr) {
ptr->handle_cancel();
ptr->_remove_ref();
}
}
return result;
}

void ProactorEventDispatcher::handle_time_out(const ACE_Time_Value&, const void *act)
{
EventBase* ptr = static_cast<EventBase*>(const_cast<void*>(act));
if (ptr) {
(*ptr)();
}
}

ACE_THR_FUNC_RETURN ProactorEventDispatcher::run(void* arg)
{
ProactorEventDispatcher& dispatcher = *static_cast<ProactorEventDispatcher*>(arg);
Proactor_rap proactor = dispatcher.proactor_;
{
ACE_Guard<ACE_Thread_Mutex> guard(dispatcher.mutex_);
++dispatcher.active_threads_;
dispatcher.cv_.notify_all();
}
if (proactor) {
proactor->proactor_run_event_loop();
}
{
ACE_Guard<ACE_Thread_Mutex> guard(dispatcher.mutex_);
--dispatcher.active_threads_;
dispatcher.cv_.notify_all();
}
return 0;
}

} // DCPS
} // OpenDDS

OPENDDS_END_VERSIONED_NAMESPACE_DECL
60 changes: 60 additions & 0 deletions dds/DCPS/ProactorEventDispatcher.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
*
*
* Distributed under the OpenDDS License.
* See: http://www.opendds.org/license.html
*/

#ifndef OPENDDS_DCPS_PROACTOR_EVENT_DISPATCHER_H
#define OPENDDS_DCPS_PROACTOR_EVENT_DISPATCHER_H

#include "EventDispatcher.h"
#include "ThreadPool.h"

#include <ace/Refcounted_Auto_Ptr.h>
#include <ace/Proactor.h>

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL

namespace OpenDDS {
namespace DCPS {

class OpenDDS_Dcps_Export ProactorEventDispatcher : public EventDispatcher, public virtual ACE_Handler {
public:
/**
* Create a ProactorEventDispatcher
* @param count the requested size of the internal thread pool (see DispatchProactor)
*/
explicit ProactorEventDispatcher(size_t count = 1);
virtual ~ProactorEventDispatcher();

void shutdown(bool immediate = false);

bool dispatch(EventBase_rch event);

long schedule(EventBase_rch event, const MonotonicTimePoint& expiration = MonotonicTimePoint::now());

size_t cancel(long id);

void handle_time_out(const ACE_Time_Value& tv, const void* act = 0);

private:

typedef ACE_Refcounted_Auto_Ptr<ACE_Proactor, ACE_Thread_Mutex> Proactor_rap;

static ACE_THR_FUNC_RETURN run(void* arg);

mutable ACE_Thread_Mutex mutex_;
mutable ConditionVariable<ACE_Thread_Mutex> cv_;
size_t active_threads_;
Proactor_rap proactor_;
RcHandle<ThreadPool> pool_;
};
typedef RcHandle<ProactorEventDispatcher> ProactorEventDispatcher_rch;

} // DCPS
} // OpenDDS

OPENDDS_END_VERSIONED_NAMESPACE_DECL

#endif // OPENDDS_DCPS_PROACTOR_EVENT_DISPATCHER_H
4 changes: 2 additions & 2 deletions dds/DCPS/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "ConditionVariable.h"
#include "PoolAllocator.h"
#include "RcObject.h"

#include <ace/Thread.h>

Expand All @@ -31,8 +32,7 @@ namespace DCPS {
* at destruction. Users of ThreadPool are responsible for making sure the
* running threads are in a joinable state before the destruction of ThreadPool
*/
class OpenDDS_Dcps_Export ThreadPool
{
class OpenDDS_Dcps_Export ThreadPool : public virtual RcObject {
public:

/// A typedef for the starting point of the ThreadPool
Expand Down
141 changes: 141 additions & 0 deletions tests/stress-tests/dds/DCPS/ProactorEventDispatcher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
*
*
* Distributed under the OpenDDS License.
* See: http://www.opendds.org/license.html
*/

#include <dds/DCPS/ProactorEventDispatcher.h>

#include <dds/DCPS/ConditionVariable.h>
#include <dds/DCPS/ThreadStatusManager.h>

#include <gtest/gtest.h>

namespace {

class TestEventBase : public OpenDDS::DCPS::EventBase {
public:
TestEventBase() : cv_(mutex_), call_count_(0) {}

size_t increment_call_count()
{
ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
++call_count_;
cv_.notify_all();
return call_count_;
}

size_t call_count()
{
ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
return call_count_;
}

void wait(size_t target)
{
ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
while (call_count_ < target) {
cv_.wait(tsm_);
}
}

private:
ACE_Thread_Mutex mutex_;
OpenDDS::DCPS::ConditionVariable<ACE_Thread_Mutex> cv_;
OpenDDS::DCPS::ThreadStatusManager tsm_;
size_t call_count_;
};

struct SimpleTestEvent : public TestEventBase {
SimpleTestEvent() {}
void handle_event() { increment_call_count(); }
};

struct RecursiveTestEventOne : public TestEventBase {
RecursiveTestEventOne(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::EventDispatcher> dispatcher) : dispatcher_(dispatcher) {}

void handle_event()
{
if (increment_call_count() % 2) {
OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::EventDispatcher> dispatcher = dispatcher_.lock();
if (dispatcher) {
dispatcher->dispatch(OpenDDS::DCPS::rchandle_from(this));
}
}
}

OpenDDS::DCPS::WeakRcHandle<OpenDDS::DCPS::EventDispatcher> dispatcher_;
};

struct RecursiveTestEventTwo : public TestEventBase {
RecursiveTestEventTwo(OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::EventDispatcher> dispatcher, size_t dispatch_scale) : dispatcher_(dispatcher), dispatch_scale_(dispatch_scale) {}

void handle_event()
{
increment_call_count();
const size_t scale = dispatch_scale_;
OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::EventDispatcher> dispatcher = dispatcher_.lock();
if (dispatcher) {
for (size_t i = 0; i < scale; ++i) {
dispatcher->dispatch(OpenDDS::DCPS::rchandle_from(this));
}
}
}

OpenDDS::DCPS::WeakRcHandle<OpenDDS::DCPS::EventDispatcher> dispatcher_;
OpenDDS::DCPS::Atomic<size_t> dispatch_scale_;
};

} // (anonymous) namespace

TEST(dds_DCPS_ProactorEventDispatcher, RecursiveDispatchDelta)
{

#if defined (ACE_HAS_AIO_CALLS)
// POSIX Proactor.
# if defined (ACE_POSIX_AIOCB_PROACTOR)
std::cout << "ACE_NEW (implementation, ACE_POSIX_AIOCB_Proactor);" << std::endl;
# elif defined (ACE_POSIX_SIG_PROACTOR)
std::cout << "ACE_NEW (implementation, ACE_POSIX_SIG_Proactor);" << std::endl;
# else /* Default order: CB, SIG, AIOCB */
# if !defined(ACE_HAS_BROKEN_SIGEVENT_STRUCT)
std::cout << "ACE_NEW (implementation, ACE_POSIX_CB_Proactor);" << std::endl;
# else
# if defined(ACE_HAS_POSIX_REALTIME_SIGNALS)
std::cout << "ACE_NEW (implementation, ACE_POSIX_SIG_Proactor);" << std::endl;
# else
std::cout << "ACE_NEW (implementation, ACE_POSIX_AIOCB_Proactor);" << std::endl;
# endif /* ACE_HAS_POSIX_REALTIME_SIGNALS */
# endif /* !ACE_HAS_BROKEN_SIGEVENT_STRUCT */
# endif /* ACE_POSIX_AIOCB_PROACTOR */
#elif (defined (ACE_WIN32) && !defined (ACE_HAS_WINCE))
// WIN_Proactor.
std::cout << "ACE_NEW (implementation, ACE_WIN32_Proactor);" << std::endl;
#endif /* ACE_HAS_AIO_CALLS */

OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::EventDispatcher> dispatcher = OpenDDS::DCPS::make_rch<OpenDDS::DCPS::ProactorEventDispatcher>(8);
OpenDDS::DCPS::RcHandle<RecursiveTestEventTwo> test_event = OpenDDS::DCPS::make_rch<RecursiveTestEventTwo>(dispatcher, 2);

dispatcher->dispatch(test_event);

test_event->wait(100000u);
test_event->dispatch_scale_ = 0;
dispatcher->shutdown();

EXPECT_GE(test_event->call_count(), 100000u);
}

TEST(dds_DCPS_ProactorEventDispatcher, RecursiveDispatchDelta_ImmediateShutdown)
{
OpenDDS::DCPS::RcHandle<OpenDDS::DCPS::EventDispatcher> dispatcher = OpenDDS::DCPS::make_rch<OpenDDS::DCPS::ProactorEventDispatcher>(8);
OpenDDS::DCPS::RcHandle<RecursiveTestEventTwo> test_event = OpenDDS::DCPS::make_rch<RecursiveTestEventTwo>(dispatcher, 2);

dispatcher->dispatch(test_event);

test_event->wait(100000u);
test_event->dispatch_scale_ = 0;
dispatcher->shutdown(true);

EXPECT_GE(test_event->call_count(), 100000u);
}