Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Clear alarm node refactor: async alarm searching, optimize alarm type pattern evaluation, throw error on unsuccessful alarm API response #10561

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -456,15 +456,18 @@ private AlarmApiCallResult withPropagated(AlarmApiCallResult result) {
private void validateAlarmRequest(AlarmModificationRequest request) {
ConstraintValidator.validateFields(request);
if (request.getEndTs() > 0 && request.getStartTs() > request.getEndTs()) {
throw new DataValidationException("Alarm start ts can't be greater then alarm end ts!");
throw new DataValidationException("Alarm start ts can't be greater than alarm end ts!");
}
if (!tenantService.tenantExists(request.getTenantId())) {
throw new DataValidationException("Alarm is referencing to non-existent tenant!");
}
if (request.getStartTs() == 0L) {
request.setStartTs(System.currentTimeMillis());
}
if (request.getEndTs() == 0L) {
if (request.getStartTs() == 0L && request.getEndTs() == 0L) {
long currentTs = System.currentTimeMillis();
request.setStartTs(currentTs);
request.setEndTs(currentTs);
} else if (request.getStartTs() == 0L) {
request.setStartTs(request.getEndTs());
} else if (request.getEndTs() == 0L) {
request.setEndTs(request.getStartTs());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class TbAbstractAlarmNode<C extends TbAbstractAlarmNodeConfigura

@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
this.config = loadAlarmNodeConfig(configuration);
config = loadAlarmNodeConfig(configuration);
scriptEngine = ctx.createScriptEngine(config.getScriptLang(),
ScriptLanguage.TBEL.equals(config.getScriptLang()) ? config.getAlarmDetailsBuildTbel() : config.getAlarmDetailsBuildJs());
}
Expand All @@ -56,13 +56,13 @@ public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNode
public void onMsg(TbContext ctx, TbMsg msg) {
withCallback(processAlarm(ctx, msg),
alarmResult -> {
if (alarmResult.alarm == null) {
if (alarmResult.getAlarm() == null) {
ctx.tellNext(msg, TbNodeConnectionType.FALSE);
} else if (alarmResult.isCreated) {
} else if (alarmResult.isCreated()) {
tellNext(ctx, msg, alarmResult, TbMsgType.ENTITY_CREATED, "Created");
} else if (alarmResult.isUpdated || alarmResult.isSeverityUpdated) {
} else if (alarmResult.isUpdated() || alarmResult.isSeverityUpdated()) {
tellNext(ctx, msg, alarmResult, TbMsgType.ENTITY_UPDATED, "Updated");
} else if (alarmResult.isCleared) {
} else if (alarmResult.isCleared()) {
tellNext(ctx, msg, alarmResult, TbMsgType.ALARM_CLEAR, "Cleared");
} else {
ctx.tellSuccess(msg);
Expand All @@ -73,32 +73,33 @@ public void onMsg(TbContext ctx, TbMsg msg) {

protected abstract ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg);

protected ListenableFuture<JsonNode> buildAlarmDetails(TbMsg msg, JsonNode previousDetails) {
protected ListenableFuture<JsonNode> buildAlarmDetails(TbContext ctx, TbMsg msg, JsonNode previousDetails) {
try {
TbMsg dummyMsg = msg;
if (previousDetails != null) {
TbMsgMetaData metaData = msg.getMetaData().copy();
metaData.putValue(PREV_ALARM_DETAILS, JacksonUtil.toString(previousDetails));
dummyMsg = TbMsg.transformMsgMetadata(msg, metaData);
}
return scriptEngine.executeJsonAsync(dummyMsg);
ctx.logJsEvalRequest();
ListenableFuture<JsonNode> alarmDetailsFuture = scriptEngine.executeJsonAsync(dummyMsg);
withCallback(alarmDetailsFuture, alarmDetails -> ctx.logJsEvalResponse(), t -> ctx.logJsEvalFailure());
return alarmDetailsFuture;
} catch (Exception e) {
return Futures.immediateFailedFuture(e);
}
}

public static TbMsg toAlarmMsg(TbContext ctx, TbAlarmResult alarmResult, TbMsg originalMsg) {
JsonNode jsonNodes = JacksonUtil.valueToTree(alarmResult.alarm);
String data = jsonNodes.toString();
TbMsgMetaData metaData = originalMsg.getMetaData().copy();
if (alarmResult.isCreated) {
if (alarmResult.isCreated()) {
metaData.putValue(DataConstants.IS_NEW_ALARM, Boolean.TRUE.toString());
} else if (alarmResult.isUpdated || alarmResult.isSeverityUpdated) {
} else if (alarmResult.isUpdated() || alarmResult.isSeverityUpdated()) {
metaData.putValue(DataConstants.IS_EXISTING_ALARM, Boolean.TRUE.toString());
} else if (alarmResult.isCleared) {
} else if (alarmResult.isCleared()) {
metaData.putValue(DataConstants.IS_CLEARED_ALARM, Boolean.TRUE.toString());
}
return ctx.transformMsg(originalMsg, TbMsgType.ALARM, originalMsg.getOriginator(), metaData, data);
return ctx.transformMsg(originalMsg, TbMsgType.ALARM, originalMsg.getOriginator(), metaData, JacksonUtil.toString(alarmResult.getAlarm()));
}

@Override
Expand All @@ -109,8 +110,13 @@ public void destroy() {
}

private void tellNext(TbContext ctx, TbMsg msg, TbAlarmResult alarmResult, TbMsgType actionMsgType, String alarmAction) {
ctx.enqueue(ctx.alarmActionMsg(alarmResult.alarm, ctx.getSelfId(), actionMsgType),
ctx.enqueue(ctx.alarmActionMsg(alarmResult.getAlarm(), ctx.getSelfId(), actionMsgType),
() -> ctx.tellNext(toAlarmMsg(ctx, alarmResult, msg), alarmAction),
throwable -> ctx.tellFailure(toAlarmMsg(ctx, alarmResult, msg), throwable));
}

long currentTimeMillis() {
return System.currentTimeMillis();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import lombok.extern.slf4j.Slf4j;
import org.thingsboard.rule.engine.api.RuleNode;
import org.thingsboard.rule.engine.api.TbContext;
Expand Down Expand Up @@ -56,30 +57,30 @@ protected TbClearAlarmNodeConfiguration loadAlarmNodeConfig(TbNodeConfiguration

@Override
protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg) {
String alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg);
Alarm alarm;
if (msg.getOriginator().getEntityType().equals(EntityType.ALARM)) {
alarm = ctx.getAlarmService().findAlarmById(ctx.getTenantId(), new AlarmId(msg.getOriginator().getId()));
var originator = msg.getOriginator();
ListenableFuture<Alarm> alarmFuture;
if (originator.getEntityType().equals(EntityType.ALARM)) {
alarmFuture = ctx.getAlarmService().findAlarmByIdAsync(ctx.getTenantId(), new AlarmId(originator.getId()));
} else {
alarm = ctx.getAlarmService().findLatestActiveByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), alarmType);
String alarmType = TbNodeUtils.processPattern(config.getAlarmType(), msg);
alarmFuture = ctx.getDbCallbackExecutor().submit(() -> ctx.getAlarmService().findLatestActiveByOriginatorAndType(ctx.getTenantId(), originator, alarmType));
}
if (alarm != null && !alarm.getStatus().isCleared()) {
return clearAlarm(ctx, msg, alarm);
}
return Futures.immediateFuture(new TbAlarmResult(false, false, false, null));
return Futures.transformAsync(alarmFuture, alarm -> {
if (alarm != null && !alarm.isCleared()) {
return clearAlarm(ctx, msg, alarm);
}
return Futures.immediateFuture(new TbAlarmResult(false, false, false, null));
}, MoreExecutors.directExecutor());
}

private ListenableFuture<TbAlarmResult> clearAlarm(TbContext ctx, TbMsg msg, Alarm alarm) {
ctx.logJsEvalRequest();
ListenableFuture<JsonNode> asyncDetails = buildAlarmDetails(msg, alarm.getDetails());
ListenableFuture<JsonNode> asyncDetails = buildAlarmDetails(ctx, msg, alarm.getDetails());
return Futures.transform(asyncDetails, details -> {
ctx.logJsEvalResponse();
AlarmApiCallResult result = ctx.getAlarmService().clearAlarm(ctx.getTenantId(), alarm.getId(), System.currentTimeMillis(), details);
AlarmApiCallResult result = ctx.getAlarmService().clearAlarm(ctx.getTenantId(), alarm.getId(), currentTimeMillis(), details);
if (result.isSuccessful()) {
return new TbAlarmResult(false, false, result.isCleared(), result.getAlarm());
} else {
return new TbAlarmResult(false, false, false, alarm);
}
throw new RuntimeException("Failed to clear alarm: API returned unsuccessful result. Probably alarm was already deleted.");
}, ctx.getDbCallbackExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@
package org.thingsboard.rule.engine.action;

import lombok.Data;
import lombok.EqualsAndHashCode;
import org.thingsboard.rule.engine.api.NodeConfiguration;
import org.thingsboard.server.common.data.script.ScriptLanguage;

@Data
@EqualsAndHashCode(callSuper = true)
public class TbClearAlarmNodeConfiguration extends TbAbstractAlarmNodeConfiguration implements NodeConfiguration<TbClearAlarmNodeConfiguration> {

@Override
public TbClearAlarmNodeConfiguration defaultConfiguration() {
TbClearAlarmNodeConfiguration configuration = new TbClearAlarmNodeConfiguration();
var configuration = new TbClearAlarmNodeConfiguration();
configuration.setScriptLang(ScriptLanguage.TBEL);
configuration.setAlarmDetailsBuildJs(ALARM_DETAILS_BUILD_JS_TEMPLATE);
configuration.setAlarmDetailsBuildTbel(ALARM_DETAILS_BUILD_TBEL_TEMPLATE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@
import org.thingsboard.server.common.data.plugin.ComponentType;
import org.thingsboard.server.common.msg.TbMsg;

import java.io.IOException;
import java.util.List;

@Slf4j
@RuleNode(
type = ComponentType.ACTION,
Expand All @@ -57,26 +54,23 @@
)
public class TbCreateAlarmNode extends TbAbstractAlarmNode<TbCreateAlarmNodeConfiguration> {

private List<String> relationTypes;
private AlarmSeverity notDynamicAlarmSeverity;

@Override
public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
super.init(ctx, configuration);
if (!this.config.isDynamicSeverity()) {
this.notDynamicAlarmSeverity = EnumUtils.getEnum(AlarmSeverity.class, this.config.getSeverity());
if (this.notDynamicAlarmSeverity == null) {
throw new TbNodeException("Incorrect Alarm Severity value: " + this.config.getSeverity(), true);
if (!config.isDynamicSeverity()) {
notDynamicAlarmSeverity = EnumUtils.getEnum(AlarmSeverity.class, config.getSeverity());
if (notDynamicAlarmSeverity == null) {
throw new TbNodeException("Incorrect Alarm Severity value: " + config.getSeverity(), true);
}
}
}


@Override
protected TbCreateAlarmNodeConfiguration loadAlarmNodeConfig(TbNodeConfiguration configuration) throws TbNodeException {
TbCreateAlarmNodeConfiguration nodeConfiguration = TbNodeUtils.convert(configuration, TbCreateAlarmNodeConfiguration.class);
relationTypes = nodeConfiguration.getRelationTypes();
return nodeConfiguration;
return TbNodeUtils.convert(configuration, TbCreateAlarmNodeConfiguration.class);
}

@Override
Expand All @@ -85,49 +79,41 @@ protected ListenableFuture<TbAlarmResult> processAlarm(TbContext ctx, TbMsg msg)
final Alarm msgAlarm;

if (!config.isUseMessageAlarmData()) {
alarmType = TbNodeUtils.processPattern(this.config.getAlarmType(), msg);
alarmType = TbNodeUtils.processPattern(config.getAlarmType(), msg);
msgAlarm = null;
} else {
try {
msgAlarm = getAlarmFromMessage(ctx, msg);
msgAlarm = getAlarmFromMessage(ctx.getTenantId(), msg);
alarmType = msgAlarm.getType();
} catch (IOException e) {
ctx.tellFailure(msg, e);
return null;
} catch (IllegalArgumentException e) {
return Futures.immediateFailedFuture(e);
}
}

Alarm existingAlarm = ctx.getAlarmService().findLatestActiveByOriginatorAndType(ctx.getTenantId(), msg.getOriginator(), alarmType);
if (existingAlarm == null || existingAlarm.getStatus().isCleared()) {
if (existingAlarm == null || existingAlarm.isCleared()) {
return createNewAlarm(ctx, msg, msgAlarm);
} else {
return updateAlarm(ctx, msg, existingAlarm, msgAlarm);
}
}

private Alarm getAlarmFromMessage(TbContext ctx, TbMsg msg) throws IOException {
Alarm msgAlarm;
msgAlarm = JacksonUtil.fromString(msg.getData(), Alarm.class);
msgAlarm.setTenantId(ctx.getTenantId());
if (msgAlarm.getOriginator() == null) {
msgAlarm.setOriginator(msg.getOriginator());
}
private Alarm getAlarmFromMessage(TenantId tenantId, TbMsg msg) throws IllegalArgumentException {
Alarm msgAlarm = JacksonUtil.fromString(msg.getData(), Alarm.class);
msgAlarm.setTenantId(tenantId);
msgAlarm.setOriginator(msg.getOriginator());
return msgAlarm;
}

private ListenableFuture<TbAlarmResult> createNewAlarm(TbContext ctx, TbMsg msg, Alarm msgAlarm) {
ListenableFuture<JsonNode> asyncDetails;
boolean buildDetails = !config.isUseMessageAlarmData() || config.isOverwriteAlarmDetails();
if (buildDetails) {
ctx.logJsEvalRequest();
asyncDetails = buildAlarmDetails(msg, null);
asyncDetails = buildAlarmDetails(ctx, msg, null);
} else {
asyncDetails = Futures.immediateFuture(null);
}
ListenableFuture<Alarm> asyncAlarm = Futures.transform(asyncDetails, details -> {
if (buildDetails) {
ctx.logJsEvalResponse();
}
Alarm newAlarm;
if (msgAlarm != null) {
newAlarm = msgAlarm;
Expand All @@ -148,15 +134,11 @@ private ListenableFuture<TbAlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Al
ListenableFuture<JsonNode> asyncDetails;
boolean buildDetails = !config.isUseMessageAlarmData() || config.isOverwriteAlarmDetails();
if (buildDetails) {
ctx.logJsEvalRequest();
asyncDetails = buildAlarmDetails(msg, existingAlarm.getDetails());
asyncDetails = buildAlarmDetails(ctx, msg, existingAlarm.getDetails());
} else {
asyncDetails = Futures.immediateFuture(null);
}
ListenableFuture<AlarmApiCallResult> asyncUpdated = Futures.transform(asyncDetails, details -> {
if (buildDetails) {
ctx.logJsEvalResponse();
}
if (msgAlarm != null) {
existingAlarm.setSeverity(msgAlarm.getSeverity());
existingAlarm.setPropagate(msgAlarm.isPropagate());
Expand All @@ -173,7 +155,7 @@ private ListenableFuture<TbAlarmResult> updateAlarm(TbContext ctx, TbMsg msg, Al
existingAlarm.setPropagate(config.isPropagate());
existingAlarm.setPropagateToOwner(config.isPropagateToOwner());
existingAlarm.setPropagateToTenant(config.isPropagateToTenant());
existingAlarm.setPropagateRelationTypes(relationTypes);
existingAlarm.setPropagateRelationTypes(config.getRelationTypes());
existingAlarm.setDetails(details);
}
existingAlarm.setEndTs(currentTimeMillis());
Expand All @@ -189,28 +171,24 @@ private Alarm buildAlarm(TbMsg msg, JsonNode details, TenantId tenantId) {
.originator(msg.getOriginator())
.cleared(false)
.acknowledged(false)
.severity(this.config.isDynamicSeverity() ? processAlarmSeverity(msg) : notDynamicAlarmSeverity)
.severity(config.isDynamicSeverity() ? processAlarmSeverity(msg) : notDynamicAlarmSeverity)
.propagate(config.isPropagate())
.propagateToOwner(config.isPropagateToOwner())
.propagateToTenant(config.isPropagateToTenant())
.type(TbNodeUtils.processPattern(this.config.getAlarmType(), msg))
.propagateRelationTypes(relationTypes)
.type(TbNodeUtils.processPattern(config.getAlarmType(), msg))
.propagateRelationTypes(config.getRelationTypes())
.startTs(ts)
.endTs(ts)
.details(details)
.build();
}

private AlarmSeverity processAlarmSeverity(TbMsg msg) {
AlarmSeverity severity = EnumUtils.getEnum(AlarmSeverity.class, TbNodeUtils.processPattern(this.config.getSeverity(), msg));
AlarmSeverity severity = EnumUtils.getEnum(AlarmSeverity.class, TbNodeUtils.processPattern(config.getSeverity(), msg));
if (severity == null) {
throw new RuntimeException("Used incorrect pattern or Alarm Severity not included in message");
}
return severity;
}

long currentTimeMillis() {
return System.currentTimeMillis();
}

}