Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] kvStore_WatchMulti, js_Subscribe[Sync]Multi #750

Merged
merged 18 commits into from May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
146 changes: 117 additions & 29 deletions src/js.c
Expand Up @@ -2197,38 +2197,68 @@

static natsStatus
_processConsInfo(const char **dlvSubject, jsConsumerInfo *info, jsConsumerConfig *userCfg,
bool isPullMode, const char *subj, const char *queue)
bool isPullMode, const char **subjects, int numSubjects, const char *queue)
{
bool dlvSubjEmpty = false;
jsConsumerConfig *ccfg = info->Config;
const char *dg = NULL;
natsStatus s = NATS_OK;
bool matches = false;
int i;
bool dlvSubjEmpty = false;
jsConsumerConfig *ccfg = info->Config;
const char *dg = NULL;
natsStatus s = NATS_OK;
const char *stackFilterSubject[] = {ccfg->FilterSubject};
const char **filterSubjects = stackFilterSubject;
int filterSubjectsLen = 1;
int incoming, existing;

*dlvSubject = NULL;
// Always represent the consumer's filter subjects as a list, to match
// uniformly against the incoming subject list. Consider lists of 1 empty
// subject empty lists.
if (ccfg->FilterSubjectsLen > 0)
{
filterSubjects = ccfg->FilterSubjects;
filterSubjectsLen = ccfg->FilterSubjectsLen;
}
if ((filterSubjectsLen == 1) && nats_IsStringEmpty(filterSubjects[0]))
{
filterSubjects = NULL;
filterSubjectsLen = 0;
}
if ((numSubjects == 1) && nats_IsStringEmpty(subjects[0]))
{
subjects = NULL;
numSubjects = 0;

Check warning on line 2228 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L2227-L2228

Added lines #L2227 - L2228 were not covered by tests
}

// Make sure this new subject matches or is a subset.
if (!nats_IsStringEmpty(subj))
// Match the subjects against the consumer's filter subjects.
if (numSubjects > 0 && filterSubjectsLen > 0)
{
if (nats_IsStringEmpty(ccfg->FilterSubject) && (ccfg->FilterSubjectsLen == 0))
levb marked this conversation as resolved.
Show resolved Hide resolved
{
matches = true;
}
else if (!nats_IsStringEmpty(ccfg->FilterSubject) && nats_HasPrefix(subj, ccfg->FilterSubject))
{
matches = true;
}
else if (ccfg->FilterSubjectsLen > 0)
// If the consumer has filter subject(s), then the subject(s) must match.
bool matches = true;

// TODO - This is N**2, but we don't expect a large number of subjects.
for (incoming = 0; incoming < numSubjects; incoming++)
{
for (i = 0; (i < ccfg->FilterSubjectsLen) && !matches; i++)
bool found = false;
for (existing = 0; existing < filterSubjectsLen; existing++)
{
matches = nats_HasPrefix(subj, ccfg->FilterSubjects[i]);
if (strcmp(subjects[incoming], filterSubjects[existing]) == 0)
{
found = true;
break;
}
}
if (!found)
{
matches = false;
break;
}
}

if (!matches)
{
return nats_setError(NATS_ERR, "subject '%s' does not match any consumer filter subjects.", subj);
if (numSubjects == 1 && filterSubjectsLen == 1)
return nats_setError(NATS_ERR, "subject '%s' does not match consumer filter subject '%s'.", subjects[0], filterSubjects[0]);
else
return nats_setError(NATS_ERR, "%d subjects do not match any consumer filter subjects.", numSubjects);
}
}
// Check that if user wants to create a queue sub,
Expand Down Expand Up @@ -2330,7 +2360,7 @@
}

static natsStatus
_subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const char *pullDurable,
_subscribeMulti(natsSubscription **new_sub, jsCtx *js, const char **subjects, int numSubjects, const char *pullDurable,
natsMsgHandler usrCB, void *usrCBClosure, bool isPullMode,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
Expand Down Expand Up @@ -2393,7 +2423,7 @@
consumer = opts->Consumer;
consBound= (!nats_IsStringEmpty(stream) && !nats_IsStringEmpty(consumer));

if (nats_IsStringEmpty(subject) && !consBound)
if (((numSubjects <= 0) || nats_IsStringEmpty(subjects[0])) && !consBound)
return nats_setDefaultError(NATS_INVALID_ARG);

// Do some quick checks here for ordered consumers.
Expand Down Expand Up @@ -2450,9 +2480,10 @@

// Find the stream mapped to the subject if not bound to a stream already,
// that is, if user did not provide a `Stream` name through options).
if (nats_IsStringEmpty(stream))
if (nats_IsStringEmpty(stream) && numSubjects > 0)
{
s = _lookupStreamBySubject(&stream, nc, subject, &jo, errCode);
// Use the first subject to find the stream.
s = _lookupStreamBySubject(&stream, nc, subjects[0], &jo, errCode);
if (s != NATS_OK)
goto END;

Expand All @@ -2475,7 +2506,7 @@
s = nats_setError(NATS_ERR, "%s", "no configuration in consumer info");
goto END;
}
s = _processConsInfo(&deliver, info, &(opts->Config), isPullMode, subject, opts->Queue);
s = _processConsInfo(&deliver, info, &(opts->Config), isPullMode, subjects, numSubjects, opts->Queue);
if (s != NATS_OK)
goto END;

Expand Down Expand Up @@ -2509,7 +2540,15 @@
}

// Do filtering always, server will clear as needed.
cfg->FilterSubject = subject;
if (numSubjects == 1)
{
cfg->FilterSubject = subjects[0];
}
else
{
cfg->FilterSubjects = subjects;
cfg->FilterSubjectsLen = numSubjects;
}

if (opts->Ordered)
{
Expand Down Expand Up @@ -2559,8 +2598,7 @@
s = nats_setDefaultError(NATS_NO_MEMORY);
}
IF_OK_DUP_STRING(s, jsi->stream, stream);
if ((s == NATS_OK) && !nats_IsStringEmpty(subject))
DUP_STRING(s, jsi->psubj, subject);
IFOK(s, nats_formatStringArray(&jsi->psubj, subjects, numSubjects));
if (s == NATS_OK)
{
jsi->js = js;
Expand Down Expand Up @@ -2713,6 +2751,26 @@
return NATS_UPDATE_ERR_STACK(s);
}

static natsStatus
_subscribe(natsSubscription **new_sub, jsCtx *js, const char *subject, const char *pullDurable,
natsMsgHandler usrCB, void *usrCBClosure, bool isPullMode,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
natsStatus s = NATS_OK;
const char *singleSubject[] = {subject};
int numSubjects = 1;
const char **subjects = singleSubject;

if (nats_IsStringEmpty(subject))
{
numSubjects = 0;
subjects = NULL;
}

s = _subscribeMulti(new_sub, js, subjects, numSubjects, pullDurable, usrCB, usrCBClosure, isPullMode, jsOpts, opts, errCode);
levb marked this conversation as resolved.
Show resolved Hide resolved
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_Subscribe(natsSubscription **sub, jsCtx *js, const char *subject,
natsMsgHandler cb, void *cbClosure,
Expand All @@ -2730,6 +2788,23 @@
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_SubscribeMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects,

Check warning on line 2792 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L2792

Added line #L2792 was not covered by tests
natsMsgHandler cb, void *cbClosure,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
natsStatus s;

if (errCode != NULL)
*errCode = 0;

Check warning on line 2799 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L2799

Added line #L2799 was not covered by tests

if (cb == NULL)
return nats_setDefaultError(NATS_INVALID_ARG);

Check warning on line 2802 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L2802

Added line #L2802 was not covered by tests

s = _subscribeMulti(sub, js, subjects, numSubjects, NULL, cb, cbClosure, false, jsOpts, opts, errCode);
return NATS_UPDATE_ERR_STACK(s);

Check warning on line 2805 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L2804-L2805

Added lines #L2804 - L2805 were not covered by tests
}

natsStatus
js_SubscribeSync(natsSubscription **sub, jsCtx *js, const char *subject,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
Expand All @@ -2743,6 +2818,19 @@
return NATS_UPDATE_ERR_STACK(s);
}

natsStatus
js_SubscribeSyncMulti(natsSubscription **sub, jsCtx *js, const char **subjects, int numSubjects,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
{
natsStatus s;

if (errCode != NULL)
*errCode = 0;

s = _subscribeMulti(sub, js, subjects, numSubjects, NULL, NULL, NULL, false, jsOpts, opts, errCode);
return NATS_UPDATE_ERR_STACK(s);

Check warning on line 2831 in src/js.c

View check run for this annotation

Codecov / codecov/patch

src/js.c#L2831

Added line #L2831 was not covered by tests
}

natsStatus
js_PullSubscribe(natsSubscription **sub, jsCtx *js, const char *subject, const char *durable,
jsOptions *jsOpts, jsSubOptions *opts, jsErrCode *errCode)
Expand Down
1 change: 0 additions & 1 deletion src/jsm.c
Expand Up @@ -3199,7 +3199,6 @@ js_unmarshalConsumerInfo(nats_JSON *json, jsConsumerInfo **new_ci)
IFOK(s, nats_JSONGetBool(json, "push_bound", &(ci->PushBound)));
IFOK(s, nats_JSONGetBool(json, "paused", &(ci->Paused)));
IFOK(s, nats_JSONGetLong(json, "pause_remaining", &(ci->PauseRemaining)));

if (s == NATS_OK)
*new_ci = ci;
else
Expand Down
53 changes: 50 additions & 3 deletions src/kv.c
Expand Up @@ -1057,14 +1057,30 @@

natsStatus
kvStore_Watch(kvWatcher **new_watcher, kvStore *kv, const char *key, kvWatchOptions *opts)
{
const char *subjects = { key };
return kvStore_WatchMulti(new_watcher, kv, &subjects, 1, opts);
}

natsStatus
kvStore_WatchMulti(kvWatcher **new_watcher, kvStore *kv, const char **keys, int numKeys, kvWatchOptions *opts)
{
natsStatus s;
kvWatcher *w = NULL;
jsSubOptions so;
char *singleSubject[1];
char **multipleSubjects = NULL; // allocate if numKeys > 1
char **subscribeSubjects = singleSubject;
int i;
DEFINE_BUF_FOR_SUBJECT;

if ((new_watcher == NULL) || (kv == NULL) || nats_IsStringEmpty(key))
if ((new_watcher == NULL) || (kv == NULL) || numKeys <= 0)
return nats_setDefaultError(NATS_INVALID_ARG);
for (i=0; i<numKeys; i++)
{
if (nats_IsStringEmpty(keys[i]))
return nats_setDefaultError(NATS_INVALID_ARG);
}

*new_watcher = NULL;

Expand All @@ -1076,7 +1092,36 @@
w->kv = kv;
w->refs = 1;

BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT);
if (numKeys == 1)
{
// special case for single key to avoid a calloc.
subscribeSubjects[0] = (char *)keys[0];

}
else
{
multipleSubjects = (char **)NATS_CALLOC(numKeys, sizeof(const char *));
if (multipleSubjects == NULL)
{
_freeWatcher(w);
return nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 1107 in src/kv.c

View check run for this annotation

Codecov / codecov/patch

src/kv.c#L1106-L1107

Added lines #L1106 - L1107 were not covered by tests
}
subscribeSubjects = multipleSubjects;
}
for (i = 0; i < numKeys; i++)
{
const char *key = keys[i];
BUILD_SUBJECT(KEY_NAME_ONLY, NOT_FOR_A_PUT); // into buf, '\0'-terminated.
subscribeSubjects[i] = NATS_STRDUP(natsBuf_Data(&buf));
if (subscribeSubjects[i] == NULL)
{
s = nats_setDefaultError(NATS_NO_MEMORY);
NATS_FREE_STRINGS(subscribeSubjects, i);
NATS_FREE(multipleSubjects);
_freeWatcher(w);
return nats_setDefaultError(NATS_NO_MEMORY);

Check warning on line 1122 in src/kv.c

View check run for this annotation

Codecov / codecov/patch

src/kv.c#L1118-L1122

Added lines #L1118 - L1122 were not covered by tests
}
}
IFOK(s, natsMutex_Create(&(w->mu)));
if (s == NATS_OK)
{
Expand All @@ -1096,7 +1141,7 @@
// Need to explicitly bind to the stream here because the subject
// we construct may not help find the stream when using mirrors.
so.Stream = kv->stream;
s = js_SubscribeSync(&(w->sub), kv->js, natsBuf_Data(&buf), NULL, &so, NULL);
s = js_SubscribeSyncMulti(&(w->sub), kv->js, (const char **)subscribeSubjects, numKeys, NULL, &so, NULL);
IFOK(s, natsSubscription_SetPendingLimits(w->sub, -1, -1));
if (s == NATS_OK)
{
Expand All @@ -1113,6 +1158,8 @@
}

natsBuf_Cleanup(&buf);
NATS_FREE_STRINGS(subscribeSubjects, numKeys);
NATS_FREE(multipleSubjects);

if (s == NATS_OK)
*new_watcher = w;
Expand Down
9 changes: 9 additions & 0 deletions src/mem.h
Expand Up @@ -27,6 +27,15 @@
#endif
#define NATS_FREE(p) free((p))

// **Note** does not free the array itself.
static void NATS_FREE_STRINGS(char **strings, int count)
{
if (strings == NULL)
return;

Check warning on line 34 in src/mem.h

View check run for this annotation

Codecov / codecov/patch

src/mem.h#L34

Added line #L34 was not covered by tests
for (int i = 0; i < count; i++)
NATS_FREE((char *)strings[i]);
}

// GNU C Library version 2.25 or later.
#if defined(__GLIBC__) && \
(__GLIBC__ > 2 || (__GLIBC__ == 2 && __GLIBC_MINOR__ >= 25))
Expand Down