Skip to content

Commit

Permalink
Merge pull request #693 from nats-io/reset-legacy-oc
Browse files Browse the repository at this point in the history
[FIX] more robust watcher in case consumer is recreated while the cluster is flapping
  • Loading branch information
aricart committed Apr 29, 2024
2 parents 66a47b9 + ecb4096 commit 7fe57c3
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
40 changes: 32 additions & 8 deletions jetstream/jsbaseclient_api.ts
Expand Up @@ -15,13 +15,15 @@

import { Empty } from "../nats-base-client/encoders.ts";
import { Codec, JSONCodec } from "../nats-base-client/codec.ts";
import { extend } from "../nats-base-client/util.ts";
import { backoff, delay, extend } from "../nats-base-client/util.ts";
import { NatsConnectionImpl } from "../nats-base-client/nats.ts";
import { checkJsErrorCode } from "./jsutil.ts";
import {
ErrorCode,
JetStreamOptions,
Msg,
NatsConnection,
NatsError,
RequestOptions,
} from "../nats-base-client/core.ts";
import { ApiResponse } from "./jsapi_types.ts";
Expand Down Expand Up @@ -81,7 +83,7 @@ export class BaseApiClient {
async _request(
subj: string,
data: unknown = null,
opts?: RequestOptions,
opts?: Partial<RequestOptions> & { retries?: number },
): Promise<unknown> {
opts = opts || {} as RequestOptions;
opts.timeout = this.timeout;
Expand All @@ -91,12 +93,34 @@ export class BaseApiClient {
a = this.jc.encode(data);
}

const m = await this.nc.request(
subj,
a,
opts,
);
return this.parseJsResponse(m);
let { retries } = opts as {
retries: number;
};

retries = retries || 1;
retries = retries === -1 ? Number.MAX_SAFE_INTEGER : retries;
const bo = backoff();

for (let i = 0; i < retries; i++) {
try {
const m = await this.nc.request(
subj,
a,
opts as RequestOptions,
);
return this.parseJsResponse(m);
} catch (err) {
const ne = err as NatsError;
if (
(ne.code === "503" || ne.code === ErrorCode.Timeout) &&
i + 1 < retries
) {
await delay(bo.backoff(i));
} else {
throw err;
}
}
}
}

async findStream(subject: string): Promise<string> {
Expand Down
2 changes: 1 addition & 1 deletion jetstream/jsclient.ts
Expand Up @@ -792,7 +792,7 @@ export class JetStreamSubscriptionImpl extends TypedSubscription<JsMsg>

const subj = `${info.api.prefix}.CONSUMER.CREATE.${info.stream}`;

this.js._request(subj, req)
this.js._request(subj, req, { retries: -1 })
.then((v) => {
const ci = v as ConsumerInfo;
this.info!.config = ci.config;
Expand Down

0 comments on commit 7fe57c3

Please sign in to comment.