Skip to content

Commit

Permalink
Accumulation fixed for issue #154
Browse files Browse the repository at this point in the history
  • Loading branch information
hoangle committed Oct 20, 2014
1 parent eb69d90 commit 0651a6e
Show file tree
Hide file tree
Showing 20 changed files with 259 additions and 286 deletions.
5 changes: 4 additions & 1 deletion src/SAFplus/build/base-images/scripts/safplus_watchdog.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ def watchdog_loop():
#pid = safplus.get_amf_pid()
#if pid == 0:
if not amfproc:
if os.path.isfile(stop_file): # Kill watchdog if stop file exists
if os.path.isfile(stop_file): # Kill watchdog if stop file exists
logging.info("Stop file exists: SAFplus is stopping")
return
else: # Restart AMF if stop file not found
logging.info("Stop file not found: Starting AMF from watchdog")
safplus.kill_amf() # when AMF dies, kill all its children to make sure there are no orphaned processes hanging around. This only kills binaries in the bin directory, rather than all children...
#This fixes the rapid restart using kill -9 (amf) and UDP keepalives
#Restart delay in the watchdog to make sure we got the keepalive timeout.
wdSleep(SAFPLUS_POLL_INTERVAL/2)
amfproc = safplus.cleanup_and_start_ams()
if safplus.reconfigWdLog:
fileLogger.handlers = []
Expand Down
3 changes: 0 additions & 3 deletions src/SAFplus/components/alarm/utils/clIdlOpen.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@

ClRcT clIdlFree(void *pData)
{
CL_DEBUG_PRINT(CL_DEBUG_WARN,("Warning! clIdlFree in file %s "
"uses clHeapFree for freeing memory allocated by server "
"function.\n", __FILE__));
clHeapFree(pData);
return CL_OK;
}
103 changes: 50 additions & 53 deletions src/SAFplus/components/amf/server/ams/clAms.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ ClBoolT gAmsDBRead = CL_FALSE;
ClOsalTaskIdT gClusterStateVerifierTask;
ClCpmAmsToCpmCallT *gAmsToCpmCallbackFuncs = NULL;

/* When system controller node is going down, "gCpmShuttingDown" variable stops payloads coming up and it is added
* instead of using already existed variable ("gpClCpm->cpmShutDown") in order to prevent reestablished.
*/
ClBoolT gCpmShuttingDown = CL_FALSE;

/* This function call table is never changes so could be removed and simplified */
ClCpmCpmToAmsCallT gCpmToAmsCallbackFuncs = {
_clAmsSACSIHAStateGet,
Expand Down Expand Up @@ -323,77 +328,68 @@ static void *clAmsClusterStateVerifier(void *cookie)
masterAddress=CL_IOC_MAX_NODES;
clCpmMasterAddressGet(&masterAddress);
if (localAddress == masterAddress)
{
for(i=1; (i< CL_IOC_MAX_NODES) && gpClCpm->polling; i++)
{
{
for (i = 1; ((i < CL_IOC_MAX_NODES) && (!gCpmShuttingDown)); i++)
{
if (i == localAddress)
continue;

ClNodeCacheMemberT ncInfo;
rc = clNodeCacheMemberGet(i,&ncInfo);
if (rc == CL_OK) /* Node exists in TIPC */
{
rc = clNodeCacheMemberGet(i, &ncInfo);
if (rc == CL_OK) /* Node exists in TIPC */
{
/* Check if AMF database match with NodeCache data */
if (!clAmsHasNodeJoined(i))
{
{
/* It takes some time for a node to come up after TIPC registers, so don't kill the node until it has failed multiple times */
if (checkFailed[i] >= 2)
{
clLogAlert("AMS", "INI","Node [%s] in slot [%d] discovered by messaging layer but has not registered with AMF. Resetting it",ncInfo.name, i);
/* clCpmNodeRestart((ClIocNodeAddressT) i, CL_TRUE); */ /* Restart payload node */
/* gpClCpm->cpmToAmsCallback->nodeRestart(&nodeName, graceful);
_clAmsSANodeRestart */
// clGmsClusterLeave(gpClCpm->cpmGmsHdl,CL_TIME_FOREVER,i);
//clLogDebug("IOC", "NTF", "Spoofing IOC node leave notification for node [%d] to force it to leave the cluster.", i);
ClIocAddressT allNodeReps;
{
clLogAlert("AMS", "INI", "Node [%s] in slot [%d] discovered by messaging layer but has not registered with AMF. Resetting it", ncInfo.name, i);
ClIocAddressT allNodeReps;
allNodeReps.iocPhyAddress.nodeAddress = CL_IOC_BROADCAST_ADDRESS;
allNodeReps.iocPhyAddress.portId = CL_IOC_XPORT_PORT;
//ClIocLogicalAddressT allLocalComps = CL_IOC_ADDRESS_FORM(CL_IOC_INTRANODE_ADDRESS_TYPE, i, CL_IOC_BROADCAST_ADDRESS);
#if 0
ClIocLogicalAddressT allLocalComps = CL_IOC_ADDRESS_FORM(CL_IOC_BROADCAST_ADDRESS_TYPE, CL_IOC_BROADCAST_ADDRESS, CL_IOC_BROADCAST_ADDRESS);

clIocNotificationNodeStatusSend(gpClCpm->cpmEoObj->commObj,CL_IOC_NODE_LEAVE_NOTIFICATION,i,(ClIocAddressT*)&allLocalComps,(ClIocAddressT*)&allNodeReps, NULL);
#endif
static ClUint32T nodeVersion = CL_VERSION_CODE(5, 0, 0);
ClUint32T myCapability = 0;
ClIocNotificationT notification;
notification.id = (ClIocNotificationIdT) htonl(CL_IOC_NODE_LEAVE_NOTIFICATION);
notification.id = htonl(CL_IOC_NODE_LEAVE_NOTIFICATION);
notification.nodeVersion = htonl(nodeVersion);
notification.nodeAddress.iocPhyAddress.nodeAddress = htonl(i);
notification.nodeAddress.iocPhyAddress.portId = htonl(myCapability);
notification.protoVersion = htonl(CL_IOC_NOTIFICATION_VERSION); // htonl(1);
notification.protoVersion = htonl(CL_IOC_NOTIFICATION_VERSION); // htonl(1);
rc = clIocNotificationPacketSend(gpClCpm->cpmEoObj->commObj, &notification, &allNodeReps, CL_FALSE, NULL);

checkFailed[i] = 0;
continue;
}

if (checkFailed[i] == 1) clLogWarning("AMS", "INI","Node [%s] in slot [%d] discovered by messaging layer but has not registered with AMF",ncInfo.name, i);
checkFailed[i]++;
}
}
if (checkFailed[i] == 1)
clLogWarning("AMS", "INI", "Node [%s] in slot [%d] discovered by messaging layer but has not registered with AMF", ncInfo.name, i);
checkFailed[i]++;
}
else
{
{
/* Match, continue to check other nodes */
checkFailed[i] = 0;
}
}
}
}
}
}
}
}

// testing the shutting down variable and waiting for the cond need to happen atomically.
clOsalMutexLock(&gpClCpm->cpmEoObj->eoMutex);
if (gpClCpm->polling) clOsalCondWait(&gpClCpm->cpmEoObj->eoCond,&gpClCpm->cpmEoObj->eoMutex,delay);
if (!gCpmShuttingDown) clOsalCondWait(&gpClCpm->cpmEoObj->eoCond,&gpClCpm->cpmEoObj->eoMutex,delay);
clOsalMutexUnlock(&gpClCpm->cpmEoObj->eoMutex);
if (!gpClCpm) /* Process is down! (should never happen b/c we are holding a reference) */
{
return NULL;
}
else
{
else
{
ClEoExecutionObjT *cpmEoObj = gpClCpm->cpmEoObj;
if (!cpmEoObj) return NULL;
if (!cpmEoObj) return NULL; // should never happen... but if it does do not assert b/c we are shutting down just quit thread.

if (( cpmEoObj->state == CL_EO_STATE_FAILED) || (cpmEoObj->state == CL_EO_STATE_KILL) || (cpmEoObj->state == CL_EO_STATE_STOP) || !gpClCpm->polling)
{
clEoRefDec(cpmEoObj);
return NULL;
return NULL;
}
}

Expand Down Expand Up @@ -611,6 +607,7 @@ clAmsFinalize(
// Setting the flags and sending the broadcast must happen atomically so that it either happens when
// the cluster state verifier is waiting for the cond, or outside of the test + wait entirely.
clOsalMutexLock(&gpClCpm->cpmEoObj->eoMutex);
gCpmShuttingDown = CL_TRUE;
gpClCpm->polling = CL_FALSE; // kick the verifier out of its loop
clOsalCondBroadcast(&gpClCpm->cpmEoObj->eoCond); // Wake up the cluster state verifier (and anybody else that needs to be quitting)
clOsalMutexUnlock(&gpClCpm->cpmEoObj->eoMutex);
Expand Down Expand Up @@ -787,16 +784,16 @@ clAmsFaultQueueDestroy(void)
return CL_OK;
}

ClRcT clAmsCheckNodeJoinState(const ClCharT *pNodeName, ClBoolT nodeRegister)
ClRcT clAmsCheckNodeJoinState(const ClCharT *pNodeName)
{
ClRcT rc = CL_OK;
ClAmsEntityRefT entityRef;

memset(&entityRef,0,sizeof(ClAmsEntityRefT));
if(!pNodeName) return rc;
clOsalMutexLock(gAms.mutex);
if(!gAms.isEnabled || gAms.serviceState == CL_AMS_SERVICE_STATE_UNAVAILABLE)

if ((!gAms.isEnabled) || (gAms.serviceState == CL_AMS_SERVICE_STATE_UNAVAILABLE) || (gCpmShuttingDown))
{
clLogNotice("NODE", "JOIN", "Returning try again for node join as AMF is not ready");
rc = CL_AMS_RC(CL_ERR_TRY_AGAIN);
Expand Down Expand Up @@ -854,16 +851,16 @@ ClRcT clAmsCheckNodeJoinState(const ClCharT *pNodeName, ClBoolT nodeRegister)
goto out_unlock;
}
}
/*
* We let the caller: CPM dictate the terms here. as the node
* could be a dynamically added one not yet there in ams.
*/
rc = CL_OK;
else
{
/*
* We let the caller: CPM dictate the terms here. as the node
* could be a dynamically added one not yet there in ams.
*/
rc = CL_OK;
}

if(nodeRegister)
{
gAms.mode |= CL_AMS_INSTANTIATE_MODE_CKPT_ALL | CL_AMS_INSTANTIATE_MODE_NODE_JOIN;
}
gAms.mode |= CL_AMS_INSTANTIATE_MODE_CKPT_ALL | CL_AMS_INSTANTIATE_MODE_NODE_JOIN;

out_unlock:
clOsalMutexUnlock(gAms.mutex);
Expand Down
2 changes: 1 addition & 1 deletion src/SAFplus/components/amf/server/ams/clAms.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ extern ClRcT clAmsFaultQueueFind(ClAmsEntityT *entity, void **entry);

extern ClRcT clAmsFaultQueueDestroy(void);

extern ClRcT clAmsCheckNodeJoinState(const ClCharT *pNodeName, ClBoolT nodeRegister);
extern ClRcT clAmsCheckNodeJoinState(const ClCharT *pNodeName);

extern void clAmsSetInstantiateCommand(ClInt32T argc, ClCharT **argv);

Expand Down
3 changes: 0 additions & 3 deletions src/SAFplus/components/amf/server/cpm/clCpmComponent.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,9 +153,6 @@ static ClRcT cpmTimerCallback(void *arg)
{
if (comp->compPresenceState == CL_AMS_PRESENCE_STATE_TERMINATING)
{
clLogWrite(CL_LOG_HANDLE_APP, CL_LOG_SEV_ERROR, NULL,
CL_CPM_LOG_2_LCM_COMP_OPER_ERR,
comp->compConfig->compName, "terminated");
clLogError(CPM_LOG_AREA_CPM,CL_LOG_CONTEXT_UNSPECIFIED,
"Component %s did not terminate within the specified limit\n",
comp->compConfig->compName);
Expand Down
46 changes: 18 additions & 28 deletions src/SAFplus/components/amf/server/cpm/clCpmMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -3659,52 +3659,42 @@ static ClRcT clCpmIocNotification(ClEoExecutionObjT *pThis,
len = length - sizeof(notification);
if (len == sizeof(ClUint32T))
{
#if 0
ClUint32T currentLeader = 0;
ClRcT rc = clBufferHeaderTrim(eoRecvMsg, sizeof(notification));
if (rc == CL_OK)
{
clBufferNBytesRead(eoRecvMsg, (ClUint8T*)&currentLeader, &len);
currentLeader = ntohl(currentLeader);
clLogDebug("GMS", "LEA", "Update current leader [%d]", currentLeader);
clNodeCacheLeaderUpdate(currentLeader);
}
#endif
ClUint32T reportedLeader = 0;
len = sizeof(ClUint32T);
ClIocNodeAddressT currentLeader;
clBufferNBytesRead(eoRecvMsg, (ClUint8T*)&reportedLeader, &len);
reportedLeader = ntohl(reportedLeader);
if (clNodeCacheLeaderGet(&currentLeader) == CL_OK)
{
if (currentLeader == reportedLeader)
{
clLogDebug("NTF", "LEA", "Node [%d] reports leader as [%d]. Consistent with this node.", currentLeader,
clLogDebug("NTF", "LEA", "Node [%d] reports leader as [%d]. Consistent with this node.", notification.nodeAddress.iocPhyAddress.nodeAddress,
reportedLeader);
}
else
{
clLogAlert("NTF", "LEA", "Split brain. Node [%d] reports leader as [%d]. Inconsistent with this node's leader [%d]",
clLogAlert("NTF", "LEA", "Split brain. Node [%d] reports leader as [%d]. Inconsistent with this node's leader [%d]",
notification.nodeAddress.iocPhyAddress.nodeAddress, reportedLeader, currentLeader);
clNodeCacheLeaderUpdate(reportedLeader);

/* Only update leaderID if msg come from SC's leader */
if (clCpmIsSC() && (reportedLeader == notification.nodeAddress.iocPhyAddress.nodeAddress))
if (reportedLeader == notification.nodeAddress.iocPhyAddress.nodeAddress)
{
/* Gas: take new leader and try register level 3 */
clNodeCacheLeaderSend(reportedLeader);
ClIocAddressT allNodeReps;
clNodeCacheLeaderUpdate(reportedLeader);

// Trigger GMS to do elect on this update
clNodeCacheLeaderSendLocal(reportedLeader);
}
// I am the leader
else if (gpClCpm->activeMasterNodeId == gpClCpm->pCpmLocalInfo->nodeId)
{
/* Notify all nodes that I am the leader. It is necessary to do this so that external apps/nodes (with no AMF or GMS)
* receive the new leader notification
*/
clNodeCacheLeaderSend(gpClCpm->pCpmLocalInfo->nodeId);

allNodeReps.iocPhyAddress.nodeAddress = CL_IOC_BROADCAST_ADDRESS;
allNodeReps.iocPhyAddress.portId = CL_IOC_XPORT_PORT;
static ClUint32T nodeVersion = CL_VERSION_CODE(5, 0, 0);
ClUint32T myCapability = 0;
ClIocNotificationT notification;
notification.id = (ClIocNotificationIdT) htonl(CL_IOC_NODE_LEAVE_NOTIFICATION);
notification.nodeVersion = htonl(nodeVersion);
notification.nodeAddress.iocPhyAddress.nodeAddress = htonl(clIocLocalAddressGet());
notification.nodeAddress.iocPhyAddress.portId = htonl(myCapability);
notification.protoVersion = htonl(CL_IOC_NOTIFICATION_VERSION); // htonl(1);
clIocNotificationPacketSend(pThis->commObj, &notification, &allNodeReps, CL_FALSE, NULL );
ClIocLogicalAddressT allLocalComps = CL_IOC_ADDRESS_FORM(CL_IOC_INTRANODE_ADDRESS_TYPE, gpClCpm->pCpmLocalInfo->nodeId, CL_IOC_BROADCAST_ADDRESS);
clIocNotificationNodeStatusSend(pThis->commObj, CL_IOC_NODE_ARRIVAL_NOTIFICATION, gpClCpm->pCpmLocalInfo->nodeId, (ClIocAddressT*) &allLocalComps, (ClIocAddressT*) &notification.nodeAddress.iocPhyAddress, NULL );
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions src/SAFplus/components/amf/server/cpm/clCpmNode.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ static ClVersionDatabaseT clCpmServerToServerVersionDb =
};

typedef ClRcT (*funcArray[]) (void);

extern ClBoolT gCpmShuttingDown;

ClRcT CL_CPM_CALL_RMD_SYNC(ClIocNodeAddressT destAddr,
ClIocPortT eoPort,
Expand Down Expand Up @@ -533,7 +533,7 @@ ClRcT VDECL(cpmCpmLocalRegister)(ClEoDataT data,

if(nodeAddress != clIocLocalAddressGet())
{
rc = clAmsCheckNodeJoinState((const ClCharT*)cpmLocalInfo.nodeName, CL_TRUE);
rc = clAmsCheckNodeJoinState((const ClCharT*)cpmLocalInfo.nodeName);
if(rc != CL_OK)
return rc;
}
Expand Down Expand Up @@ -973,6 +973,7 @@ ClRcT VDECL(cpmProcNodeShutDownReq)(ClEoDataT data,
}
return rc;
}
gCpmShuttingDown = CL_TRUE;
startShutdownTimer(iocAddress);
}

Expand Down
10 changes: 0 additions & 10 deletions src/SAFplus/components/event/server/clEvtCkpt.c
Original file line number Diff line number Diff line change
Expand Up @@ -4365,8 +4365,6 @@ clEventChannelOpenRecover(ClBoolT switchover)
{
rc = CL_OK;
}
clLogError(EVENT_LOG_AREA_CKPT,EVENT_LOG_GLOBAL,"clCkptSectionIterationNext(): rc[0x %x]",
rc);
break;
}
/*
Expand Down Expand Up @@ -4424,11 +4422,8 @@ clEventChannelSubRecover(ClBoolT switchover)
||
CL_CKPT_ERR_NO_SECTIONS == CL_GET_ERROR_CODE(rc))
{

rc = CL_OK;
}
clLogError(EVENT_LOG_AREA_CKPT,EVENT_LOG_GLOBAL,"clCkptSectionIterationNext(): rc[0x %x]",
rc);
break;
}
/*
Expand Down Expand Up @@ -4469,11 +4464,8 @@ clEventUserInfoRecover(ClBoolT switchover)
||
CL_CKPT_ERR_NO_SECTIONS == CL_GET_ERROR_CODE(rc))
{

rc = CL_OK;
}
clLogError(EVENT_LOG_AREA_CKPT,EVENT_LOG_GLOBAL,"clCkptSectionIterationNext(): rc[0x %x]",
rc);
break;
}
/*
Expand Down Expand Up @@ -4568,10 +4560,8 @@ clEventCleanAllChannel(ClIocNodeAddressT nodeLeave)
||
CL_CKPT_ERR_NO_SECTIONS == CL_GET_ERROR_CODE(rc))
{

rc = CL_OK;
}
clLogError(EVENT_LOG_AREA_CKPT,EVENT_LOG_GLOBAL,"clCkptSectionIterationNext(): rc[0x %x]",rc);
break;
}
count++;
Expand Down
2 changes: 1 addition & 1 deletion src/SAFplus/components/gms/client/clGmsRmdClient.c
Original file line number Diff line number Diff line change
Expand Up @@ -2052,7 +2052,7 @@ ClRcT VDECL (cl_gms_cluster_track_callback_rmd) (
// response contents are already heap allocated by unmarhall function

rc = clJobQueuePush (&gEoJobQueues[CL_IOC_LOW_PRIORITY], (ClCallbackT) clGmsClusterTrackCallbackHandler, res);
clLogInfo(CLM,NA,"clJobQueuePush rc [0x%x]\n",rc);
//clLogInfo(CLM,NA,"clJobQueuePush rc [0x%x]\n",rc);

return rc;
#endif
Expand Down

0 comments on commit 0651a6e

Please sign in to comment.