Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] natsOptions_SetFailRequestsOnDisconnect() option #392

Merged
merged 1 commit into from Dec 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 11 additions & 5 deletions src/conn.c
Expand Up @@ -1203,6 +1203,7 @@ natsConn_disposeRespInfo(natsConnection *nc, respInfo *resp, bool needsLock)
natsConn_Lock(nc);

resp->closed = false;
resp->closedSts = NATS_OK;
resp->removed = false;
resp->msg = NULL;

Expand Down Expand Up @@ -1360,7 +1361,7 @@ natsConn_initResp(natsConnection *nc, natsMsgHandler cb)
// This will clear any pending Request calls.
// Lock is assumed to be held by the caller.
static void
_clearPendingRequestCalls(natsConnection *nc)
_clearPendingRequestCalls(natsConnection *nc, natsStatus reason)
{
natsStrHashIter iter;
void *p = NULL;
Expand All @@ -1374,6 +1375,7 @@ _clearPendingRequestCalls(natsConnection *nc)
respInfo *val = (respInfo*) p;
natsMutex_Lock(val->mu);
val->closed = true;
val->closedSts = reason;
val->removed = true;
natsCondition_Signal(val->cond);
natsMutex_Unlock(val->mu);
Expand Down Expand Up @@ -1410,9 +1412,6 @@ _doReconnect(void *arg)

natsConn_Lock(nc);

// Kick out all calls to natsConnection_Flush[Timeout]().
_clearPendingFlushRequests(nc);

// Clear any error.
nc->err = NATS_OK;
nc->errStr[0] = '\0';
Expand Down Expand Up @@ -2031,6 +2030,13 @@ _processOpError(natsConnection *nc, natsStatus s, bool initialConnect)
nc->sockCtx.fd = NATS_SOCK_INVALID;
}

// Fail pending flush requests.
if (ls == NATS_OK)
_clearPendingFlushRequests(nc);
// If option set, also fail pending requests.
if ((ls == NATS_OK) && nc->opts->failRequestsOnDisconnect)
_clearPendingRequestCalls(nc, NATS_CONNECTION_DISCONNECTED);

// Create the pending buffer to hold all write requests while we try
// to reconnect.
if (ls == NATS_OK)
Expand Down Expand Up @@ -2382,7 +2388,7 @@ _close(natsConnection *nc, natsConnStatus status, bool fromPublicClose, bool doC
_clearPendingFlushRequests(nc);

// Kick out any queued and blocking requests.
_clearPendingRequestCalls(nc);
_clearPendingRequestCalls(nc, NATS_CONNECTION_CLOSED);

if (nc->ptmr != NULL)
natsTimer_Stop(nc->ptmr);
Expand Down
17 changes: 17 additions & 0 deletions src/nats.h
Expand Up @@ -1534,6 +1534,23 @@ natsOptions_SetSendAsap(natsOptions *opts, bool sendAsap);
NATS_EXTERN natsStatus
natsOptions_UseOldRequestStyle(natsOptions *opts, bool useOldStyle);

/** \brief Fails pending requests on disconnect event.
*
* If this option is enabled, all pending #natsConnection_Request() family
* calls will fail with the #NATS_CONNECTION_DISCONNECTED status.
*
* \note This does not apply to requests from connections that use the
* old style requests.
*
* @see natsOptions_UseOldRequestStyle
*
* @param opts the pointer to the #natsOptions object.
* @param failRequests a boolean indicating if pending requests should fail
* when a disconnect event occurs.
*/
NATS_EXTERN natsStatus
natsOptions_SetFailRequestsOnDisconnect(natsOptions *opts, bool failRequests);

/** \brief Sets if connection receives its own messages.
*
* This configures whether the server will echo back messages
Expand Down
5 changes: 5 additions & 0 deletions src/natsp.h
Expand Up @@ -261,6 +261,10 @@ struct __natsOptions
// not rely on the flusher.
bool sendAsap;

// If set to true, pending requests will fail with NATS_CONNECTION_DISCONNECTED
// when the library detects a disconnection.
bool failRequestsOnDisconnect;

// NoEcho configures whether the server will echo back messages
// that are sent on this connection if we also have matching subscriptions.
// Note this is supported on servers >= version 1.2. Proto 1 or greater.
Expand Down Expand Up @@ -451,6 +455,7 @@ typedef struct __respInfo
natsCondition *cond;
natsMsg *msg;
bool closed;
natsStatus closedSts;
bool removed;
bool pooled;

Expand Down
10 changes: 10 additions & 0 deletions src/opts.c
Expand Up @@ -1077,6 +1077,16 @@ natsOptions_UseOldRequestStyle(natsOptions *opts, bool useOldStype)
return NATS_OK;
}

natsStatus
natsOptions_SetFailRequestsOnDisconnect(natsOptions *opts, bool failRequests)
{
LOCK_AND_CHECK_OPTIONS(opts, 0);
opts->failRequestsOnDisconnect = failRequests;
UNLOCK_OPTS(opts);

return NATS_OK;
}

static void
_freeUserCreds(userCreds *uc)
{
Expand Down
2 changes: 1 addition & 1 deletion src/pub.c
Expand Up @@ -492,7 +492,7 @@ natsConnection_RequestMsg(natsMsg **replyMsg, natsConnection *nc,
{
// Set the correct error status that we return to the user
if (resp->closed)
s = NATS_CONNECTION_CLOSED;
s = resp->closedSts;
else
s = NATS_TIMEOUT;
}
Expand Down
1 change: 1 addition & 0 deletions test/list.txt
Expand Up @@ -78,6 +78,7 @@ IsReconnectingAndStatus
ReconnectBufSize
RetryOnFailedConnect
NoPartialOnReconnect
ReconnectFailsPendingRequests
ErrOnConnectAndDeadlock
ErrOnMaxPayloadLimit
Auth
Expand Down
65 changes: 65 additions & 0 deletions test/test.c
Expand Up @@ -17121,6 +17121,70 @@ test_NoPartialOnReconnect(void)
_stopServer(pid);
}

static void
_stopServerInThread(void *closure)
{
natsPid pid = *((natsPid*) closure);

nats_Sleep(150);
_stopServer(pid);
}

static void
test_ReconnectFailsPendingRequest(void)
{
natsStatus s;
natsOptions *opts = NULL;
natsConnection *nc = NULL;
natsSubscription *sub = NULL;
natsMsg *msg = NULL;
natsThread *t = NULL;
natsPid pid = NATS_INVALID_PID;
bool failr = false;
int iter;

for (iter=1; iter<=2; iter++)
{
failr = (iter == 2 ? true : false);

test("Create options: ");
s = natsOptions_Create(&opts);
IFOK(s, natsOptions_SetFailRequestsOnDisconnect(opts, failr));
testCond(s == NATS_OK);

test("Start server: ");
pid = _startServer("nats://127.0.0.1:4222", "-p 4222", true);
CHECK_SERVER_STARTED(pid);
testCond(true);

test("Connect: ");
s = natsConnection_Connect(&nc, opts);
testCond(s == NATS_OK);

test("Create service provider: ");
s = natsConnection_SubscribeSync(&sub, nc, "requests");
testCond(s == NATS_OK);

test("Start thread that will stop server: ");
s = natsThread_Create(&t, _stopServerInThread, (void*) &pid);
testCond(s == NATS_OK);

test((failr ? "Fails due to disconnect: " : "Fails due to timeout: "));
s = natsConnection_RequestString(&msg, nc, "requests", "help", 300);
testCond(s == (failr ? NATS_CONNECTION_DISCONNECTED : NATS_TIMEOUT));

natsThread_Join(t);
natsThread_Destroy(t);
t = NULL;
natsSubscription_Destroy(sub);
sub = NULL;
natsConnection_Destroy(nc);
nc = NULL;
natsOptions_Destroy(opts);
opts = NULL;
}
}

static void
test_HeadersNotSupported(void)
{
Expand Down Expand Up @@ -20531,6 +20595,7 @@ static testInfo allTests[] =
{"ReconnectBufSize", test_ReconnectBufSize},
{"RetryOnFailedConnect", test_RetryOnFailedConnect},
{"NoPartialOnReconnect", test_NoPartialOnReconnect},
{"ReconnectFailsPendingRequests", test_ReconnectFailsPendingRequest},

{"ErrOnConnectAndDeadlock", test_ErrOnConnectAndDeadlock},
{"ErrOnMaxPayloadLimit", test_ErrOnMaxPayloadLimit},
Expand Down