From e893e9f51828efd97e35e5b58b50359b143412f5 Mon Sep 17 00:00:00 2001 From: Jonas Lagoni Date: Mon, 7 Nov 2022 16:44:55 +0100 Subject: [PATCH] feat: add jetstream pull wrappers (#482) --- components/channel/jetstreamPull.js | 60 +++++++++++++++++++ components/index/jetstreamPull.js | 36 +++++++++++ .../StreetlightStreetlightIdCommandTurnon.ts | 33 ++++++++++ .../asyncapi-nats-client/src/index.ts | 27 +++++++++ template/src/channels/$$channel$$.ts.js | 8 ++- template/src/index.ts.js | 9 ++- template/src/testclient/index.ts.js | 9 ++- .../testclient/testchannels/$$channel$$.ts.js | 8 ++- 8 files changed, 186 insertions(+), 4 deletions(-) create mode 100644 components/channel/jetstreamPull.js create mode 100644 components/index/jetstreamPull.js diff --git a/components/channel/jetstreamPull.js b/components/channel/jetstreamPull.js new file mode 100644 index 000000000..4a69c04a3 --- /dev/null +++ b/components/channel/jetstreamPull.js @@ -0,0 +1,60 @@ +import { camelCase, getMessageType, realizeParametersForChannelWrapper, renderJSDocParameters, messageHasNullPayload, realizeChannelName} from '../../utils/index'; +// eslint-disable-next-line no-unused-vars +import { Message, ChannelParameter } from '@asyncapi/parser'; +import { unwrap } from './ChannelParameterUnwrap'; + +/** + * Component which returns a subscribe to function for the client + * + * @param {string} defaultContentType + * @param {string} channelName to publish to + * @param {Message} message which is being received + * @param {Object.} channelParameters parameters to the channel + */ +export function JetstreamPull(channelName, message, channelParameters) { + const messageType = getMessageType(message); + let parameters = []; + parameters = Object.entries(channelParameters).map(([parameterName]) => { + return `${camelCase(parameterName)}Param`; + }); + const hasNullPayload = messageHasNullPayload(message.payload()); + + //Determine the callback process when receiving messages. + //If the message payload is null no hooks are called to process the received data. + let whenReceivingMessage = `onDataCallback(undefined, null ${parameters.length > 0 && `, ${parameters.join(',')}`});`; + if (!hasNullPayload) { + whenReceivingMessage = ` + let receivedData: any = codec.decode(msg.data); + onDataCallback(undefined, ${messageType}.unmarshal(receivedData) ${parameters.length > 0 && `, ${parameters.join(',')}`}, msg); + `; + } + return ` + /** + * Internal functionality to setup jetstrema pull on the \`${channelName}\` channel + * + * @param onDataCallback to call when messages are received + * @param js client to pull with + * @param codec used to convert messages + ${renderJSDocParameters(channelParameters)} + */ + export function jetStreamPull( + onDataCallback: ( + err ? : NatsTypescriptTemplateError, + msg ? : ${messageType} + ${realizeParametersForChannelWrapper(channelParameters, false)}, + jetstreamMsg?: Nats.JsMsg) => void, + js: Nats.JetStreamClient, + codec: Nats.Codec < any > + ${realizeParametersForChannelWrapper(channelParameters)}, + ) { + const stream = ${realizeChannelName(channelParameters, channelName)}; + (async () => { + const msg = await js.pull(stream, 'durableName'); + + ${unwrap(channelName, channelParameters)} + + ${whenReceivingMessage} + })(); + } + `; +} diff --git a/components/index/jetstreamPull.js b/components/index/jetstreamPull.js new file mode 100644 index 000000000..684c489ed --- /dev/null +++ b/components/index/jetstreamPull.js @@ -0,0 +1,36 @@ +import { camelCase, getMessageType, realizeParametersForChannelWrapper, renderJSDocParameters, realizeParametersForChannelWithoutType, pascalCase} from '../../utils/index'; + +export function JetstreamPull(channelName, message, messageDescription, channelParameters) { + return ` + /** + * JetStream pull function. + * + * Pull message from \`${channelName}\` + * + * ${messageDescription} + * + * @param onDataCallback to call when messages are received + ${renderJSDocParameters(channelParameters)} + * @param options to pull message with, bindings from the AsyncAPI document overwrite these if specified + */ + public jetStreamPull${pascalCase(channelName)}( + onDataCallback: ( + err ? : NatsTypescriptTemplateError, + msg?: ${getMessageType(message)} + ${realizeParametersForChannelWrapper(channelParameters, false)}, + jetstreamMsg?: Nats.JsMsg) => void + ${realizeParametersForChannelWrapper(channelParameters)} + ): void { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + ${camelCase(channelName)}Channel.jetStreamPull( + onDataCallback, + this.js, + this.codec + ${Object.keys(channelParameters).length ? ` ,${realizeParametersForChannelWithoutType(channelParameters)},` : ''} + ); + } else { + throw NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED); + } + } + `; +} diff --git a/examples/simple-subscribe/asyncapi-nats-client/src/channels/StreetlightStreetlightIdCommandTurnon.ts b/examples/simple-subscribe/asyncapi-nats-client/src/channels/StreetlightStreetlightIdCommandTurnon.ts index d89727564..cd195a812 100644 --- a/examples/simple-subscribe/asyncapi-nats-client/src/channels/StreetlightStreetlightIdCommandTurnon.ts +++ b/examples/simple-subscribe/asyncapi-nats-client/src/channels/StreetlightStreetlightIdCommandTurnon.ts @@ -53,4 +53,37 @@ export function subscribe( reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e)); } }) +} +/** + * Internal functionality to setup jetstrema pull on the `streetlight/{streetlight_id}/command/turnon` channel + * + * @param onDataCallback to call when messages are received + * @param js client to pull with + * @param codec used to convert messages + * @param streetlight_id parameter to use in topic + */ +export function jetStreamPull( + onDataCallback: ( + err ? : NatsTypescriptTemplateError, + msg ? : TurnOn, streetlight_id ? : string, + jetstreamMsg ? : Nats.JsMsg) => void, + js: Nats.JetStreamClient, + codec: Nats.Codec < any > , streetlight_id: string, +) { + const stream = `streetlight.${streetlight_id}.command.turnon`; + (async () => { + const msg = await js.pull(stream, 'durableName'); + const unmodifiedChannel = `streetlight.{streetlight_id}.command.turnon`; + let channel = msg.subject; + const streetlightIdSplit = unmodifiedChannel.split("{streetlight_id}"); + const splits = [ + streetlightIdSplit[0], + streetlightIdSplit[1] + ]; + channel = channel.substring(splits[0].length); + const streetlightIdEnd = channel.indexOf(splits[1]); + const streetlightIdParam = "" + channel.substring(0, streetlightIdEnd); + let receivedData: any = codec.decode(msg.data); + onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam, msg); + })(); } \ No newline at end of file diff --git a/examples/simple-subscribe/asyncapi-nats-client/src/index.ts b/examples/simple-subscribe/asyncapi-nats-client/src/index.ts index 6fa71fa01..e699a1caf 100644 --- a/examples/simple-subscribe/asyncapi-nats-client/src/index.ts +++ b/examples/simple-subscribe/asyncapi-nats-client/src/index.ts @@ -155,4 +155,31 @@ export class NatsAsyncApiClient { } }); } + /** + * JetStream pull function. + * + * Pull message from `streetlight/{streetlight_id}/command/turnon` + * + * Channel for the turn on command which should turn on the streetlight + * + * @param onDataCallback to call when messages are received + * @param streetlight_id parameter to use in topic + * @param options to pull message with, bindings from the AsyncAPI document overwrite these if specified + */ + public jetStreamPullStreetlightStreetlightIdCommandTurnon( + onDataCallback: ( + err ? : NatsTypescriptTemplateError, + msg ? : TurnOn, streetlight_id ? : string, + jetstreamMsg ? : Nats.JsMsg) => void, streetlight_id: string + ): void { + if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) { + streetlightStreetlightIdCommandTurnonChannel.jetStreamPull( + onDataCallback, + this.js, + this.codec, streetlight_id, + ); + } else { + throw NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED); + } + } } \ No newline at end of file diff --git a/template/src/channels/$$channel$$.ts.js b/template/src/channels/$$channel$$.ts.js index 34f82d640..d22ac8bd6 100644 --- a/template/src/channels/$$channel$$.ts.js +++ b/template/src/channels/$$channel$$.ts.js @@ -7,6 +7,7 @@ import { General } from '../../../components/channel/general'; import { pascalCase, isRequestReply, isReplier, isRequester, isPubsub, camelCase} from '../../../utils/index'; // eslint-disable-next-line no-unused-vars import { AsyncAPIDocument, Channel } from '@asyncapi/parser'; +import { JetstreamPull } from '../../../components/channel/jetstreamPull'; import { JetstreamPublish } from '../../../components/channel/jetstreamPublish'; /** @@ -70,11 +71,16 @@ function getChannelCode(channel, channelName, params) { channelcode = `${publishCode} \n${jetstreamPublishCode}`; } if (channel.hasPublish()) { - channelcode = Subscribe( + const normalSubscribeCode = Subscribe( channelName, publishMessage, channel.parameters(), publishOperation); + const jetstreamPullCode = JetstreamPull( + channelName, + publishMessage, + channel.parameters()); + channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}`; } } return channelcode; diff --git a/template/src/index.ts.js b/template/src/index.ts.js index 7be90b058..66f10dc9d 100644 --- a/template/src/index.ts.js +++ b/template/src/index.ts.js @@ -7,6 +7,7 @@ import { Request } from '../../components/index/request'; import { isRequestReply, isReplier, isRequester, isPubsub} from '../../utils/index'; // eslint-disable-next-line no-unused-vars import { AsyncAPIDocument } from '@asyncapi/parser'; +import { JetstreamPull } from '../../components/index/jetstreamPull'; import { JetstreamPublish } from '../../components/index/jetstreamPublish'; /** @@ -75,11 +76,17 @@ function getChannelWrappers(asyncapi, params) { return `${normalPublish} \n ${jetStreamPublish}`; } if (channel.hasPublish()) { - return Subscribe( + const normalSubscribeCode = Subscribe( channelName, publishMessage, channelDescription, channelParameters); + const jetstreamPullCode = JetstreamPull( + channelName, + publishMessage, + channelDescription, + channelParameters); + return `${normalSubscribeCode}\n${jetstreamPullCode}`; } } }); diff --git a/template/src/testclient/index.ts.js b/template/src/testclient/index.ts.js index 9121626e2..01eeaa399 100644 --- a/template/src/testclient/index.ts.js +++ b/template/src/testclient/index.ts.js @@ -7,6 +7,7 @@ import { Request } from '../../../components/index/request'; import { isRequestReply, isReplier, isRequester, isPubsub} from '../../../utils/index'; // eslint-disable-next-line no-unused-vars import { AsyncAPIDocument, ChannelParameter } from '@asyncapi/parser'; +import { JetstreamPull } from '../../../components/index/jetstreamPull'; import { JetstreamPublish } from '../../../components/index/jetstreamPublish'; /** @@ -61,11 +62,17 @@ function getChannelWrappers(asyncapi, params) { if (isPubsub(channel)) { if (channel.hasSubscribe()) { - return Subscribe( + const normalSubscribeCode = Subscribe( channelName, subscribeMessage, channelDescription, channelParameters); + const jetstreamPullCode = JetstreamPull( + channelName, + subscribeMessage, + channelDescription, + channelParameters); + return `${normalSubscribeCode}\n${jetstreamPullCode}`; } if (channel.hasPublish()) { const normalPublish = Publish( diff --git a/template/src/testclient/testchannels/$$channel$$.ts.js b/template/src/testclient/testchannels/$$channel$$.ts.js index 6e7c3492b..f3a275264 100644 --- a/template/src/testclient/testchannels/$$channel$$.ts.js +++ b/template/src/testclient/testchannels/$$channel$$.ts.js @@ -7,6 +7,7 @@ import { General } from '../../../../components/channel/general'; import { pascalCase, isRequestReply, isReplier, isRequester, isPubsub, camelCase} from '../../../../utils/index'; // eslint-disable-next-line no-unused-vars import { AsyncAPIDocument, Channel } from '@asyncapi/parser'; +import { JetstreamPull } from '../../../../components/channel/jetstreamPull'; import { JetstreamPublish } from '../../../../components/channel/jetstreamPublish'; /** @@ -58,10 +59,15 @@ function getChannelCode(channel, channelName, params) { if (isPubsub(channel)) { if (channel.hasSubscribe()) { - channelcode = Subscribe( + const normalSubscribeCode = Subscribe( channelName, channel.subscribe() ? channel.subscribe().message(0) : undefined, channel.parameters()); + const jetstreamPullCode = JetstreamPull( + channelName, + channel.subscribe() ? channel.subscribe().message(0) : undefined, + channel.parameters()); + channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}`; } if (channel.hasPublish()) { const publishCode = Publish(