Skip to content

Commit

Permalink
feat: add jetstream pull wrappers (#482)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonaslagoni committed Nov 7, 2022
1 parent b294a8a commit e893e9f
Show file tree
Hide file tree
Showing 8 changed files with 186 additions and 4 deletions.
60 changes: 60 additions & 0 deletions components/channel/jetstreamPull.js
@@ -0,0 +1,60 @@
import { camelCase, getMessageType, realizeParametersForChannelWrapper, renderJSDocParameters, messageHasNullPayload, realizeChannelName} from '../../utils/index';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';
import { unwrap } from './ChannelParameterUnwrap';

/**
* Component which returns a subscribe to function for the client
*
* @param {string} defaultContentType
* @param {string} channelName to publish to
* @param {Message} message which is being received
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
*/
export function JetstreamPull(channelName, message, channelParameters) {
const messageType = getMessageType(message);
let parameters = [];
parameters = Object.entries(channelParameters).map(([parameterName]) => {
return `${camelCase(parameterName)}Param`;
});
const hasNullPayload = messageHasNullPayload(message.payload());

//Determine the callback process when receiving messages.
//If the message payload is null no hooks are called to process the received data.
let whenReceivingMessage = `onDataCallback(undefined, null ${parameters.length > 0 && `, ${parameters.join(',')}`});`;
if (!hasNullPayload) {
whenReceivingMessage = `
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, ${messageType}.unmarshal(receivedData) ${parameters.length > 0 && `, ${parameters.join(',')}`}, msg);
`;
}
return `
/**
* Internal functionality to setup jetstrema pull on the \`${channelName}\` channel
*
* @param onDataCallback to call when messages are received
* @param js client to pull with
* @param codec used to convert messages
${renderJSDocParameters(channelParameters)}
*/
export function jetStreamPull(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : ${messageType}
${realizeParametersForChannelWrapper(channelParameters, false)},
jetstreamMsg?: Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any >
${realizeParametersForChannelWrapper(channelParameters)},
) {
const stream = ${realizeChannelName(channelParameters, channelName)};
(async () => {
const msg = await js.pull(stream, 'durableName');
${unwrap(channelName, channelParameters)}
${whenReceivingMessage}
})();
}
`;
}
36 changes: 36 additions & 0 deletions components/index/jetstreamPull.js
@@ -0,0 +1,36 @@
import { camelCase, getMessageType, realizeParametersForChannelWrapper, renderJSDocParameters, realizeParametersForChannelWithoutType, pascalCase} from '../../utils/index';

export function JetstreamPull(channelName, message, messageDescription, channelParameters) {
return `
/**
* JetStream pull function.
*
* Pull message from \`${channelName}\`
*
* ${messageDescription}
*
* @param onDataCallback to call when messages are received
${renderJSDocParameters(channelParameters)}
* @param options to pull message with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamPull${pascalCase(channelName)}(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg?: ${getMessageType(message)}
${realizeParametersForChannelWrapper(channelParameters, false)},
jetstreamMsg?: Nats.JsMsg) => void
${realizeParametersForChannelWrapper(channelParameters)}
): void {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
${camelCase(channelName)}Channel.jetStreamPull(
onDataCallback,
this.js,
this.codec
${Object.keys(channelParameters).length ? ` ,${realizeParametersForChannelWithoutType(channelParameters)},` : ''}
);
} else {
throw NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED);
}
}
`;
}
Expand Up @@ -53,4 +53,37 @@ export function subscribe(
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
/**
* Internal functionality to setup jetstrema pull on the `streetlight/{streetlight_id}/command/turnon` channel
*
* @param onDataCallback to call when messages are received
* @param js client to pull with
* @param codec used to convert messages
* @param streetlight_id parameter to use in topic
*/
export function jetStreamPull(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any > , streetlight_id: string,
) {
const stream = `streetlight.${streetlight_id}.command.turnon`;
(async () => {
const msg = await js.pull(stream, 'durableName');
const unmodifiedChannel = `streetlight.{streetlight_id}.command.turnon`;
let channel = msg.subject;
const streetlightIdSplit = unmodifiedChannel.split("{streetlight_id}");
const splits = [
streetlightIdSplit[0],
streetlightIdSplit[1]
];
channel = channel.substring(splits[0].length);
const streetlightIdEnd = channel.indexOf(splits[1]);
const streetlightIdParam = "" + channel.substring(0, streetlightIdEnd);
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, TurnOn.unmarshal(receivedData), streetlightIdParam, msg);
})();
}
27 changes: 27 additions & 0 deletions examples/simple-subscribe/asyncapi-nats-client/src/index.ts
Expand Up @@ -155,4 +155,31 @@ export class NatsAsyncApiClient {
}
});
}
/**
* JetStream pull function.
*
* Pull message from `streetlight/{streetlight_id}/command/turnon`
*
* Channel for the turn on command which should turn on the streetlight
*
* @param onDataCallback to call when messages are received
* @param streetlight_id parameter to use in topic
* @param options to pull message with, bindings from the AsyncAPI document overwrite these if specified
*/
public jetStreamPullStreetlightStreetlightIdCommandTurnon(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg ? : TurnOn, streetlight_id ? : string,
jetstreamMsg ? : Nats.JsMsg) => void, streetlight_id: string
): void {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
streetlightStreetlightIdCommandTurnonChannel.jetStreamPull(
onDataCallback,
this.js,
this.codec, streetlight_id,
);
} else {
throw NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED);
}
}
}
8 changes: 7 additions & 1 deletion template/src/channels/$$channel$$.ts.js
Expand Up @@ -7,6 +7,7 @@ import { General } from '../../../components/channel/general';
import { pascalCase, isRequestReply, isReplier, isRequester, isPubsub, camelCase} from '../../../utils/index';
// eslint-disable-next-line no-unused-vars
import { AsyncAPIDocument, Channel } from '@asyncapi/parser';
import { JetstreamPull } from '../../../components/channel/jetstreamPull';
import { JetstreamPublish } from '../../../components/channel/jetstreamPublish';

/**
Expand Down Expand Up @@ -70,11 +71,16 @@ function getChannelCode(channel, channelName, params) {
channelcode = `${publishCode} \n${jetstreamPublishCode}`;
}
if (channel.hasPublish()) {
channelcode = Subscribe(
const normalSubscribeCode = Subscribe(
channelName,
publishMessage,
channel.parameters(),
publishOperation);
const jetstreamPullCode = JetstreamPull(
channelName,
publishMessage,
channel.parameters());
channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}`;
}
}
return channelcode;
Expand Down
9 changes: 8 additions & 1 deletion template/src/index.ts.js
Expand Up @@ -7,6 +7,7 @@ import { Request } from '../../components/index/request';
import { isRequestReply, isReplier, isRequester, isPubsub} from '../../utils/index';
// eslint-disable-next-line no-unused-vars
import { AsyncAPIDocument } from '@asyncapi/parser';
import { JetstreamPull } from '../../components/index/jetstreamPull';
import { JetstreamPublish } from '../../components/index/jetstreamPublish';

/**
Expand Down Expand Up @@ -75,11 +76,17 @@ function getChannelWrappers(asyncapi, params) {
return `${normalPublish} \n ${jetStreamPublish}`;
}
if (channel.hasPublish()) {
return Subscribe(
const normalSubscribeCode = Subscribe(
channelName,
publishMessage,
channelDescription,
channelParameters);
const jetstreamPullCode = JetstreamPull(
channelName,
publishMessage,
channelDescription,
channelParameters);
return `${normalSubscribeCode}\n${jetstreamPullCode}`;
}
}
});
Expand Down
9 changes: 8 additions & 1 deletion template/src/testclient/index.ts.js
Expand Up @@ -7,6 +7,7 @@ import { Request } from '../../../components/index/request';
import { isRequestReply, isReplier, isRequester, isPubsub} from '../../../utils/index';
// eslint-disable-next-line no-unused-vars
import { AsyncAPIDocument, ChannelParameter } from '@asyncapi/parser';
import { JetstreamPull } from '../../../components/index/jetstreamPull';
import { JetstreamPublish } from '../../../components/index/jetstreamPublish';

/**
Expand Down Expand Up @@ -61,11 +62,17 @@ function getChannelWrappers(asyncapi, params) {

if (isPubsub(channel)) {
if (channel.hasSubscribe()) {
return Subscribe(
const normalSubscribeCode = Subscribe(
channelName,
subscribeMessage,
channelDescription,
channelParameters);
const jetstreamPullCode = JetstreamPull(
channelName,
subscribeMessage,
channelDescription,
channelParameters);
return `${normalSubscribeCode}\n${jetstreamPullCode}`;
}
if (channel.hasPublish()) {
const normalPublish = Publish(
Expand Down
8 changes: 7 additions & 1 deletion template/src/testclient/testchannels/$$channel$$.ts.js
Expand Up @@ -7,6 +7,7 @@ import { General } from '../../../../components/channel/general';
import { pascalCase, isRequestReply, isReplier, isRequester, isPubsub, camelCase} from '../../../../utils/index';
// eslint-disable-next-line no-unused-vars
import { AsyncAPIDocument, Channel } from '@asyncapi/parser';
import { JetstreamPull } from '../../../../components/channel/jetstreamPull';
import { JetstreamPublish } from '../../../../components/channel/jetstreamPublish';

/**
Expand Down Expand Up @@ -58,10 +59,15 @@ function getChannelCode(channel, channelName, params) {

if (isPubsub(channel)) {
if (channel.hasSubscribe()) {
channelcode = Subscribe(
const normalSubscribeCode = Subscribe(
channelName,
channel.subscribe() ? channel.subscribe().message(0) : undefined,
channel.parameters());
const jetstreamPullCode = JetstreamPull(
channelName,
channel.subscribe() ? channel.subscribe().message(0) : undefined,
channel.parameters());
channelcode = `${normalSubscribeCode}\n${jetstreamPullCode}`;
}
if (channel.hasPublish()) {
const publishCode = Publish(
Expand Down

0 comments on commit e893e9f

Please sign in to comment.