Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] kvStore_WatchMulti, js_Subscribe[Sync]Multi #750

Merged
merged 18 commits into from May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
147 changes: 118 additions & 29 deletions src/js.c
Expand Up @@ -2197,38 +2197,68 @@ _checkConfig(jsConsumerConfig *s, jsConsumerConfig *u)

static natsStatus
_processConsInfo(const char **dlvSubject, jsConsumerInfo *info, jsConsumerConfig *userCfg,
bool isPullMode, const char *subj, const char *queue)
bool isPullMode, const char **subjects, int numSubjects, const char *queue)
{
bool dlvSubjEmpty = false;
jsConsumerConfig *ccfg = info->Config;
const char *dg = NULL;
natsStatus s = NATS_OK;
bool matches = false;
int i;
bool dlvSubjEmpty = false;
jsConsumerConfig *ccfg = info->Config;
const char *dg = NULL;
natsStatus s = NATS_OK;
const char *stackFilterSubject[] = {ccfg->FilterSubject};
const char **filterSubjects = stackFilterSubject;
int filterSubjectsLen = 1;
int incoming, existing;

*dlvSubject = NULL;
// Always represent the consumer's filter subjects as a list, to match
// uniformly against the incoming subject list. Consider lists of 1 empty
// subject empty lists.
if (ccfg->FilterSubjectsLen > 0)
{
filterSubjects = ccfg->FilterSubjects;
filterSubjectsLen = ccfg->FilterSubjectsLen;
}
if ((filterSubjectsLen == 1) && nats_IsStringEmpty(filterSubjects[0]))
{
filterSubjects = NULL;
filterSubjectsLen = 0;
}
if ((numSubjects == 1) && nats_IsStringEmpty(subjects[0]))
{
subjects = NULL;
numSubjects = 0;
}

// Make sure this new subject matches or is a subset.
if (!nats_IsStringEmpty(subj))
// Match the subjects against the consumer's filter subjects.
if (numSubjects > 0 && filterSubjectsLen > 0)
{
if (nats_IsStringEmpty(ccfg->FilterSubject) && (ccfg->FilterSubjectsLen == 0))
levb marked this conversation as resolved.
Show resolved Hide resolved
{
matches = true;
}
else if (!nats_IsStringEmpty(ccfg->FilterSubject) && nats_HasPrefix(subj, ccfg->FilterSubject))
{
matches = true;
}
else if (ccfg->FilterSubjectsLen > 0)
// If the consumer has filter subject(s), then the subject(s) must match.
bool matches = true;

// TODO - This is N**2, but we don't expect a large number of subjects.
for (incoming = 0; incoming < numSubjects; incoming++)
{
for (i = 0; (i < ccfg->FilterSubjectsLen) && !matches; i++)
bool found = false;
for (existing = 0; existing < filterSubjectsLen; existing++)
{
matches = nats_HasPrefix(subj, ccfg->FilterSubjects[i]);
if (strcmp(subjects[incoming], filterSubjects[existing]) == 0)
{
found = true;
break;
}
}
if (!found)
{
matches = false;
break;
}
}

if (!matches)
{
return nats_setError(NATS_ERR, "subject '%s' does not match any consumer filter subjects.", subj);
if (numSubjects == 1 && filterSubjectsLen == 1)
return nats_setError(NATS_ERR, "subject '%s' does not match consumer filter subject '%s'.", subjects[0], filterSubjects[0]);
else
return nats_setError(NATS_ERR, "%d subjects do not match any consumer filter subjects.", numSubjects);
}
}
// Check that if user wants to create a queue sub,
Expand Down Expand Up @@ -2330,7 +2360,7 @@ js_checkConsName(const char *cons, bool isDurable)
}

static natsStatus
_subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const char *pullDurable,
_subscribeMulti(natsSubscription **new_sub, jsCtx *js, const char **subjects, int numSubjects, const char *pullDurable,
natsMsgHandler usrCB, void *usrCBClosure, bool isPullMode,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
Expand Down Expand Up @@ -2393,7 +2423,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
consumer = opts->Consumer;
consBound= (!nats_IsStringEmpty(stream) && !nats_IsStringEmpty(consumer));

if (nats_IsStringEmpty(subject) && !consBound)
if (((numSubjects <= 0) || nats_IsStringEmpty(subjects[0])) && !consBound)
return nats_setDefaultError(NATS_INVALID_ARG);

// Do some quick checks here for ordered consumers.
Expand Down Expand Up @@ -2450,9 +2480,10 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha

// Find the stream mapped to the subject if not bound to a stream already,
// that is, if user did not provide a `Stream` name through options).
if (nats_IsStringEmpty(stream))
if (nats_IsStringEmpty(stream) && numSubjects > 0)
{
s = _lookupStreamBySubject(&stream, nc, subject, &jo, errCode);
// Use the first subject to find the stream.
s = _lookupStreamBySubject(&stream, nc, subjects[0], &jo, errCode);
if (s != NATS_OK)
goto END;

Expand All @@ -2475,7 +2506,7 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
s = nats_setError(NATS_ERR, "%s", "no configuration in consumer info");
goto END;
}
s = _processConsInfo(&deliver, info, &(opts->Config), isPullMode, subject, opts->Queue);
s = _processConsInfo(&deliver, info, &(opts->Config), isPullMode, subjects, numSubjects, opts->Queue);
if (s != NATS_OK)
goto END;

Expand Down Expand Up @@ -2509,7 +2540,15 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
}

// Do filtering always, server will clear as needed.
cfg->FilterSubject = subject;
if (numSubjects == 1)
{
cfg->FilterSubject = subjects[0];
}
else
{
cfg->FilterSubjects = subjects;
cfg->FilterSubjectsLen = numSubjects;
}

if (opts->Ordered)
{
Expand Down Expand Up @@ -2559,8 +2598,15 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
s = nats_setDefaultError(NATS_NO_MEMORY);
}
IF_OK_DUP_STRING(s, jsi->stream, stream);
if ((s == NATS_OK) && !nats_IsStringEmpty(subject))
DUP_STRING(s, jsi->psubj, subject);
if ((s == NATS_OK) && numSubjects > 0)
{
int l = nats_printStringArray(NULL, 0, subjects, numSubjects);
levb marked this conversation as resolved.
Show resolved Hide resolved
jsi->psubj = NATS_CALLOC(l, 1); // '\0' included
if (jsi->psubj == NULL)
s = nats_setDefaultError(NATS_NO_MEMORY);
if (s == NATS_OK)
nats_printStringArray(jsi->psubj, l, subjects, numSubjects);
}
if (s == NATS_OK)
{
jsi->js = js;
Expand Down Expand Up @@ -2713,6 +2759,19 @@ _subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const cha
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const char *pullDurable,
natsMsgHandler usrCB, void *usrCBClosure, bool isPullMode,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
const char *subjects[] = {subject};

if (nats_IsStringEmpty(subject))
return _subscribeMulti(new_sub, js, NULL, 0, pullDurable, usrCB, usrCBClosure, isPullMode, jsOpts, opts, errCode);

return _subscribeMulti(new_sub, js, subjects, 1, pullDurable, usrCB, usrCBClosure, isPullMode, jsOpts, opts, errCode);
levb marked this conversation as resolved.
Show resolved Hide resolved
}

natsStatus
js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject,
natsMsgHandler cb, void *cbClosure,
Expand All @@ -2730,6 +2789,23 @@ js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject,
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_SubscribeMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects,
natsMsgHandler cb, void *cbClosure,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
natsStatus s;

if (errCode != NULL)
*errCode = 0;

if (cb == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

s = _subscribeMulti(sub, js, subjects, numSubjects, NULL, cb, cbClosure, false, jsOpts, opts, errCode);
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
Expand All @@ -2743,6 +2819,19 @@ js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject,
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_SubscribeSyncMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
natsStatus s;

if (errCode != NULL)
*errCode = 0;

s = _subscribeMulti(sub, js, subjects, numSubjects, NULL, NULL, NULL, false, jsOpts, opts, errCode);
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_PullSubscribe(natsSubscription **sub, jsCtx *js, const char *subject, const char *durable,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
Expand Down
1 change: 0 additions & 1 deletion src/jsm.c
Expand Up @@ -3199,7 +3199,6 @@ js_unmarshalConsumerInfo(nats_JSON *json, jsConsumerInfo **new_ci)
IFOK(s, nats_JSONGetBool(json, "push_bound", &(ci->PushBound)));
IFOK(s, nats_JSONGetBool(json, "paused", &(ci->Paused)));
IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(ci->PauseRemaining)));

if (s == NATS_OK)
*new_ci = ci;
else
Expand Down
39 changes: 36 additions & 3 deletions src/kv.c
Expand Up @@ -1057,14 +1057,28 @@ kvWatcher_Destroy(kvWatcher *w)

natsStatus
kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOptions *opts)
{
const char *subjects = { key };
return kvStore_WatchMulti(new_watcher, kv, &subjects, 1, opts);
}

natsStatus
kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int numKeys, kvWatchOptions *opts)
{
natsStatus s;
kvWatcher *w = NULL;
jsSubOptions so;
char **subscribeSubjects = NULL;
int i;
DEFINE_BUF_FOR_SUBJECT;

if ((new_watcher == NULL) || (kv == NULL) || nats_IsStringEmpty(key))
if ((new_watcher == NULL) || (kv == NULL) || numKeys <= 0)
return nats_setDefaultError(NATS_INVALID_ARG);
for (i=0; i<numKeys; i++)
{
if (nats_IsStringEmpty(keys[i]))
return nats_setDefaultError(NATS_INVALID_ARG);
}

*new_watcher = NULL;

Expand All @@ -1076,7 +1090,25 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti
w->kv = kv;
w->refs = 1;

BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT);
subscribeSubjects = (char**) NATS_CALLOC(numKeys, sizeof(const char*));
levb marked this conversation as resolved.
Show resolved Hide resolved
if (subscribeSubjects == NULL)
{
_freeWatcher(w);
return nats_setDefaultError(NATS_NO_MEMORY);
}
for (i=0; i<numKeys; i++)
{
const char *key = keys[i];
BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT); // into buf, '\0'-terminated.
subscribeSubjects[i] = NATS_STRDUP(natsBuf_Data(&buf));
if (subscribeSubjects[i] == NULL)
{
s = nats_setDefaultError(NATS_NO_MEMORY);
NATS_FREE_STRINGS(subscribeSubjects, i);
_freeWatcher(w);
return nats_setDefaultError(NATS_NO_MEMORY);
}
}
IFOK(s, natsMutex_Create(&(w->mu)));
if (s == NATS_OK)
{
Expand All @@ -1096,7 +1128,7 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti
// Need to explicitly bind to the stream here because the subject
// we construct may not help find the stream when using mirrors.
so.Stream = kv->stream;
s = js_SubscribeSync(&(w->sub), kv->js, natsBuf_Data(&buf), NULL, &so, NULL);
s = js_SubscribeSyncMulti(&(w->sub), kv->js, (const char **)subscribeSubjects, numKeys, NULL, &so, NULL);
IFOK(s, natsSubscription_SetPendingLimits(w->sub, -1, -1));
if (s == NATS_OK)
{
Expand All @@ -1113,6 +1145,7 @@ kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOpti
}

natsBuf_Cleanup(&buf);
NATS_FREE_STRINGS(subscribeSubjects, numKeys);

if (s == NATS_OK)
*new_watcher = w;
Expand Down
10 changes: 10 additions & 0 deletions src/mem.h
Expand Up @@ -27,6 +27,16 @@
#endif
#define NATS_FREE(p) free((p))

static void NATS_FREE_STRINGS(char **strings, int count)
{
if (strings == NULL)
return;

for (int i = 0; i < count; i++)
NATS_FREE((char *)strings[i]);
NATS_FREE(strings);
}

// GNU C Library version 2.25 or later.
#if defined(__GLIBC__) && \
(__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 25))
Expand Down