Skip to content

Commit

Permalink
[ADDED] kvStore_WatchMulti, js_Subscribe[Sync]Multi (#750)
Browse files Browse the repository at this point in the history
* WIP: passing existing tests

* fix KVWatch

* free subjects

* Added tests

* Added js_SubscribeMulti

* PR feedback: avoid unneeded calloc for KVWatch

* PR feedback: changes to _subscribe wrapper

* nats_formatStringArray refactor+test

* PR feedback: safer _subscribe empty subject handling

* PR feedback: @params in doc

* more coverage

* PR feedback: cleanup the existing buffer on failure to realloc

* refactored the Format test

* adjusting codecov
  • Loading branch information
levb committed May 1, 2024
1 parent 559c4bc commit 3f02b1d
Show file tree
Hide file tree
Showing 9 changed files with 422 additions and 37 deletions.
146 changes: 117 additions & 29 deletions src/js.c
Expand Up @@ -2197,38 +2197,68 @@ _checkConfig(jsConsumerConfig *s, jsConsumerConfig *u)

static natsStatus
_processConsInfo(const char **dlvSubject, jsConsumerInfo *info, jsConsumerConfig *userCfg,
bool isPullMode, const char *subj, const char *queue)
bool isPullMode, const char **subjects, int numSubjects, const char *queue)
{
bool dlvSubjEmpty = false;
jsConsumerConfig *ccfg = info->Config;
const char *dg = NULL;
natsStatus s = NATS_OK;
bool matches = false;
int i;
bool dlvSubjEmpty = false;
jsConsumerConfig *ccfg = info->Config;
const char *dg = NULL;
natsStatus s = NATS_OK;
const char *stackFilterSubject[] = {ccfg->FilterSubject};
const char **filterSubjects = stackFilterSubject;
int filterSubjectsLen = 1;
int incoming, existing;

*dlvSubject = NULL;
// Always represent the consumer's filter subjects as a list, to match
// uniformly against the incoming subject list. Consider lists of 1 empty
// subject empty lists.
if (ccfg->FilterSubjectsLen > 0)
{
filterSubjects = ccfg->FilterSubjects;
filterSubjectsLen = ccfg->FilterSubjectsLen;
}
if ((filterSubjectsLen == 1) && nats_IsStringEmpty(filterSubjects[0]))
{
filterSubjects = NULL;
filterSubjectsLen = 0;
}
if ((numSubjects == 1) && nats_IsStringEmpty(subjects[0]))
{
subjects = NULL;
numSubjects = 0;
}

// Make sure this new subject matches or is a subset.
if (!nats_IsStringEmpty(subj))
// Match the subjects against the consumer's filter subjects.
if (numSubjects > 0 && filterSubjectsLen > 0)
{
if (nats_IsStringEmpty(ccfg->FilterSubject) && (ccfg->FilterSubjectsLen == 0))
{
matches = true;
}
else if (!nats_IsStringEmpty(ccfg->FilterSubject) && nats_HasPrefix(subj, ccfg->FilterSubject))
{
matches = true;
}
else if (ccfg->FilterSubjectsLen > 0)
// If the consumer has filter subject(s), then the subject(s) must match.
bool matches = true;

// TODO - This is N**2, but we don't expect a large number of subjects.
for (incoming = 0; incoming < numSubjects; incoming++)
{
for (i = 0; (i < ccfg->FilterSubjectsLen) && !matches; i++)
bool found = false;
for (existing = 0; existing < filterSubjectsLen; existing++)
{
matches = nats_HasPrefix(subj, ccfg->FilterSubjects[i]);
if (strcmp(subjects[incoming], filterSubjects[existing]) == 0)
{
found = true;
break;
}
}
if (!found)
{
matches = false;
break;
}
}

if (!matches)
{
return nats_setError(NATS_ERR, "subject '%s' does not match any consumer filter subjects.", subj);
if (numSubjects == 1 && filterSubjectsLen == 1)
return nats_setError(NATS_ERR, "subject '%s' does not match consumer filter subject '%s'.", subjects[0], filterSubjects[0]);
else
return nats_setError(NATS_ERR, "%d subjects do not match any consumer filter subjects.", numSubjects);
}
}
// Check that if user wants to create a queue sub,
Expand Down Expand Up @@ -2330,7 +2360,7 @@ js_checkConsName(const char *cons, bool isDurable)
}

static natsStatus
_subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const char *pullDurable,
_subscribeMulti(natsSubscription **new_sub, jsCtx *js, const char **subjects, int numSubjects, const char *pullDurable,
natsMsgHandler usrCB, void *usrCBClosure, bool isPullMode,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
Expand Down Expand Up @@ -2393,7 +2423,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
consumer = opts->Consumer;
consBound= (!nats_IsStringEmpty(stream) && !nats_IsStringEmpty(consumer));

if (nats_IsStringEmpty(subject) && !consBound)
if (((numSubjects <= 0) || nats_IsStringEmpty(subjects[0])) && !consBound)
return nats_setDefaultError(NATS_INVALID_ARG);

// Do some quick checks here for ordered consumers.
Expand Down Expand Up @@ -2450,9 +2480,10 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha

// Find the stream mapped to the subject if not bound to a stream already,
// that is, if user did not provide a `Stream` name through options).
if (nats_IsStringEmpty(stream))
if (nats_IsStringEmpty(stream) && numSubjects > 0)
{
s = _lookupStreamBySubject(&stream, nc, subject, &jo, errCode);
// Use the first subject to find the stream.
s = _lookupStreamBySubject(&stream, nc, subjects[0], &jo, errCode);
if (s != NATS_OK)
goto END;

Expand All @@ -2475,7 +2506,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
s = nats_setError(NATS_ERR, "%s", "no configuration in consumer info");
goto END;
}
s = _processConsInfo(&deliver, info, &(opts->Config), isPullMode, subject, opts->Queue);
s = _processConsInfo(&deliver, info, &(opts->Config), isPullMode, subjects, numSubjects, opts->Queue);
if (s != NATS_OK)
goto END;

Expand Down Expand Up @@ -2509,7 +2540,15 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
}

// Do filtering always, server will clear as needed.
cfg->FilterSubject = subject;
if (numSubjects == 1)
{
cfg->FilterSubject = subjects[0];
}
else
{
cfg->FilterSubjects = subjects;
cfg->FilterSubjectsLen = numSubjects;
}

if (opts->Ordered)
{
Expand Down Expand Up @@ -2559,8 +2598,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
s = nats_setDefaultError(NATS_NO_MEMORY);
}
IF_OK_DUP_STRING(s, jsi->stream, stream);
if ((s == NATS_OK) && !nats_IsStringEmpty(subject))
DUP_STRING(s, jsi->psubj, subject);
IFOK(s, nats_formatStringArray(&jsi->psubj, subjects, numSubjects));
if (s == NATS_OK)
{
jsi->js = js;
Expand Down Expand Up @@ -2713,6 +2751,26 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const char *pullDurable,
natsMsgHandler usrCB, void *usrCBClosure, bool isPullMode,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
natsStatus s = NATS_OK;
const char *singleSubject[] = {subject};
int numSubjects = 1;
const char **subjects = singleSubject;

if (nats_IsStringEmpty(subject))
{
numSubjects = 0;
subjects = NULL;
}

s = _subscribeMulti(new_sub, js, subjects, numSubjects, pullDurable, usrCB, usrCBClosure, isPullMode, jsOpts, opts, errCode);
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject,
natsMsgHandler cb, void *cbClosure,
Expand All @@ -2730,6 +2788,23 @@ js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject,
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_SubscribeMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects,
natsMsgHandler cb, void *cbClosure,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
natsStatus s;

if (errCode != NULL)
*errCode = 0;

if (cb == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

s = _subscribeMulti(sub, js, subjects, numSubjects, NULL, cb, cbClosure, false, jsOpts, opts, errCode);
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
Expand All @@ -2743,6 +2818,19 @@ js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject,
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_SubscribeSyncMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
natsStatus s;

if (errCode != NULL)
*errCode = 0;

s = _subscribeMulti(sub, js, subjects, numSubjects, NULL, NULL, NULL, false, jsOpts, opts, errCode);
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_PullSubscribe(natsSubscription **sub, jsCtx *js, const char *subject, const char *durable,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
Expand Down
1 change: 0 additions & 1 deletion src/jsm.c
Expand Up @@ -3199,7 +3199,6 @@ js_unmarshalConsumerInfo(nats_JSON *json, jsConsumerInfo **new_ci)
IFOK(s, nats_JSONGetBool(json, "push_bound", &(ci->PushBound)));
IFOK(s, nats_JSONGetBool(json, "paused", &(ci->Paused)));
IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(ci->PauseRemaining)));

if (s == NATS_OK)
*new_ci = ci;
else
Expand Down
53 changes: 50 additions & 3 deletions src/kv.c
Expand Up @@ -1057,14 +1057,30 @@ kvWatcher_Destroy(kvWatcher *w)

natsStatus
kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOptions *opts)
{
const char *subjects = { key };
return kvStore_WatchMulti(new_watcher, kv, &subjects, 1, opts);
}

natsStatus
kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int numKeys, kvWatchOptions *opts)
{
natsStatus s;
kvWatcher *w = NULL;
jsSubOptions so;
char *singleSubject[1];
char **multipleSubjects = NULL; // allocate if numKeys > 1
char **subscribeSubjects = singleSubject;
int i;
DEFINE_BUF_FOR_SUBJECT;

if ((new_watcher == NULL) || (kv == NULL) || nats_IsStringEmpty(key))
if ((new_watcher == NULL) || (kv == NULL) || numKeys <= 0)
return nats_setDefaultError(NATS_INVALID_ARG);
for (i=0; i<numKeys; i++)
{
if (nats_IsStringEmpty(keys[i]))
return nats_setDefaultError(NATS_INVALID_ARG);
}

*new_watcher = NULL;

Expand All @@ -1076,7 +1092,36 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti
w->kv = kv;
w->refs = 1;

BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT);
if (numKeys == 1)
{
// special case for single key to avoid a calloc.
subscribeSubjects[0] = (char *)keys[0];

}
else
{
multipleSubjects = (char **)NATS_CALLOC(numKeys, sizeof(const char *));
if (multipleSubjects == NULL)
{
_freeWatcher(w);
return nats_setDefaultError(NATS_NO_MEMORY);
}
subscribeSubjects = multipleSubjects;
}
for (i = 0; i < numKeys; i++)
{
const char *key = keys[i];
BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT); // into buf, '\0'-terminated.
subscribeSubjects[i] = NATS_STRDUP(natsBuf_Data(&buf));
if (subscribeSubjects[i] == NULL)
{
s = nats_setDefaultError(NATS_NO_MEMORY);
NATS_FREE_STRINGS(subscribeSubjects, i);
NATS_FREE(multipleSubjects);
_freeWatcher(w);
return nats_setDefaultError(NATS_NO_MEMORY);
}
}
IFOK(s, natsMutex_Create(&(w->mu)));
if (s == NATS_OK)
{
Expand All @@ -1096,7 +1141,7 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti
// Need to explicitly bind to the stream here because the subject
// we construct may not help find the stream when using mirrors.
so.Stream = kv->stream;
s = js_SubscribeSync(&(w->sub), kv->js, natsBuf_Data(&buf), NULL, &so, NULL);
s = js_SubscribeSyncMulti(&(w->sub), kv->js, (const char **)subscribeSubjects, numKeys, NULL, &so, NULL);
IFOK(s, natsSubscription_SetPendingLimits(w->sub, -1, -1));
if (s == NATS_OK)
{
Expand All @@ -1113,6 +1158,8 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti
}

natsBuf_Cleanup(&buf);
NATS_FREE_STRINGS(subscribeSubjects, numKeys);
NATS_FREE(multipleSubjects);

if (s == NATS_OK)
*new_watcher = w;
Expand Down
9 changes: 9 additions & 0 deletions src/mem.h
Expand Up @@ -27,6 +27,15 @@
#endif
#define NATS_FREE(p) free((p))

// **Note** does not free the array itself.
static void NATS_FREE_STRINGS(char **strings, int count)
{
if (strings == NULL)
return;
for (int i = 0; i < count; i++)
NATS_FREE((char *)strings[i]);
}

// GNU C Library version 2.25 or later.
#if defined(__GLIBC__) && \
(__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 25))
Expand Down

0 comments on commit 3f02b1d

Please sign in to comment.