From d1e9ed513083ed0b0e823597cf8eb6ec8994d027 Mon Sep 17 00:00:00 2001 From: Jonas Lagoni Date: Tue, 8 Nov 2022 14:38:53 +0100 Subject: [PATCH] feat: add jetstream pull subscribe wrappers (#480) --- .../channel/jetStreamPullSubscription.js | 70 +++++++++++++++++++ components/index/jetStreamPullSubscription.js | 55 +++++++++++++++ .../asyncapi-nats-client/API.md | 17 +++++ .../src/testclient/index.ts | 35 ++++++++++ .../StreetlightStreetlightIdCommandTurnon.ts | 42 +++++++++++ .../asyncapi-nats-client/API.md | 32 +++++++++ .../StreetlightStreetlightIdCommandTurnon.ts | 42 +++++++++++ .../asyncapi-nats-client/src/index.ts | 35 ++++++++++ template/src/channels/$$channel$$.ts.js | 10 ++- template/src/index.ts.js | 8 ++- template/src/testclient/index.ts.js | 8 ++- .../testclient/testchannels/$$channel$$.ts.js | 14 ++-- 12 files changed, 359 insertions(+), 9 deletions(-) create mode 100644 components/channel/jetStreamPullSubscription.js create mode 100644 components/index/jetStreamPullSubscription.js diff --git a/components/channel/jetStreamPullSubscription.js b/components/channel/jetStreamPullSubscription.js new file mode 100644 index 000000000..0ac9e16ee --- /dev/null +++ b/components/channel/jetStreamPullSubscription.js @@ -0,0 +1,70 @@ +import { realizeChannelName, camelCase, getMessageType, messageHasNullPayload, realizeParametersForChannelWrapper, renderJSDocParameters} from '../../utils/index'; +import { unwrap } from './ChannelParameterUnwrap'; +// eslint-disable-next-line no-unused-vars +import { Message, ChannelParameter } from '@asyncapi/parser'; + +/** + * Component which returns a function which subscribes to the given channel + * + * @param {string} defaultContentType + * @param {string} channelName to subscribe to + * @param {Message} message which is being received + * @param {Object.} channelParameters parameters to the channel + */ +export function JetstreamPullSubscription(channelName, message, channelParameters) { + const messageType = getMessageType(message); + let parameters = []; + parameters = Object.entries(channelParameters).map(([parameterName]) => { + return `${camelCase(parameterName)}Param`; + }); + const hasNullPayload = messageHasNullPayload(message.payload()); + + //Determine the callback process when receiving messages. + //If the message payload is null no hooks are called to process the received data. + let whenReceivingMessage = `onDataCallback(undefined, null ${parameters.length > 0 && `, ${parameters.join(',')}`});`; + if (!hasNullPayload) { + whenReceivingMessage = ` + let receivedData: any = codec.decode(msg.data); + onDataCallback(undefined, ${messageType}.unmarshal(receivedData) ${parameters.length > 0 && `, ${parameters.join(',')}`}); + `; + } + + return ` + /** + * Internal functionality to setup jetstream pull subscription on the \`${channelName}\` channel + * + * @param onDataCallback to call when messages are received + * @param nc to subscribe with + * @param codec used to convert messages + ${renderJSDocParameters(channelParameters)} + */ + export function jetStreamPullSubscribe( + onDataCallback: ( + err ? : NatsTypescriptTemplateError, + msg?: ${messageType} + ${realizeParametersForChannelWrapper(channelParameters, false)}, + jetstreamMsg?: Nats.JsMsg) => void, + js: Nats.JetStreamClient, + codec: Nats.Codec < any > + ${realizeParametersForChannelWrapper(channelParameters)}, + options: Nats.ConsumerOptsBuilder | Partial + ): Promise < Nats.JetStreamPullSubscription > { + return new Promise(async (resolve, reject) => { + try { + const subscription = await js.pullSubscribe(${realizeChannelName(channelParameters, channelName)}, options); + + (async () => { + for await (const msg of subscription) { + ${unwrap(channelName, channelParameters)} + + ${whenReceivingMessage} + } + })(); + resolve(subscription); + } catch (e: any) { + reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e)); + } + }) + } + `; +} diff --git a/components/index/jetStreamPullSubscription.js b/components/index/jetStreamPullSubscription.js new file mode 100644 index 000000000..85ce0a3a9 --- /dev/null +++ b/components/index/jetStreamPullSubscription.js @@ -0,0 +1,55 @@ +import { pascalCase, camelCase, getMessageType, realizeParametersForChannelWrapper, realizeParametersForChannelWithoutType, renderJSDocParameters} from '../../utils/index'; +// eslint-disable-next-line no-unused-vars +import { Message, ChannelParameter } from '@asyncapi/parser'; + +/** + * Component which returns a subscribe to function for the client + * + * @param {string} defaultContentType + * @param {string} channelName to publish to + * @param {Message} message which is being received + * @param {string} messageDescription + * @param {Object.} channelParameters parameters to the channel + */ +export function JetstreamPullSubscribe(channelName, message, messageDescription, channelParameters) { + return ` + /** + * Push subscription to the \`${channelName}\` + * + * ${messageDescription} + * + * @param onDataCallback to call when messages are received + ${renderJSDocParameters(channelParameters)} + * @param flush ensure client is force flushed after subscribing + * @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified + */ + public jetStreamPullSubscribeTo${pascalCase(channelName)}( + onDataCallback: ( + err ? : NatsTypescriptTemplateError, + msg?: ${getMessageType(message)} + ${realizeParametersForChannelWrapper(channelParameters, false)}, + jetstreamMsg?: Nats.JsMsg) => void + ${realizeParametersForChannelWrapper(channelParameters)}, + options: Nats.ConsumerOptsBuilder | Partial + ): Promise { + return new Promise(async (resolve, reject) => { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + try { + const sub = ${camelCase(channelName)}Channel.jetStreamPullSubscribe( + onDataCallback, + this.js, + this.codec + ${Object.keys(channelParameters).length ? ` ,${realizeParametersForChannelWithoutType(channelParameters)}` : ''}, + options + ); + resolve(sub); + } catch (e: any) { + reject(e); + } + } else { + reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED)); + } + }); + } + `; +} diff --git a/examples/simple-publish/asyncapi-nats-client/API.md b/examples/simple-publish/asyncapi-nats-client/API.md index 54b059d9e..cc01e9e2d 100644 --- a/examples/simple-publish/asyncapi-nats-client/API.md +++ b/examples/simple-publish/asyncapi-nats-client/API.md @@ -196,6 +196,7 @@ The test/mirror client which is the reverse to the normal NatsAsyncApiClient. * [.subscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiTestClient+subscribeToStreetlightStreetlightIdCommandTurnon) * [.jetStreamPullStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)](#NatsAsyncApiTestClient+jetStreamPullStreetlightStreetlightIdCommandTurnon) * [.jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiTestClient+jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon) + * [.jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiTestClient+jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon) @@ -313,3 +314,19 @@ Channel for the turn on command which should turn on the streetlight | flush | ensure client is force flushed after subscribing | | options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified | + + +### natsAsyncApiTestClient.jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options) +Push subscription to the `streetlight/{streetlight_id}/command/turnon` + +Channel for the turn on command which should turn on the streetlight + +**Kind**: instance method of [NatsAsyncApiTestClient](#NatsAsyncApiTestClient) + +| Param | Description | +| --- | --- | +| onDataCallback | to call when messages are received | +| streetlight_id | parameter to use in topic | +| flush | ensure client is force flushed after subscribing | +| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified | + diff --git a/examples/simple-publish/asyncapi-nats-client/src/testclient/index.ts b/examples/simple-publish/asyncapi-nats-client/src/testclient/index.ts index 625c6e085..5c1a3e341 100644 --- a/examples/simple-publish/asyncapi-nats-client/src/testclient/index.ts +++ b/examples/simple-publish/asyncapi-nats-client/src/testclient/index.ts @@ -210,4 +210,39 @@ export class NatsAsyncApiTestClient { } }); } + /** + * Push subscription to the `streetlight/{streetlight_id}/command/turnon` + * + * Channel for the turn on command which should turn on the streetlight + * + * @param onDataCallback to call when messages are received + * @param streetlight_id parameter to use in topic + * @param flush ensure client is force flushed after subscribing + * @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified + */ + public jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon( + onDataCallback: ( + err ? : NatsTypescriptTemplateError, + msg ? : TurnOn, streetlight_id ? : string, + jetstreamMsg ? : Nats.JsMsg) => void, streetlight_id: string, + options: Nats.ConsumerOptsBuilder | Partial < Nats.ConsumerOpts > + ): Promise < Nats.JetStreamPullSubscription > { + return new Promise(async (resolve, reject) => { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + try { + const sub = streetlightStreetlightIdCommandTurnonChannel.jetStreamPullSubscribe( + onDataCallback, + this.js, + this.codec, streetlight_id, + options + ); + resolve(sub); + } catch (e: any) { + reject(e); + } + } else { + reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED)); + } + }); + } } \ No newline at end of file diff --git a/examples/simple-publish/asyncapi-nats-client/src/testclient/testchannels/StreetlightStreetlightIdCommandTurnon.ts b/examples/simple-publish/asyncapi-nats-client/src/testclient/testchannels/StreetlightStreetlightIdCommandTurnon.ts index f63c1f6d2..c09f52fae 100644 --- a/examples/simple-publish/asyncapi-nats-client/src/testclient/testchannels/StreetlightStreetlightIdCommandTurnon.ts +++ b/examples/simple-publish/asyncapi-nats-client/src/testclient/testchannels/StreetlightStreetlightIdCommandTurnon.ts @@ -130,4 +130,46 @@ export function jetStreamPushSubscribe( reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e)); } }) +} +/** + * Internal functionality to setup jetstream pull subscription on the `streetlight/{streetlight_id}/command/turnon` channel + * + * @param onDataCallback to call when messages are received + * @param nc to subscribe with + * @param codec used to convert messages + * @param streetlight_id parameter to use in topic + */ +export function jetStreamPullSubscribe( + onDataCallback: ( + err ? : NatsTypescriptTemplateError, + msg ? : TurnOn, streetlight_id ? : string, + jetstreamMsg ? : Nats.JsMsg) => void, + js: Nats.JetStreamClient, + codec: Nats.Codec < any > , streetlight_id: string, + options: Nats.ConsumerOptsBuilder | Partial < Nats.ConsumerOpts > +): Promise < Nats.JetStreamPullSubscription > { + return new Promise(async (resolve, reject) => { + try { + const subscription = await js.pullSubscribe(`streetlight.${streetlight_id}.command.turnon`, options); + (async () => { + for await (const msg of subscription) { + const unmodifiedChannel = `streetlight.{streetlight_id}.command.turnon`; + let channel = msg.subject; + const streetlightIdSplit = unmodifiedChannel.split("{streetlight_id}"); + const splits = [ + streetlightIdSplit[0], + streetlightIdSplit[1] + ]; + channel = channel.substring(splits[0].length); + const streetlightIdEnd = channel.indexOf(splits[1]); + const streetlightIdParam = "" + channel.substring(0, streetlightIdEnd); + let receivedData: any = codec.decode(msg.data); + onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam); + } + })(); + resolve(subscription); + } catch (e: any) { + reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e)); + } + }) } \ No newline at end of file diff --git a/examples/simple-subscribe/asyncapi-nats-client/API.md b/examples/simple-subscribe/asyncapi-nats-client/API.md index 758be83b9..a00b0d5b9 100644 --- a/examples/simple-subscribe/asyncapi-nats-client/API.md +++ b/examples/simple-subscribe/asyncapi-nats-client/API.md @@ -29,6 +29,7 @@ Module which wraps functionality for the `streetlight/{streetlight_id}/command/t * [~subscribe(onDataCallback, nc, codec, streetlight_id, options)](#module_streetlightStreetlightIdCommandTurnon..subscribe) * [~jetStreamPull(onDataCallback, js, codec, streetlight_id)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPull) * [~jetStreamPushSubscribe(onDataCallback, nc, codec, streetlight_id, options)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPushSubscribe) + * [~jetStreamPullSubscribe(onDataCallback, nc, codec, streetlight_id)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPullSubscribe) @@ -74,6 +75,20 @@ Internal functionality to setup jetstream push subscription on the `streetlight/ | streetlight_id | parameter to use in topic | | options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified | + + +### streetlightStreetlightIdCommandTurnon~jetStreamPullSubscribe(onDataCallback, nc, codec, streetlight_id) +Internal functionality to setup jetstream pull subscription on the `streetlight/{streetlight_id}/command/turnon` channel + +**Kind**: inner method of [streetlightStreetlightIdCommandTurnon](#module_streetlightStreetlightIdCommandTurnon) + +| Param | Description | +| --- | --- | +| onDataCallback | to call when messages are received | +| nc | to subscribe with | +| codec | used to convert messages | +| streetlight_id | parameter to use in topic | + ## NatsAsyncApiClient @@ -94,6 +109,7 @@ The generated client based on your AsyncAPI document. * [.subscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiClient+subscribeToStreetlightStreetlightIdCommandTurnon) * [.jetStreamPullStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, options)](#NatsAsyncApiClient+jetStreamPullStreetlightStreetlightIdCommandTurnon) * [.jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiClient+jetStreamPushSubscribeToStreetlightStreetlightIdCommandTurnon) + * [.jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options)](#NatsAsyncApiClient+jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon) @@ -211,6 +227,22 @@ Channel for the turn on command which should turn on the streetlight | flush | ensure client is force flushed after subscribing | | options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified | + + +### natsAsyncApiClient.jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon(onDataCallback, streetlight_id, flush, options) +Push subscription to the `streetlight/{streetlight_id}/command/turnon` + +Channel for the turn on command which should turn on the streetlight + +**Kind**: instance method of [NatsAsyncApiClient](#NatsAsyncApiClient) + +| Param | Description | +| --- | --- | +| onDataCallback | to call when messages are received | +| streetlight_id | parameter to use in topic | +| flush | ensure client is force flushed after subscribing | +| options | to subscribe with, bindings from the AsyncAPI document overwrite these if specified | + ## NatsAsyncApiTestClient diff --git a/examples/simple-subscribe/asyncapi-nats-client/src/channels/StreetlightStreetlightIdCommandTurnon.ts b/examples/simple-subscribe/asyncapi-nats-client/src/channels/StreetlightStreetlightIdCommandTurnon.ts index 8c20a7367..06282ccf9 100644 --- a/examples/simple-subscribe/asyncapi-nats-client/src/channels/StreetlightStreetlightIdCommandTurnon.ts +++ b/examples/simple-subscribe/asyncapi-nats-client/src/channels/StreetlightStreetlightIdCommandTurnon.ts @@ -130,4 +130,46 @@ export function jetStreamPushSubscribe( reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e)); } }) +} +/** + * Internal functionality to setup jetstream pull subscription on the `streetlight/{streetlight_id}/command/turnon` channel + * + * @param onDataCallback to call when messages are received + * @param nc to subscribe with + * @param codec used to convert messages + * @param streetlight_id parameter to use in topic + */ +export function jetStreamPullSubscribe( + onDataCallback: ( + err ? : NatsTypescriptTemplateError, + msg ? : TurnOn, streetlight_id ? : string, + jetstreamMsg ? : Nats.JsMsg) => void, + js: Nats.JetStreamClient, + codec: Nats.Codec < any > , streetlight_id: string, + options: Nats.ConsumerOptsBuilder | Partial < Nats.ConsumerOpts > +): Promise < Nats.JetStreamPullSubscription > { + return new Promise(async (resolve, reject) => { + try { + const subscription = await js.pullSubscribe(`streetlight.${streetlight_id}.command.turnon`, options); + (async () => { + for await (const msg of subscription) { + const unmodifiedChannel = `streetlight.{streetlight_id}.command.turnon`; + let channel = msg.subject; + const streetlightIdSplit = unmodifiedChannel.split("{streetlight_id}"); + const splits = [ + streetlightIdSplit[0], + streetlightIdSplit[1] + ]; + channel = channel.substring(splits[0].length); + const streetlightIdEnd = channel.indexOf(splits[1]); + const streetlightIdParam = "" + channel.substring(0, streetlightIdEnd); + let receivedData: any = codec.decode(msg.data); + onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam); + } + })(); + resolve(subscription); + } catch (e: any) { + reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e)); + } + }) } \ No newline at end of file diff --git a/examples/simple-subscribe/asyncapi-nats-client/src/index.ts b/examples/simple-subscribe/asyncapi-nats-client/src/index.ts index 26e2aaa2b..7378a06bf 100644 --- a/examples/simple-subscribe/asyncapi-nats-client/src/index.ts +++ b/examples/simple-subscribe/asyncapi-nats-client/src/index.ts @@ -218,4 +218,39 @@ export class NatsAsyncApiClient { } }); } + /** + * Push subscription to the `streetlight/{streetlight_id}/command/turnon` + * + * Channel for the turn on command which should turn on the streetlight + * + * @param onDataCallback to call when messages are received + * @param streetlight_id parameter to use in topic + * @param flush ensure client is force flushed after subscribing + * @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified + */ + public jetStreamPullSubscribeToStreetlightStreetlightIdCommandTurnon( + onDataCallback: ( + err ? : NatsTypescriptTemplateError, + msg ? : TurnOn, streetlight_id ? : string, + jetstreamMsg ? : Nats.JsMsg) => void, streetlight_id: string, + options: Nats.ConsumerOptsBuilder | Partial < Nats.ConsumerOpts > + ): Promise < Nats.JetStreamPullSubscription > { + return new Promise(async (resolve, reject) => { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + try { + const sub = streetlightStreetlightIdCommandTurnonChannel.jetStreamPullSubscribe( + onDataCallback, + this.js, + this.codec, streetlight_id, + options + ); + resolve(sub); + } catch (e: any) { + reject(e); + } + } else { + reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED)); + } + }); + } } \ No newline at end of file diff --git a/template/src/channels/$$channel$$.ts.js b/template/src/channels/$$channel$$.ts.js index 8a16c7c86..90891abe4 100644 --- a/template/src/channels/$$channel$$.ts.js +++ b/template/src/channels/$$channel$$.ts.js @@ -9,6 +9,7 @@ import { pascalCase, isRequestReply, isReplier, isRequester, isPubsub, camelCase import { AsyncAPIDocument, Channel } from '@asyncapi/parser'; import { JetstreamPushSubscription } from '../../../components/channel/jetstreamPushSubscription'; import { JetstreamPull } from '../../../components/channel/jetstreamPull'; +import { JetstreamPullSubscription } from '../../../components/channel/jetStreamPullSubscription'; import { JetstreamPublish } from '../../../components/channel/jetstreamPublish'; /** @@ -77,16 +78,19 @@ function getChannelCode(channel, channelName, params) { publishMessage, channel.parameters(), publishOperation); + const jetstreamPullSubscriptionCode = JetstreamPullSubscription( + channelName, + publishMessage, + channel.parameters()); const jetstreamPushSubscriptionCode = JetstreamPushSubscription( channelName, publishMessage, - channel.parameters(), - publishOperation); + channel.parameters()); const jetstreamPullCode = JetstreamPull( channelName, publishMessage, channel.parameters()); - channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}`; + channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}\n${jetstreamPullSubscriptionCode}`; } } return channelcode; diff --git a/template/src/index.ts.js b/template/src/index.ts.js index ff3baaa08..78a9cc8cd 100644 --- a/template/src/index.ts.js +++ b/template/src/index.ts.js @@ -9,6 +9,7 @@ import { isRequestReply, isReplier, isRequester, isPubsub} from '../../utils/ind import { AsyncAPIDocument } from '@asyncapi/parser'; import { JetstreamPushSubscription } from '../../components/index/jetstreamPushSubscription'; import { JetstreamPull } from '../../components/index/jetstreamPull'; +import { JetstreamPullSubscribe } from '../../components/index/jetStreamPullSubscription'; import { JetstreamPublish } from '../../components/index/jetstreamPublish'; /** @@ -82,6 +83,11 @@ function getChannelWrappers(asyncapi, params) { publishMessage, channelDescription, channelParameters); + const jetstreamPullSubscribe = JetstreamPullSubscribe( + channelName, + publishMessage, + channelDescription, + channelParameters); const jetstreamPushSubscriptionCode = JetstreamPushSubscription( channelName, publishMessage, @@ -92,7 +98,7 @@ function getChannelWrappers(asyncapi, params) { publishMessage, channelDescription, channelParameters); - return `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}`; + return `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}\n${jetstreamPullSubscribe}`; } } }); diff --git a/template/src/testclient/index.ts.js b/template/src/testclient/index.ts.js index e6fdb2bdc..d77e3a5f6 100644 --- a/template/src/testclient/index.ts.js +++ b/template/src/testclient/index.ts.js @@ -7,6 +7,7 @@ import { Request } from '../../../components/index/request'; import { isRequestReply, isReplier, isRequester, isPubsub} from '../../../utils/index'; // eslint-disable-next-line no-unused-vars import { AsyncAPIDocument, ChannelParameter } from '@asyncapi/parser'; +import { JetstreamPullSubscribe } from '../../../components/index/jetStreamPullSubscription'; import { JetstreamPushSubscription } from '../../../components/index/jetstreamPushSubscription'; import { JetstreamPull } from '../../../components/index/jetstreamPull'; import { JetstreamPublish } from '../../../components/index/jetstreamPublish'; @@ -68,6 +69,11 @@ function getChannelWrappers(asyncapi, params) { subscribeMessage, channelDescription, channelParameters); + const jetstreamPullSubscribe = JetstreamPullSubscribe( + channelName, + subscribeMessage, + channelDescription, + channelParameters); const jetstreamPushSubscriptionCode = JetstreamPushSubscription( channelName, subscribeMessage, @@ -78,7 +84,7 @@ function getChannelWrappers(asyncapi, params) { subscribeMessage, channelDescription, channelParameters); - return `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}`; + return `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}\n${jetstreamPullSubscribe}`; } if (channel.hasPublish()) { const normalPublish = Publish( diff --git a/template/src/testclient/testchannels/$$channel$$.ts.js b/template/src/testclient/testchannels/$$channel$$.ts.js index 7885682b1..e499910a8 100644 --- a/template/src/testclient/testchannels/$$channel$$.ts.js +++ b/template/src/testclient/testchannels/$$channel$$.ts.js @@ -10,6 +10,7 @@ import { AsyncAPIDocument, Channel } from '@asyncapi/parser'; import { JetstreamPushSubscription } from '../../../../components/channel/jetstreamPushSubscription'; import { JetstreamPull } from '../../../../components/channel/jetstreamPull'; import { JetstreamPublish } from '../../../../components/channel/jetstreamPublish'; +import { JetstreamPullSubscription } from '../../../../components/channel/jetStreamPullSubscription'; /** * @typedef TemplateParameters @@ -60,19 +61,24 @@ function getChannelCode(channel, channelName, params) { if (isPubsub(channel)) { if (channel.hasSubscribe()) { + const message = channel.subscribe() ? channel.subscribe().message(0) : undefined; const normalSubscribeCode = Subscribe( channelName, - channel.subscribe() ? channel.subscribe().message(0) : undefined, + message, channel.parameters()); const jetstreamPushSubscriptionCode = JetstreamPushSubscription( channelName, - channel.subscribe() ? channel.subscribe().message(0) : undefined, + message, + channel.parameters()); + const jetstreamPullSubscriptionCode = JetstreamPullSubscription( + channelName, + message, channel.parameters()); const jetstreamPullCode = JetstreamPull( channelName, - channel.subscribe() ? channel.subscribe().message(0) : undefined, + message, channel.parameters()); - channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}`; + channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}\n${jetstreamPushSubscriptionCode}\n${jetstreamPullSubscriptionCode}`; } if (channel.hasPublish()) { const publishCode = Publish(