Skip to content

Commit

Permalink
feat: add jetstream push wrappers (#483)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonaslagoni committed Nov 7, 2022
1 parent 7d8c4cc commit ac89aa6
Show file tree
Hide file tree
Showing 20 changed files with 314 additions and 4,465 deletions.
51 changes: 51 additions & 0 deletions components/channel/jetstreamPublish.js
@@ -0,0 +1,51 @@
import { realizeChannelName, getMessageType, realizeParametersForChannelWrapper, messageHasNullPayload, renderJSDocParameters } from '../../utils/index';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';

/**
* Component which returns a function which publishes to the given channel
*
* @param {string} channelName to publish to
* @param {Message} message which is being published
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
*/
export function JetstreamPublish(channelName, message, channelParameters) {
const messageType = getMessageType(message);
const hasNullPayload = messageHasNullPayload(message.payload());
//Determine the publish operation based on whether the message type is null
let publishOperation = `await js.publish(${realizeChannelName(channelParameters, channelName)}, Nats.Empty);`;
if (!hasNullPayload) {
publishOperation = `
let dataToSend : any = message.marshal();
dataToSend = codec.encode(dataToSend);
js.publish(${realizeChannelName(channelParameters, channelName)}, dataToSend, options);`;
}
return `
/**
* Internal functionality to publish message to jetstream channel
* ${channelName}
*
* @param message to publish
* @param js to publish with
* @param codec used to convert messages
${renderJSDocParameters(channelParameters)}
* @param options to publish with
*/
export function jetStreamPublish(
message: ${messageType},
js: Nats.JetStreamClient,
codec: Nats.Codec<any>
${realizeParametersForChannelWrapper(channelParameters)},
options?: Nats.PublishOptions
): Promise<void> {
return new Promise<void>(async (resolve, reject) => {
try{
${publishOperation}
resolve();
}catch(e: any){
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
});
};
`;
}
42 changes: 42 additions & 0 deletions components/index/jetstreamPublish.js
@@ -0,0 +1,42 @@
import { pascalCase, camelCase, getMessageType, realizeParametersForChannelWrapper, realizeParametersForChannelWithoutType, renderJSDocParameters } from '../../utils/index';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';

/**
* Component which returns a publish to function for the client
*
* @param {string} defaultContentType
* @param {string} channelName to publish to
* @param {Message} message which is being published
* @param {string} messageDescription
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
*/
export function JetstreamPublish(channelName, message, messageDescription, channelParameters) {
return `
/**
* Publish to the \`${channelName}\` jetstream channel
*
* ${messageDescription}
*
* @param message to publish
${renderJSDocParameters(channelParameters)}
*/
public jetStreamPublishTo${pascalCase(channelName)}(
message: ${getMessageType(message)}
${realizeParametersForChannelWrapper(channelParameters)},
options?: Nats.PublishOptions
): Promise<void> {
if (!this.isClosed() && this.nc !== undefined && this.codec !== undefined && this.js !== undefined) {
return ${camelCase(channelName)}Channel.jetStreamPublish(
message,
this.js,
this.codec
${Object.keys(channelParameters).length ? `,${realizeParametersForChannelWithoutType(channelParameters)}` : ''},
options
);
}else{
return Promise.reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.NOT_CONNECTED));
}
}
`;
}
6 changes: 4 additions & 2 deletions components/index/standard.js
Expand Up @@ -51,6 +51,7 @@ function getConnectFunction(asyncapi) {
try {
this.nc = await Nats.connect(this.options);
this.js = this.nc.jetstream();
resolve();
} catch(e: any) {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
Expand Down Expand Up @@ -96,9 +97,10 @@ async connectTo${pascalCase(serverName)}(codec?: Nats.Codec<any>){ await this.co
*/
export function getStandardClassCode(asyncapi) {
return `
private nc?: Nats.NatsConnection;
private nc ?: Nats.NatsConnection;
private js ? : Nats.JetStreamClient;
private codec ?: Nats.Codec<any>;
private options?: Nats.ConnectionOptions;
private options ?: Nats.ConnectionOptions;
${getConnectFunction(asyncapi)}
${getDisconnectFunction()}
Expand Down
40 changes: 26 additions & 14 deletions examples/simple-publish/asyncapi-nats-client/API.md
Expand Up @@ -4,9 +4,6 @@
<dt><a href="#module_streetlightStreetlightIdCommandTurnon">streetlightStreetlightIdCommandTurnon</a></dt>
<dd><p>Module which wraps functionality for the <code>streetlight/{streetlight_id}/command/turnon</code> channel</p>
</dd>
<dt><a href="#module_streetlightStreetlightIdEventTurnon">streetlightStreetlightIdEventTurnon</a></dt>
<dd><p>Module which wraps functionality for the <code>streetlight/{streetlight_id}/event/turnon</code> channel</p>
</dd>
</dl>

## Classes
Expand All @@ -27,6 +24,11 @@
## streetlightStreetlightIdCommandTurnon
Module which wraps functionality for the `streetlight/{streetlight_id}/command/turnon` channel


* [streetlightStreetlightIdCommandTurnon](#module_streetlightStreetlightIdCommandTurnon)
* [~publish(message, nc, codec, streetlight_id, options)](#module_streetlightStreetlightIdCommandTurnon..publish)
* [~jetStreamPublish(message, js, codec, streetlight_id, options)](#module_streetlightStreetlightIdCommandTurnon..jetStreamPublish)

<a name="module_streetlightStreetlightIdCommandTurnon..publish"></a>

### streetlightStreetlightIdCommandTurnon~publish(message, nc, codec, streetlight_id, options)
Expand All @@ -43,23 +45,18 @@ streetlight/{streetlight_id}/command/turnon
| streetlight_id | parameter to use in topic |
| options | to publish with |

<a name="module_streetlightStreetlightIdEventTurnon"></a>

## streetlightStreetlightIdEventTurnon
Module which wraps functionality for the `streetlight/{streetlight_id}/event/turnon` channel

<a name="module_streetlightStreetlightIdEventTurnon..publish"></a>
<a name="module_streetlightStreetlightIdCommandTurnon..jetStreamPublish"></a>

### streetlightStreetlightIdEventTurnon~publish(message, nc, codec, streetlight_id, options)
Internal functionality to publish message to channel
streetlight/{streetlight_id}/event/turnon
### streetlightStreetlightIdCommandTurnon~jetStreamPublish(message, js, codec, streetlight_id, options)
Internal functionality to publish message to jetstream channel
streetlight/{streetlight_id}/command/turnon

**Kind**: inner method of [<code>streetlightStreetlightIdEventTurnon</code>](#module_streetlightStreetlightIdEventTurnon)
**Kind**: inner method of [<code>streetlightStreetlightIdCommandTurnon</code>](#module_streetlightStreetlightIdCommandTurnon)

| Param | Description |
| --- | --- |
| message | to publish |
| nc | to publish with |
| js | to publish with |
| codec | used to convert messages |
| streetlight_id | parameter to use in topic |
| options | to publish with |
Expand All @@ -82,6 +79,7 @@ The generated client based on your AsyncAPI document.
* [.connectToHost(host, options)](#NatsAsyncApiClient+connectToHost)
* [.connectToLocal()](#NatsAsyncApiClient+connectToLocal)
* [.publishToStreetlightStreetlightIdCommandTurnon(message, streetlight_id)](#NatsAsyncApiClient+publishToStreetlightStreetlightIdCommandTurnon)
* [.jetStreamPublishToStreetlightStreetlightIdCommandTurnon(message, streetlight_id)](#NatsAsyncApiClient+jetStreamPublishToStreetlightStreetlightIdCommandTurnon)

<a name="NatsAsyncApiClient+connect"></a>

Expand Down Expand Up @@ -164,6 +162,20 @@ Channel for the turn on command which should turn on the streetlight
| message | to publish |
| streetlight_id | parameter to use in topic |

<a name="NatsAsyncApiClient+jetStreamPublishToStreetlightStreetlightIdCommandTurnon"></a>

### natsAsyncApiClient.jetStreamPublishToStreetlightStreetlightIdCommandTurnon(message, streetlight_id)
Publish to the `streetlight/{streetlight_id}/command/turnon` jetstream channel

Channel for the turn on command which should turn on the streetlight

**Kind**: instance method of [<code>NatsAsyncApiClient</code>](#NatsAsyncApiClient)

| Param | Description |
| --- | --- |
| message | to publish |
| streetlight_id | parameter to use in topic |

<a name="NatsAsyncApiTestClient"></a>

## NatsAsyncApiTestClient
Expand Down

0 comments on commit ac89aa6

Please sign in to comment.