Skip to content

Commit

Permalink
Merge tag '10.0.1-devel'
Browse files Browse the repository at this point in the history
Release candidate for 10.0.1. See release notes for changes and of course commit logs
  • Loading branch information
pmnewman committed Nov 15, 2013
2 parents 526ba75 + 1c18b7a commit f460255
Show file tree
Hide file tree
Showing 12 changed files with 137 additions and 66 deletions.
15 changes: 14 additions & 1 deletion Core/MOOSDB/MOOSDB.cpp
Expand Up @@ -157,6 +157,7 @@ void PrintHelpAndExit()
std::cout<<"--response=<string-list> specify tolerable client latencies in ms\n";
std::cout<<"--warning_latency=<positive_float> specify latency above which warning is issued in ms\n";
std::cout<<"--tcpnodelay disable nagle algorithm \n";
std::cout<<"--audit_port=<unsigned int> specify port on which to transmit statistics\n";



Expand Down Expand Up @@ -267,6 +268,12 @@ bool CMOOSDB::Run(int argc, char * argv[] )
//are we being asked to be old skool and use a single thread?
bool bSingleThreaded = P.GetFlag("-s","--single_threaded");


//is the community name being specified on the cli?
unsigned int nAuditPort=9020;
P.GetVariable("--audit_port",nAuditPort);



LogStartTime();

Expand Down Expand Up @@ -297,14 +304,20 @@ bool CMOOSDB::Run(int argc, char * argv[] )

m_pCommServer->SetCommandLineParameters(argc,argv);

m_pCommServer->Run(m_nPort,m_sCommunityName,bDisableNameLookUp);
m_pCommServer->Run(m_nPort,m_sCommunityName,bDisableNameLookUp,nAuditPort);



return true;
}

bool CMOOSDB::IsRunning()
{
if(m_pCommServer.get()==NULL)
return false;

return m_pCommServer->IsRunning();
}


void CMOOSDB::UpdateDBClientsVar()
Expand Down
2 changes: 2 additions & 0 deletions Core/MOOSDB/MOOSDB.h
Expand Up @@ -78,6 +78,8 @@ class CMOOSDB
and returns */
bool Run(int argc = 0, char * argv[] =0);

bool IsRunning();

/** returns the port on which this DB is listening */
long GetDBPort(){return m_nPort;};

Expand Down
4 changes: 2 additions & 2 deletions Core/MOOSDB/MOOSDBMain.cpp
Expand Up @@ -39,9 +39,9 @@ int main(int argc , char * argv[])

//nothing to do - all the threads in the DB object
//do the work
while(1)
while( DB.IsRunning())
{
MOOSPause(100000);
MOOSPause(1000);
}

return 0;
Expand Down
14 changes: 10 additions & 4 deletions Core/libMOOS/App/include/MOOS/libMOOS/App/MOOSApp.h
Expand Up @@ -166,7 +166,8 @@ class CMOOSApp
/** Called when the class has disconnects from the server. Put code you want to run when this happens in a virtual version of this method*/
virtual bool OnDisconnectFromServer();

/** called by a separate thread if a callback has been installed by calling AddMessageCallback()*/
/** called by a separate thread if a callback
* has been installed by calling AddMessageCallback()*/
virtual bool OnMessage(CMOOSMsg & M);

protected:
Expand Down Expand Up @@ -291,13 +292,18 @@ class CMOOSApp
bool UnRegister(const std::string & sVar);

/**
* Register a custom call back for a particular message. This call back will be called from its own thread.
* Register a custom call back for a particular message. This call back
* will be called from its own thread.
* @param sMsgName name of message to watch for
* @param pfn pointer to your function should be type bool func(CMOOSMsg &M, void *pParam)
* @param pfn pointer to your function should be type
* bool func(CMOOSMsg &M, void *pParam)
* @param pYourParam a void * pointer to the thing we want passed as pParam above
* @return true on success
*/
bool AddCustomMessageCallback(const std::string & sCallbackName,const std::string & sMsgName, bool (*pfn)(CMOOSMsg &M, void * pYourParam), void * pYourParam );
bool AddCustomMessageCallback(const std::string & sCallbackName,
const std::string & sMsgName,
bool (*pfn)(CMOOSMsg &M, void * pYourParam),
void * pYourParam );

/**
* Add a callback to ::OnMessage() for a particular message. This will cause OnMessage() to be called from its own thread
Expand Down
14 changes: 8 additions & 6 deletions Core/libMOOS/Comms/MOOSAsyncCommClient.cpp
Expand Up @@ -104,6 +104,10 @@ bool MOOSAsyncCommClient::StartThreads()
if(!ReadingThread_.Initialise(AsyncCommsReaderDispatch,this))
return false;


WritingThread_.Name(GetMOOSName()+" writing thread");
ReadingThread_.Name(GetMOOSName()+" reading thread");

if(!WritingThread_.Start())
return false;

Expand Down Expand Up @@ -297,8 +301,10 @@ bool MOOSAsyncCommClient::DoWriting()
CMOOSCommPkt PktTx;
try
{

PktTx.Serialize(StuffToSend,true);
m_nBytesSent+=PktTx.GetStreamLength();

}
catch (const CMOOSException & e)
{
Expand Down Expand Up @@ -357,11 +363,7 @@ bool MOOSAsyncCommClient::ReadingLoop()
if(!DoReading())
{
OutGoingQueue_.Push(CMOOSMsg(MOOS_TERMINATE_CONNECTION,"-quit-",0) );

//std::cout<<"reading failed!\n";

while(IsConnected())//wait for connection to terminate...
MOOSPause(200);
}
}
else
Expand All @@ -370,7 +372,7 @@ bool MOOSAsyncCommClient::ReadingLoop()
MOOSPause(100);
}
}
//std::cout<<"READING LOOP quiting...\n";
//std::cerr<<"READING LOOP quiting...\n";
return true;
}

Expand Down Expand Up @@ -477,7 +479,7 @@ bool MOOSAsyncCommClient::DoReading()
catch(const CMOOSException & e)
{
MOOS::DeliberatelyNotUsed(e);
//MOOSTrace("Exception in DoReading() : %s\n",e.m_sReason);
MOOSTrace("Exception in DoReading() : %s\n",e.m_sReason);
return false;
}

Expand Down
27 changes: 7 additions & 20 deletions Core/libMOOS/Comms/MOOSCommPkt.cpp
Expand Up @@ -112,17 +112,6 @@ bool CMOOSCommPkt::Fill(unsigned char *InData, int nData)

if( m_nByteCount <=(int)sizeof(int))
{
//here we figure out how many bytes we are expecting
// bool bBOA = false;
// if(0 && (m_nByteCount!=sizeof(int)))
// {
// std::cerr<<"Bug of Alon I thwart thee\n";
// std::cerr<<"m_nByteCount "<<m_nByteCount<<"\n";
// std::cerr<<"nData "<<nData<<"\n";
// std::cerr<<"GetBytesRequired() would have returned "<<GetBytesRequired()<<"\n";
// bBOA = true;
// }

if(m_nByteCount==sizeof(int))
{
memcpy((void*)(&m_nMsgLen),(void*)m_pStream,sizeof(int));
Expand All @@ -132,14 +121,7 @@ bool CMOOSCommPkt::Fill(unsigned char *InData, int nData)
{
m_nMsgLen = SwapByteOrder<int>(m_nMsgLen);
}

// if(bBOA)
// {
// std::cerr<<"calculated m_nMsgLen as "<<m_nMsgLen<<"\n";
// }

}

}


Expand All @@ -153,6 +135,11 @@ int CMOOSCommPkt::GetStreamLength()
}


unsigned int CMOOSCommPkt::GetNumMessagesSerialisedToStream()
{
return m_nToStreamCount;
}

/** This function stuffs messages in/from a packet */
bool CMOOSCommPkt::Serialize(MOOSMSG_LIST &List, bool bToStream, bool bNoNULL, double * pdfPktTime)
{
Expand Down Expand Up @@ -180,8 +167,8 @@ bool CMOOSCommPkt::Serialize(MOOSMSG_LIST &List, bool bToStream, bool bNoNULL, d


MOOSMSG_LIST::iterator p;
int nCount = 0;
for(p = List.begin();p!=List.end();p++,nCount++)
m_nToStreamCount = 0;
for(p = List.begin();p!=List.end();p++,m_nToStreamCount++)
{

unsigned int nRequiredSize = p->GetSizeInBytesWhenSerialised();
Expand Down
13 changes: 10 additions & 3 deletions Core/libMOOS/Comms/MOOSCommServer.cpp
Expand Up @@ -129,6 +129,7 @@ CMOOSCommServer::CMOOSCommServer()
m_sCommunityName = "#1";
m_bQuiet = false;
m_bDisableNameLookUp = true;
m_bQuit = false;

m_bBoostIOThreads= false;

Expand All @@ -152,7 +153,7 @@ void CMOOSCommServer::SetWarningLatencyMS(double dfPeriod)
}


bool CMOOSCommServer::Run(long lPort, const string & sCommunityName,bool bDisableNameLookUp)
bool CMOOSCommServer::Run(long lPort, const string & sCommunityName,bool bDisableNameLookUp,unsigned int nAuditPort)
{

m_sCommunityName = sCommunityName;
Expand All @@ -161,10 +162,15 @@ bool CMOOSCommServer::Run(long lPort, const string & sCommunityName,bool bDisabl

m_bDisableNameLookUp = bDisableNameLookUp;

m_nAuditPort = nAuditPort;



if(m_CommandLineParser.IsAvailable())
{



//here we look to parse latency
//--latency=y:10
std::string sLatency = "0";
Expand Down Expand Up @@ -671,6 +677,7 @@ bool CMOOSCommServer::OnNewClient(XPCTcpSocket * pNewClient,char * sName)
bool CMOOSCommServer::OnClientDisconnect()
{


std::cout<<"\n----------"<<MOOS::ConsoleColours::Yellow()<<"DISCONNECT"<<MOOS::ConsoleColours::reset()<<"------------\n";


Expand Down Expand Up @@ -770,8 +777,8 @@ bool CMOOSCommServer::IsUniqueName(string &sClientName)
//here we can check that the client is speaking the correct wire protocol
//we begin by reading a string and checking it is what we are expecting
//note we are only reading a few bytes so this lets us catch the case where
//an old client that doesn't send a string simpy sends a COmmPkt first
//chances of a comm packete spelling at a ptotocl name are pretty damns slim.....
//an old client that doesn't send a string simp;y sends a COmmPkt first
//chances of a comm packet spelling out a protocol name are pretty damn slim.....
bool CheckProtocol(XPCTcpSocket *pNewClient)
{
char sProtocol[MOOS_PROTOCOL_STRING_BUFFER_SIZE+1];
Expand Down
27 changes: 21 additions & 6 deletions Core/libMOOS/Comms/ThreadedCommServer.cpp
Expand Up @@ -178,7 +178,9 @@ bool ThreadedCommServer::ServerLoop()


MOOS::ServerAudit Auditor;
Auditor.Run();


Auditor.Run("localhost",m_nAuditPort);

if(m_bBoostIOThreads)
{
Expand Down Expand Up @@ -413,6 +415,8 @@ bool ThreadedCommServer::ProcessClient()

bool ThreadedCommServer::OnClientDisconnect(ClientThreadSharedData &SD)
{


//lock the base socket list
m_SocketListLock.Lock();

Expand Down Expand Up @@ -444,6 +448,7 @@ bool ThreadedCommServer::OnClientDisconnect(ClientThreadSharedData &SD)

bool ThreadedCommServer::OnClientDisconnect()
{

return BASE::OnClientDisconnect();
}

Expand All @@ -452,15 +457,22 @@ bool ThreadedCommServer::OnClientDisconnect()
bool ThreadedCommServer::StopAndCleanUpClientThread(std::string sName)
{

//use this name to get the thread which is doing our work


//use this name to get the thread which is doing our work
std::map<std::string,ClientThread*>::iterator q = m_ClientThreads.find(sName);


if(q==m_ClientThreads.end())
return MOOSFail("runtime error ThreadedCommServer::StopAndCleanUpClientThread - cannot figure out worker thread");

//stop the thread and wait for it to return
ClientThread* pWorker = q->second;
pWorker->Kill();
if(!pWorker->Kill())
{
std::cerr<<"failed to kill a client - serious problem\n";
throw std::runtime_error("failed to kill worker");
}

//remove any reference to this worker thread
m_ClientThreads.erase(q);
Expand All @@ -484,7 +496,7 @@ bool ThreadedCommServer::TimerLoop()

ThreadedCommServer::ClientThread::~ClientThread()
{

Kill();
}


Expand All @@ -498,10 +510,13 @@ ThreadedCommServer::ClientThread::ClientThread(const std::string & sName, XPCTcp
_bBoostThread(bBoost)
{
_Worker.Initialise(RunEntry,this);
_Worker.Name("ThreadedCommServer::ClientThread::Worker::"+sName);

if(IsAsynchronous())
{
_Writer.Initialise(WriteEntry,this);
_Writer.Name("ThreadedCommServer::ClientThread::Writer::"+sName);

}
}

Expand Down Expand Up @@ -615,7 +630,6 @@ bool ThreadedCommServer::ClientThread::Run()
bool ThreadedCommServer::ClientThread::OnClientDisconnect()
{


//prepare to send it up the chain
CMOOSCommPkt PktRx,PktTx;
ClientThreadSharedData SD(_sClientName,ClientThreadSharedData::CONNECTION_CLOSED);
Expand All @@ -635,6 +649,7 @@ bool ThreadedCommServer::ClientThread::OnClientDisconnect()
bool ThreadedCommServer::ClientThread::Kill()
{


if(IsAsynchronous())
{
//wait for it to stop..
Expand Down Expand Up @@ -706,6 +721,7 @@ bool ThreadedCommServer::ClientThread::AsynchronousWriteLoop()
//do normal writing
case ClientThreadSharedData::PKT_WRITE:
{

if(SDDownChain._pPkt.isNull())
{
std::cerr<<"logical error"<< MOOSHERE;
Expand Down Expand Up @@ -791,7 +807,6 @@ bool ThreadedCommServer::ClientThread::HandleClientWrite()
catch (const CMOOSException & e)
{
MOOS::DeliberatelyNotUsed(e);
//MOOSTrace("ThreadedCommServer::ClientThread::HandleClient() Exception: %s\n", e.m_sReason);
bResult = false;
}

Expand Down
5 changes: 5 additions & 0 deletions Core/libMOOS/Comms/include/MOOS/libMOOS/Comms/MOOSCommPkt.h
Expand Up @@ -61,6 +61,8 @@ class CMOOSCommPkt
int GetBytesRequired();
double GetCompression();

unsigned int GetNumMessagesSerialisedToStream();

CMOOSCommPkt();
virtual ~CMOOSCommPkt();

Expand All @@ -79,6 +81,9 @@ class CMOOSCommPkt
bool m_bAllocated;
double m_dfCompression;

//how many messages are contained in this packet when serialsised to a stream?
unsigned int m_nToStreamCount;

};

typedef std::list<CMOOSCommPkt> MOOSPKT_LIST;
Expand Down

0 comments on commit f460255

Please sign in to comment.