Skip to content

Commit

Permalink
Merge branch 'develop' into 'master'
Browse files Browse the repository at this point in the history
Develop

See merge request bitsensor/back-end/elastalert!35
  • Loading branch information
martijnrondeel committed Apr 2, 2019
2 parents 4b6a372 + 671cac6 commit 808030b
Show file tree
Hide file tree
Showing 15 changed files with 186 additions and 15 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -74,3 +74,5 @@ lib/
*.pyc
config/config.json
package-lock.json

.vscode
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
FROM alpine:latest as py-ea
ARG ELASTALERT_VERSION=v0.1.38
ARG ELASTALERT_VERSION=v0.1.39
ENV ELASTALERT_VERSION=${ELASTALERT_VERSION}
# URL from which to download Elastalert.
ARG ELASTALERT_URL=https://github.com/Yelp/elastalert/archive/$ELASTALERT_VERSION.zip
Expand Down
2 changes: 1 addition & 1 deletion Makefile
@@ -1,4 +1,4 @@
v ?= v0.1.38
v ?= v0.1.39

all: build

Expand Down
11 changes: 10 additions & 1 deletion README.md
Expand Up @@ -14,7 +14,7 @@ The most convenient way to run the ElastAlert server is by using our Docker cont
To run the Docker image you will want to mount the volumes for configuration and rule files to keep them after container updates. In order to do that conveniently, please do: `git clone https://github.com/bitsensor/elastalert.git; cd elastalert`

```bash
docker run -d -p 3030:3030 \
docker run -d -p 3030:3030 -p 3333:3333 \
-v `pwd`/config/elastalert.yaml:/opt/elastalert/config.yaml \
-v `pwd`/config/elastalert-test.yaml:/opt/elastalert/config-test.yaml \
-v `pwd`/config/config.json:/opt/elastalert-server/config/config.json \
Expand Down Expand Up @@ -61,6 +61,7 @@ You can use the following config options:
{
"appName": "elastalert-server", // The name used by the logging framework.
"port": 3030, // The port to bind to
"wsport": 3333, // The port to bind to for websockets
"elastalertPath": "/opt/elastalert", // The path to the root ElastAlert folder. It's the folder that contains the `setup.py` script.
"start": "2014-01-01T00:00:00", // Optional date to start querying from
"end": "2016-01-01T00:00:00", // Optional date to stop querying at
Expand Down Expand Up @@ -211,7 +212,11 @@ This server exposes the following REST API's:
}
}
```

- **WEBSOCKET `/test`**

This allows you to test a rule and receive progress over a websocket. Send a message as JSON object (stringified) with two keys: `rule` (yaml string) and `options` (JSON object). You will receive progress messages over the socket as the test runs.

- **GET `/metadata/:type`**

Returns metadata from elasticsearch related to elasalert's state. `:type` should be one of: elastalert_status, elastalert, elastalert_error, or silence. See [docs about the elastalert metadata index](https://elastalert.readthedocs.io/en/latest/elastalert_status.html).
Expand All @@ -220,6 +225,10 @@ This server exposes the following REST API's:

Returns field mapping from elasticsearch for a given index.

- **GET `/search/:index`**

Performs elasticsearch query on behalf of the API. JSON body to this endpoint will become body of an ES search.

- **[WIP] GET `/config`**

Gets the ElastAlert configuration from `config.yaml` in `elastalertPath` (from the config).
Expand Down
7 changes: 6 additions & 1 deletion config/config-hisoric-data-example.json
@@ -1,6 +1,7 @@
{
"appName": "elastalert-server",
"port": 3030,
"wsport": 3333,
"elastalertPath": "/opt/elastalert",
"start": "2014-01-01T00:00:00",
"end": "2016-01-01T00:00:00",
Expand All @@ -14,5 +15,9 @@
"templatesPath": {
"relative": true,
"path": "/rule_templates"
}
},
"es_host": "elasticsearch",
"es_port": 9200,
"writeback_index": "elastalert_status"

}
7 changes: 6 additions & 1 deletion config/config-local-elastalert-installation.json
@@ -1,6 +1,7 @@
{
"appName": "elastalert-server",
"port": 3030,
"wsport": 3333,
"elastalertPath": "/opt/elastalert",
"verbose": false,
"es_debug": false,
Expand All @@ -12,5 +13,9 @@
"templatesPath": {
"relative": false,
"path": "/opt/elastalert/rule_templates"
}
},
"es_host": "elasticsearch",
"es_port": 9200,
"writeback_index": "elastalert_status"

}
3 changes: 2 additions & 1 deletion config/config.json
@@ -1,6 +1,7 @@
{
"appName": "elastalert-server",
"port": 3030,
"wsport": 3333,
"elastalertPath": "/opt/elastalert",
"verbose": false,
"es_debug": false,
Expand All @@ -13,7 +14,7 @@
"relative": true,
"path": "/rule_templates"
},
"es_host": "localhost",
"es_host": "elasticsearch",
"es_port": 9200,
"writeback_index": "elastalert_status"
}
5 changes: 3 additions & 2 deletions package.json
@@ -1,6 +1,6 @@
{
"name": "@bitsensor/elastalert",
"version": "1.0.0",
"version": "2.0.0",
"description": "A server that runs ElastAlert and exposes REST API's for manipulating rules and alerts.",
"license": "MIT",
"main": "index.js",
Expand Down Expand Up @@ -35,7 +35,8 @@
"raven": "^2.6.1",
"request": "^2.85.0",
"request-promise-native": "^1.0.5",
"tar": "^4.4.1"
"tar": "^4.4.1",
"ws": "^6.0.0"
},
"devDependencies": {
"eslint": "^4.17.0",
Expand Down
27 changes: 27 additions & 0 deletions src/common/websocket.js
@@ -0,0 +1,27 @@
import WebSocket from 'ws';

export var wss = null;

export function listen(port) {
wss = new WebSocket.Server({ port, path: '/test' });

wss.on('connection', ws => {
ws.isAlive = true;
ws.on('pong', () => {
ws.isAlive = true;
});
});

return wss;
}

// Keepalive in case clients lose connection during a long rule test.
// If client doesn't respond in 10s this will close the socket and
// therefore stop the elastalert test from continuing to run detached.
setInterval(() => {
wss.clients.forEach(ws => {
if (ws.isAlive === false) return ws.terminate();
ws.isAlive = false;
ws.ping(() => {});
});
}, 10000);
13 changes: 12 additions & 1 deletion src/controllers/process/index.js
Expand Up @@ -9,6 +9,7 @@ export default class ProcessController {

constructor() {
this._elastalertPath = config.get('elastalertPath');
this._onExitCallbacks = [];
this._status = Status.IDLE;

/**
Expand All @@ -18,6 +19,10 @@ export default class ProcessController {
this._process = null;
}

onExit(onExitCallback) {
this._onExitCallbacks.push(onExitCallback);
}

get status() {
return this._status;
}
Expand All @@ -38,7 +43,7 @@ export default class ProcessController {

// Create ElastAlert index if it doesn't exist yet
logger.info('Creating index');
var indexCreate = spawnSync('python', ['-m', 'elastalert.create_index', '--index', 'elastalert_status', '--old-index', ''], {
var indexCreate = spawnSync('python', ['-m', 'elastalert.create_index', '--index', config.get('writeback_index'), '--old-index', ''], {
cwd: this._elastalertPath
});

Expand Down Expand Up @@ -112,6 +117,12 @@ export default class ProcessController {
this._status = Status.ERROR;
}
this._process = null;

this._onExitCallbacks.map(function(onExitCallback) {
if (onExitCallback !== null) {
onExitCallback();
}
});
});

// Set listener for ElastAlert error
Expand Down
36 changes: 33 additions & 3 deletions src/controllers/test/index.js
Expand Up @@ -19,7 +19,7 @@ export default class TestController {
});
}

testRule(rule, options) {
testRule(rule, options, socket) {
const self = this;
let tempFileName = '~' + randomstring.generate() + '.temp';
let tempFilePath = path.join(self.testFolder, tempFileName);
Expand Down Expand Up @@ -55,16 +55,42 @@ export default class TestController {
break;
}


try {
let testProcess = spawn('python', processOptions, {
cwd: self._elastalertPath
});

// When the websocket closes we kill the test process
// so it doesn't keep running detached
if (socket) {
socket.on('close', () => {
testProcess.kill();

fileSystem.deleteFile(tempFilePath)
.catch(function (error) {
logger.error(`Failed to delete temporary test file ${tempFilePath} with error:`, error);
});
});
}

testProcess.stdout.on('data', function (data) {
if (socket) {
socket.send(JSON.stringify({
event: 'result',
data: data.toString()
}));
}
stdoutLines.push(data.toString());
});

testProcess.stderr.on('data', function (data) {
if (socket) {
socket.send(JSON.stringify({
event: 'progress',
data: data.toString()
}));
}
stderrLines.push(data.toString());
});

Expand All @@ -77,8 +103,10 @@ export default class TestController {
resolve(stdoutLines.join('\n'));
}
} else {
reject(stderrLines.join('\n'));
logger.error(stderrLines.join('\n'));
if (!socket) {
reject(stderrLines.join('\n'));
logger.error(stderrLines.join('\n'));
}
}

fileSystem.deleteFile(tempFilePath)
Expand All @@ -95,6 +123,8 @@ export default class TestController {
logger.error(`Failed to write file ${tempFileName} to ${self.testFolder} with error:`, error);
reject(error);
});
}).catch((error) => {
logger.error('Failed to test rule with error:', error);
});
}

Expand Down
26 changes: 25 additions & 1 deletion src/elastalert_server.js
Expand Up @@ -4,6 +4,7 @@ import Logger from './common/logger';
import config from './common/config';
import path from 'path';
import FileSystem from './common/file_system';
import { listen } from './common/websocket';
import setupRouter from './routes/route_setup';
import ProcessController from './controllers/process';
import RulesController from './controllers/rules';
Expand Down Expand Up @@ -69,6 +70,10 @@ export default class ElastalertServer {
self._fileSystemController = new FileSystem();
self._processController = new ProcessController();
self._processController.start();
self._processController.onExit(function() {
// If the elastalert process exits, we should stop the server.
process.exit(0);
});

self._rulesController = new RulesController();
self._templatesController = new TemplatesController();
Expand All @@ -77,8 +82,27 @@ export default class ElastalertServer {
self._fileSystemController.createDirectoryIfNotExists(self.getDataFolder()).catch(function (error) {
logger.error('Error creating data folder with error:', error);
});

logger.info('Server listening on port ' + config.get('port'));

let wss = listen(config.get('wsport'));

wss.on('connection', ws => {
ws.on('message', (data) => {
try {
data = JSON.parse(data);
if (data.rule) {
let rule = data.rule;
let options = data.options;
self._testController.testRule(rule, options, ws);
}
} catch (error) {
console.log(error);
}
});
});

logger.info('Websocket listening on port 3333');
} catch (error) {
logger.error('Starting server failed with error:', error);
process.exit(1);
Expand Down
33 changes: 32 additions & 1 deletion src/handlers/metadata/get.js
Expand Up @@ -2,12 +2,43 @@ import config from '../../common/config';
import { getClient } from '../../common/elasticsearch_client';


function escapeLuceneSyntax(str) {
return [].map
.call(str, char => {
if (
char === '/' ||
char === '+' ||
char === '-' ||
char === '&' ||
char === '|' ||
char === '!' ||
char === '(' ||
char === ')' ||
char === '{' ||
char === '}' ||
char === '[' ||
char === ']' ||
char === '^' ||
char === '"' ||
char === '~' ||
char === '*' ||
char === '?' ||
char === ':' ||
char === '\\'
) {
return `\\${char}`;
}
return char;
})
.join('');
}

function getQueryString(request) {
if (request.params.type === 'elastalert_error') {
return '*:*';
}
else {
return `rule_name:${request.query.rule_name || '*'}`;
return `rule_name:"${escapeLuceneSyntax(request.query.rule_name) || '*'}"`;
}
}

Expand Down
18 changes: 18 additions & 0 deletions src/handlers/search/get.js
@@ -0,0 +1,18 @@
import { getClient } from '../../common/elasticsearch_client';

export default function searchHandler(request, response) {
/**
* @type {ElastalertServer}
*/
var client = getClient();

client.search({
index: request.params.index,
body: request.body
}).then(function(resp) {
response.send(resp);
}, function(error) {
response.send({ error });
});

}

0 comments on commit 808030b

Please sign in to comment.