Skip to content

Commit

Permalink
refactor: replace StorageManager by StorageDispatcher (#2766)
Browse files Browse the repository at this point in the history
* refactor: replace `StorageManager` by `StorageDispatcher`

* style: fix too long line

* fix: adapt indentation
  • Loading branch information
bossenti committed Apr 26, 2024
1 parent f6dfc87 commit 6a1da8d
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 66 deletions.
Expand Up @@ -36,7 +36,6 @@
import org.apache.streampipes.storage.api.INoSqlStorage;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.storage.management.StorageManager;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -185,7 +184,7 @@ private List<ConsumableStreamPipesEntity> getAll() {
}

private IPipelineElementDescriptionStorage getTripleStore() {
return StorageManager.INSTANCE.getPipelineElementStorage();
return StorageDispatcher.INSTANCE.getNoSqlStore().getPipelineElementDescriptionStorage();
}

private INoSqlStorage getNoSqlStorage() {
Expand Down
Expand Up @@ -27,7 +27,7 @@
import org.apache.streampipes.model.pipeline.PipelineModification;
import org.apache.streampipes.model.template.BoundPipelineElement;
import org.apache.streampipes.model.template.PipelineTemplateDescription;
import org.apache.streampipes.storage.management.StorageManager;
import org.apache.streampipes.storage.management.StorageDispatcher;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -126,10 +126,9 @@ private InvocableStreamPipesEntity clonePe(InvocableStreamPipesEntity pipelineEl
}

private SpDataStream getStream(String datasetId) {
return StorageManager
.INSTANCE
.getPipelineElementStorage()
.getEventStreamById(datasetId);
return StorageDispatcher.INSTANCE.getNoSqlStore()
.getPipelineElementDescriptionStorage()
.getEventStreamById(datasetId);
}


Expand Down
Expand Up @@ -29,7 +29,7 @@
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.template.PipelineTemplateDescription;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
import org.apache.streampipes.storage.management.StorageManager;
import org.apache.streampipes.storage.management.StorageDispatcher;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -131,8 +131,7 @@ protected DataSinkDescription getSink(String id) throws ElementNotFoundException
}

protected IPipelineElementDescriptionStorage getStorage() {
return StorageManager
.INSTANCE
.getPipelineElementStorage();
return StorageDispatcher.INSTANCE.getNoSqlStore()
.getPipelineElementDescriptionStorage();
}
}
Expand Up @@ -36,30 +36,29 @@
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.storage.api.IPipelineElementDescriptionStorage;
import org.apache.streampipes.storage.management.StorageManager;
import org.apache.streampipes.storage.management.StorageDispatcher;

import com.fasterxml.jackson.core.JsonProcessingException;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

public abstract class ElementVerifier<T extends NamedStreamPipesEntity> {

protected static final Logger LOGGER = Logger.getAnonymousLogger();

private String graphData;
private Class<T> elementClass;
private boolean shouldTransform;
private final boolean shouldTransform;

protected T elementDescription;

protected List<VerificationResult> validationResults;
protected List<Verifier> validators;

protected IPipelineElementDescriptionStorage storageApi =
StorageManager.INSTANCE.getPipelineElementStorage();
protected IPipelineElementDescriptionStorage storageApi = StorageDispatcher
.INSTANCE
.getNoSqlStore()
.getPipelineElementDescriptionStorage();

public ElementVerifier(String graphData, Class<T> elementClass) {
this.elementClass = elementClass;
Expand Down Expand Up @@ -119,7 +118,7 @@ public Message verifyAndAdd(String principalSid, boolean publicElement) throws S

}

public Message verifyAndUpdate() throws SepaParseException {
public Message verifyAndUpdate() {
try {
this.elementDescription = transform();
} catch (JsonProcessingException e) {
Expand Down Expand Up @@ -178,15 +177,19 @@ private List<Notification> collectNotifications() {
}

private boolean isVerifiedSuccessfully() {
return validationResults.stream().noneMatch(validator -> (validator instanceof VerificationError));
return validationResults.stream()
.noneMatch(validator -> (validator instanceof VerificationError));
}

protected T transform() throws JsonProcessingException {
return JacksonSerializer.getObjectMapper().readValue(graphData, elementClass);
return JacksonSerializer.getObjectMapper()
.readValue(graphData, elementClass);
}

private void createAndStorePermission(String principalSid,
boolean publicElement) {
private void createAndStorePermission(
String principalSid,
boolean publicElement
) {
Permission permission = makePermission(
this.elementDescription.getElementId(),
this.elementDescription.getClass(),
Expand All @@ -196,17 +199,20 @@ private void createAndStorePermission(String principalSid,
storeNewObjectPermission(permission);
}

protected Permission makePermission(String objectInstanceId,
Class<?> objectInstanceClass,
String principalSid,
boolean publicElement) {
protected Permission makePermission(
String objectInstanceId,
Class<?> objectInstanceClass,
String principalSid,
boolean publicElement
) {
return PermissionBuilder
.create(objectInstanceId, objectInstanceClass, principalSid)
.publicElement(publicElement)
.build();
.create(objectInstanceId, objectInstanceClass, principalSid)
.publicElement(publicElement)
.build();
}

protected void storeNewObjectPermission(Permission permission) {
new SpResourceManager().managePermissions().create(permission);
new SpResourceManager().managePermissions()
.create(permission);
}
}
Expand Up @@ -24,7 +24,7 @@
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.client.Category;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
import org.apache.streampipes.storage.management.StorageManager;
import org.apache.streampipes.storage.management.StorageDispatcher;

import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
Expand All @@ -41,7 +41,9 @@ public class PipelineElementCategory extends AbstractAuthGuardedRestResource {

@GetMapping(path = "/ep", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<List<Category>> getEps() {
return ok(makeCategories(StorageManager.INSTANCE.getPipelineElementStorage().getAllDataStreams()));
return ok(makeCategories(StorageDispatcher.INSTANCE.getNoSqlStore()
.getPipelineElementDescriptionStorage()
.getAllDataStreams()));
}

@GetMapping(path = "/epa", produces = MediaType.APPLICATION_JSON_VALUE)
Expand Down

This file was deleted.

0 comments on commit 6a1da8d

Please sign in to comment.