Skip to content

Commit

Permalink
[ADDED] natsOptions_SetFailRequestsOnDisconnect() option
Browse files Browse the repository at this point in the history
If enabled, any pending request (using new style) will fail with
NATS_CONNECTION_DISCONNECTED status.

Also fixed the failing of pending flush requests which was done
in the doReconnect thread while it should be done prior to
creating the pending buffer and starting the reconnect thread.

Resolves #391

Signed-off-by: Ivan Kozlovic <ivan@synadia.com>
  • Loading branch information
kozlovic committed Dec 28, 2020
1 parent caa998c commit e7b0332
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 6 deletions.
16 changes: 11 additions & 5 deletions src/conn.c
Expand Up @@ -1203,6 +1203,7 @@ natsConn_disposeRespInfo(natsConnection *nc, respInfo *resp, bool needsLock)
natsConn_Lock(nc);

resp->closed = false;
resp->closedSts = NATS_OK;
resp->removed = false;
resp->msg = NULL;

Expand Down Expand Up @@ -1360,7 +1361,7 @@ natsConn_initResp(natsConnection *nc, natsMsgHandler cb)
// This will clear any pending Request calls.
// Lock is assumed to be held by the caller.
static void
_clearPendingRequestCalls(natsConnection *nc)
_clearPendingRequestCalls(natsConnection *nc, natsStatus reason)
{
natsStrHashIter iter;
void *p = NULL;
Expand All @@ -1374,6 +1375,7 @@ _clearPendingRequestCalls(natsConnection *nc)
respInfo *val = (respInfo*) p;
natsMutex_Lock(val->mu);
val->closed = true;
val->closedSts = reason;
val->removed = true;
natsCondition_Signal(val->cond);
natsMutex_Unlock(val->mu);
Expand Down Expand Up @@ -1410,9 +1412,6 @@ _doReconnect(void *arg)

natsConn_Lock(nc);

// Kick out all calls to natsConnection_Flush[Timeout]().
_clearPendingFlushRequests(nc);

// Clear any error.
nc->err = NATS_OK;
nc->errStr[0] = '\0';
Expand Down Expand Up @@ -2031,6 +2030,13 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect)
nc->sockCtx.fd = NATS_SOCK_INVALID;
}

// Fail pending flush requests.
if (ls == NATS_OK)
_clearPendingFlushRequests(nc);
// If option set, also fail pending requests.
if ((ls == NATS_OK) && nc->opts->failRequestsOnDisconnect)
_clearPendingRequestCalls(nc, NATS_CONNECTION_DISCONNECTED);

// Create the pending buffer to hold all write requests while we try
// to reconnect.
if (ls == NATS_OK)
Expand Down Expand Up @@ -2382,7 +2388,7 @@ _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doC
_clearPendingFlushRequests(nc);

// Kick out any queued and blocking requests.
_clearPendingRequestCalls(nc);
_clearPendingRequestCalls(nc, NATS_CONNECTION_CLOSED);

if (nc->ptmr != NULL)
natsTimer_Stop(nc->ptmr);
Expand Down
17 changes: 17 additions & 0 deletions src/nats.h
Expand Up @@ -1534,6 +1534,23 @@ natsOptions_SetSendAsap(natsOptions *opts, bool sendAsap);
NATS_EXTERN natsStatus
natsOptions_UseOldRequestStyle(natsOptions *opts, bool useOldStyle);

/** \brief Fails pending requests on disconnect event.
*
* If this option is enabled, all pending #natsConnection_Request() family
* calls will fail with the #NATS_CONNECTION_DISCONNECTED status.
*
* \note This does not apply to requests from connections that use the
* old style requests.
*
* @see natsOptions_UseOldRequestStyle
*
* @param opts the pointer to the #natsOptions object.
* @param failRequests a boolean indicating if pending requests should fail
* when a disconnect event occurs.
*/
NATS_EXTERN natsStatus
natsOptions_SetFailRequestsOnDisconnect(natsOptions *opts, bool failRequests);

/** \brief Sets if connection receives its own messages.
*
* This configures whether the server will echo back messages
Expand Down
5 changes: 5 additions & 0 deletions src/natsp.h
Expand Up @@ -261,6 +261,10 @@ struct __natsOptions
// not rely on the flusher.
bool sendAsap;

// If set to true, pending requests will fail with NATS_CONNECTION_DISCONNECTED
// when the library detects a disconnection.
bool failRequestsOnDisconnect;

// NoEcho configures whether the server will echo back messages
// that are sent on this connection if we also have matching subscriptions.
// Note this is supported on servers >= version 1.2. Proto 1 or greater.
Expand Down Expand Up @@ -451,6 +455,7 @@ typedef struct __respInfo
natsCondition *cond;
natsMsg *msg;
bool closed;
natsStatus closedSts;
bool removed;
bool pooled;

Expand Down
10 changes: 10 additions & 0 deletions src/opts.c
Expand Up @@ -1077,6 +1077,16 @@ natsOptions_UseOldRequestStyle(natsOptions *opts, bool useOldStype)
return NATS_OK;
}

natsStatus
natsOptions_SetFailRequestsOnDisconnect(natsOptions *opts, bool failRequests)
{
LOCK_AND_CHECK_OPTIONS(opts, 0);
opts->failRequestsOnDisconnect = failRequests;
UNLOCK_OPTS(opts);

return NATS_OK;
}

static void
_freeUserCreds(userCreds *uc)
{
Expand Down
2 changes: 1 addition & 1 deletion src/pub.c
Expand Up @@ -492,7 +492,7 @@ natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,
{
// Set the correct error status that we return to the user
if (resp->closed)
s = NATS_CONNECTION_CLOSED;
s = resp->closedSts;
else
s = NATS_TIMEOUT;
}
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Expand Up @@ -78,6 +78,7 @@ IsReconnectingAndStatus
ReconnectBufSize
RetryOnFailedConnect
NoPartialOnReconnect
ReconnectFailsPendingRequests
ErrOnConnectAndDeadlock
ErrOnMaxPayloadLimit
Auth
Expand Down
65 changes: 65 additions & 0 deletions test/test.c
Expand Up @@ -17121,6 +17121,70 @@ test_NoPartialOnReconnect(void)
_stopServer(pid);
}

static void
_stopServerInThread(void *closure)
{
natsPid pid = *((natsPid*) closure);

nats_Sleep(150);
_stopServer(pid);
}

static void
test_ReconnectFailsPendingRequest(void)
{
natsStatus s;
natsOptions *opts = NULL;
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsThread *t = NULL;
natsPid pid = NATS_INVALID_PID;
bool failr = false;
int iter;

for (iter=1; iter<=2; iter++)
{
failr = (iter == 2 ? true : false);

test("Create options: ");
s = natsOptions_Create(&opts);
IFOK(s, natsOptions_SetFailRequestsOnDisconnect(opts, failr));
testCond(s == NATS_OK);

test("Start server: ");
pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true);
CHECK_SERVER_STARTED(pid);
testCond(true);

test("Connect: ");
s = natsConnection_Connect(&nc, opts);
testCond(s == NATS_OK);

test("Create service provider: ");
s = natsConnection_SubscribeSync(&sub, nc, "requests");
testCond(s == NATS_OK);

test("Start thread that will stop server: ");
s = natsThread_Create(&t, _stopServerInThread, (void*) &pid);
testCond(s == NATS_OK);

test((failr ? "Fails due to disconnect: " : "Fails due to timeout: "));
s = natsConnection_RequestString(&msg, nc, "requests", "help", 300);
testCond(s == (failr ? NATS_CONNECTION_DISCONNECTED : NATS_TIMEOUT));

natsThread_Join(t);
natsThread_Destroy(t);
t = NULL;
natsSubscription_Destroy(sub);
sub = NULL;
natsConnection_Destroy(nc);
nc = NULL;
natsOptions_Destroy(opts);
opts = NULL;
}
}

static void
test_HeadersNotSupported(void)
{
Expand Down Expand Up @@ -20531,6 +20595,7 @@ static testInfo allTests[] =
{"ReconnectBufSize", test_ReconnectBufSize},
{"RetryOnFailedConnect", test_RetryOnFailedConnect},
{"NoPartialOnReconnect", test_NoPartialOnReconnect},
{"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest},

{"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock},
{"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit},
Expand Down

0 comments on commit e7b0332

Please sign in to comment.