/
app.js
81 lines (74 loc) · 2.96 KB
/
app.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
var azure = require('azure');
var azureStorage = require('azure-storage');
var request = require('request');
var Repeat = require('repeat');
if(process.env.AZURE_QUEUE_PROVIDER.endsWith("storage")){
//Initializing Azure Storage Connection
var queueSvc = azureStorage.createQueueService();
/*
The getMessagesAndRoute function has to dequeue messages from an Azure Storage Queue and redirect them to the OpenFaaS gateway
*/
function getMessagesAndRoute() {
queueSvc.getMessages(process.env.AZURE_QUEUE_NAME, function(error, result, response){
if(!error){
var messages = result[0];
if(messages!=undefined){
try {
var message = JSON.parse(messages.messageText);
//Gateway on a k8s Cluster is usually reachable on port 8080. Modify the first part of functionUrl if needed
var functionUrl = "http://gateway:8080/function/" + message.functionName;
request.post({
url: functionUrl,
body: message.body
}, function(error,response,body){
if (!error && response.statusCode == 200) {
//Logging for "educational purpose"
console.log(body)
}
});
//After the message has been dequeued, just erase it
queueSvc.deleteMessage(process.env.AZURE_QUEUE_NAME, messages.messageId, messages.popReceipt, function(error, response){
if(!error){
//Should implement here
}
});
} catch (error) {
//Implement catch here
}
}
}
});
}
//Repeat the message dequeue every second
Repeat(getMessagesAndRoute).every(1000,'ms').start();
}else if(process.env.AZURE_QUEUE_PROVIDER.endsWith("servicebus")){
//Initializing Azure Service Bus Connection
var serviceBusService = azure.createServiceBusService();
/*
The getMessagesAndRoute function has to dequeue messages from an Azure Service Bus Queue and redirect them to the OpenFaaS gateway
*/
function getMessagesAndRoute() {
serviceBusService.receiveQueueMessage(process.env.AZURE_QUEUE_NAME, function(error, receivedMessage){
if(!error){
try {
var message = JSON.parse(receivedMessage.body);
//Gateway on a k8s Cluster is usually reachable on port 8080. Modify the first part of functionUrl if needed
var functionUrl = "http://gateway:8080/function/" + message.functionName;
request.post({
url: functionUrl,
body: message.body
}, function(error,response,body){
if (!error && response.statusCode == 200) {
//Logging for "educational purpose"
console.log(body)
}
});
} catch (error) {
//Implement catch here
}
}
});
}
//Repeat the message dequeue every second
Repeat(getMessagesAndRoute).every(1000,'ms').start();
}