-
-
Notifications
You must be signed in to change notification settings - Fork 11
/
jetstreamPushSubscription.js
72 lines (67 loc) · 3 KB
/
jetstreamPushSubscription.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import { realizeChannelName, camelCase, getMessageType, messageHasNullPayload, realizeParametersForChannelWrapper, renderJSDocParameters} from '../../utils/index';
import { unwrap } from './ChannelParameterUnwrap';
// eslint-disable-next-line no-unused-vars
import { Message, ChannelParameter } from '@asyncapi/parser';
/**
* Component which returns a function which subscribes to the given channel
*
* @param {string} defaultContentType
* @param {string} channelName to subscribe to
* @param {Message} message which is being received
* @param {Object.<string, ChannelParameter>} channelParameters parameters to the channel
*/
export function JetstreamPushSubscription(channelName, message, channelParameters) {
const messageType = getMessageType(message);
let parameters = [];
parameters = Object.entries(channelParameters).map(([parameterName]) => {
return `${camelCase(parameterName)}Param`;
});
const hasNullPayload = messageHasNullPayload(message.payload());
//Determine the callback process when receiving messages.
//If the message payload is null no hooks are called to process the received data.
let whenReceivingMessage = `onDataCallback(undefined, null ${parameters.length > 0 && `, ${parameters.join(',')}`});`;
if (!hasNullPayload) {
whenReceivingMessage = `
let receivedData: any = codec.decode(msg.data);
onDataCallback(undefined, ${messageType}.unmarshal(receivedData) ${parameters.length > 0 && `, ${parameters.join(',')}`});
`;
}
return `
/**
* Internal functionality to setup jetstream push subscription on the \`${channelName}\` channel
*
* @param onDataCallback to call when messages are received
* @param nc to subscribe with
* @param codec used to convert messages
${renderJSDocParameters(channelParameters)}
* @param options to subscribe with, bindings from the AsyncAPI document overwrite these if specified
*/
export function jetStreamPushSubscribe(
onDataCallback: (
err ? : NatsTypescriptTemplateError,
msg?: ${messageType}
${realizeParametersForChannelWrapper(channelParameters, false)},
jetstreamMsg?: Nats.JsMsg) => void,
js: Nats.JetStreamClient,
codec: Nats.Codec < any >
${realizeParametersForChannelWrapper(channelParameters)},
options: Nats.ConsumerOptsBuilder | Partial<Nats.ConsumerOpts>
): Promise < Nats.JetStreamSubscription > {
return new Promise(async (resolve, reject) => {
try {
let subscription = js.subscribe(${realizeChannelName(channelParameters, channelName)}, options);
(async () => {
for await (const msg of await subscription) {
${unwrap(channelName, channelParameters)}
${whenReceivingMessage}
}
console.log("subscription closed");
})();
resolve(subscription);
} catch (e: any) {
reject(NatsTypescriptTemplateError.errorForCode(ErrorCode.INTERNAL_NATS_TS_ERROR, e));
}
})
}
`;
}