/
slish.js
158 lines (138 loc) · 4.26 KB
/
slish.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
var fs = require('fs');
var Path = require('path');
/**
* Constructor for the log streamer
*
* @param fileToObserve The absolute path to the file which should be observed
*
* @param options An object containing different optional config options
* - pollIntervall: An integer denoting the intercall in milliseconds
* at which the file should be polled
* - streamTailInitially: A flag indicating if the tail should be initially streamed
* - initialBytesToRead: The size of the tail that should be initially streamed.
* Only effective if `streamTailInitially` is set to true.
*/
var Slish = module.exports = function(fileToObserve, options) {
options = options || {};
this.consumers = [];
this.pollInterval = options.pollInterval || 50;
this.fileToObserve = fileToObserve;
this.initialBytesToRead = options.initialBytesToRead || 500;
this.lastSize = 0;
this.streamTailInitially = options.streamTailInitially || true;
this._init();
}
var proto = Slish.prototype;
/**
Starts observing the file.
*/
proto.startStreaming = function() {
var self = this;
this.intervalHandler = setInterval(function() { self._poll() }, self.pollInterval);
}
/**
Stops observing the file
*/
proto.stopStreaming = function() {
clearInterval(this.intervalHandler);
}
proto.addConsumer = function(writer) {
this.consumers.push(writer);
}
/**
* Sets the freqeuncy at which the file is polled to check for
* new data.
*/
proto.setInterval = function(interval) {
this.pollInterval = interval;
}
/**
* Streams data to all registered consumers
*/
proto._stream = function(data) {
this.consumers.forEach(function(writer) {
writer.consume(data);
});
}
/**
* Inits the state of the streamer by reading
* the file to observe.
*
* Streams the tail of the file if specified.
*/
proto._init = function() {
var self = this;
fs.stat(self.fileToObserve, function(error, stats) {
if (error) {
console.error(error.message);
process.exit(1);
}
if (self.initialBytesToRead > stats.size) {
self.initialBytesToRead = stats.size;
self.lastSize = stats.size;
}
if (self.initialBytesToRead == 0) {
return;
}
if (!self.streamTailInitially) {
return;
}
var buffer = new Buffer(self.initialBytesToRead);
fs.open(self.fileToObserve, "r", function(error, fd) {
self._getLastLines(fd, stats.size, self.initialBytesToRead, function(error, lastLines) {
self._stream(lastLines);
});
});
});
}
/**
* Checks the file to observe for new content.
*/
proto._poll = function() {
var self = this;
fs.stat(self.fileToObserve, function(error, stats) {
var currentSize = stats.size;
var delta = currentSize - self.lastSize;
self.lastSize = currentSize;
if (delta > 0) {
var buffer = new Buffer(delta);
fs.open(self.fileToObserve, "r", function(error, fd) {
fs.read(fd, buffer, 0, delta, currentSize - delta - 1, function(error, bRead, buffer) {
var newContent = String(buffer);
self._stream(newContent);
});
});
}
});
}
/**
* Fetches the tail of the file denoted by the passed file descriptor.
* that fits into `tailSize` bytes.
*
* The tail is passed to callback.
*
* @param fd The file descriptor for the file of interest
* @param fileSize The size of the file denoted by `fd`
* @param tailSize The size of the tail that should be fetched
* @param callback Function that gets the tail
* `callback(error, tail)`
*/
proto._getLastLines = function(fd, fileSize, tailSize, callback) {
var buffer = new Buffer(tailSize);
// This reads the last bytes of the file
// `fileSize - tailSize` determines the position at which to start reading
fs.read(fd, buffer, 0, tailSize, fileSize - tailSize, function(error, bRead, buffer) {
if (error) {
return callback(error);
}
var bufferString = String(buffer);
var posToStartRead = -1;
for (i = 0; i < bufferString.length; i++) {
if (bufferString.charAt(i) == "\n") {
posToStartRead = i;
break;
}
}
callback(null, bufferString.slice(posToStartRead+1, bufferString.length));
});
}