Skip to content

Commit

Permalink
feat: Support custom service tags and service selection for adapters (#…
Browse files Browse the repository at this point in the history
…2691)

* feat: Support custom service tags and service selection for adapters

* Remove empty scss file

* Support service selection in runtime resolvable static properties

* Fix checkstyle

* add short comment to new ENV variable

* feat: Add unit tests for CustomServiceTagResolver

* add comment for clarification

* add comment for clarification

* Rename DeploymentConfiguration

---------

Co-authored-by: bossenti <bossenti@posteo.de>
Co-authored-by: Philipp Zehnder <tenthe@users.noreply.github.com>
  • Loading branch information
3 people committed Apr 30, 2024
1 parent 1082ac2 commit 0fdd15d
Show file tree
Hide file tree
Showing 49 changed files with 804 additions and 196 deletions.
Expand Up @@ -97,7 +97,10 @@ public enum Envs {
SP_NATS_HOST("SP_NATS_HOST", "nats"),
SP_NATS_PORT("SP_NATS_PORT", "4222"),

SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650");
SP_PULSAR_URL("SP_PULSAR_URL", "pulsar://localhost:6650"),

// expects a comma separated string of service names
SP_SERVICE_TAGS("SP_SERVICE_TAGS", "");

private final String envVariableName;
private String defaultValue;
Expand Down
Expand Up @@ -274,4 +274,9 @@ public StringEnvironmentVariable getPulsarUrl() {
return new StringEnvironmentVariable(Envs.SP_PULSAR_URL);
}

@Override
public StringEnvironmentVariable getCustomServiceTags() {
return new StringEnvironmentVariable(Envs.SP_SERVICE_TAGS);
}

}
Expand Up @@ -135,4 +135,6 @@ public interface Environment {

StringEnvironmentVariable getPulsarUrl();

StringEnvironmentVariable getCustomServiceTags();

}
Expand Up @@ -23,7 +23,7 @@
import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.commons.prometheus.adapter.AdapterMetrics;
import org.apache.streampipes.connect.management.util.GroundingUtils;
import org.apache.streampipes.connect.management.util.WorkerPaths;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.manager.monitoring.pipeline.ExtensionsLogProvider;
import org.apache.streampipes.manager.verification.DataStreamVerifier;
import org.apache.streampipes.model.SpDataStream;
Expand All @@ -32,12 +32,11 @@
import org.apache.streampipes.resource.management.AdapterResourceManager;
import org.apache.streampipes.resource.management.DataStreamResourceManager;
import org.apache.streampipes.storage.api.IAdapterStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URISyntaxException;
import java.util.List;
import java.util.NoSuchElementException;

Expand All @@ -54,12 +53,12 @@ public class AdapterMasterManagement {

private final DataStreamResourceManager dataStreamResourceManager;

public AdapterMasterManagement(IAdapterStorage adapterStorage,
public AdapterMasterManagement(IAdapterStorage adapterInstanceStorage,
AdapterResourceManager adapterResourceManager,
DataStreamResourceManager dataStreamResourceManager,
AdapterMetrics adapterMetrics
) {
this.adapterInstanceStorage = adapterStorage;
this.adapterInstanceStorage = adapterInstanceStorage;
this.adapterMetrics = adapterMetrics;
this.adapterResourceManager = adapterResourceManager;
this.dataStreamResourceManager = dataStreamResourceManager;
Expand Down Expand Up @@ -164,7 +163,11 @@ public void startStreamAdapter(String elementId) throws AdapterException {

try {
// Find endpoint to start adapter on
var baseUrl = WorkerPaths.findEndpointUrl(ad.getAppId());
var baseUrl = new ExtensionsServiceEndpointGenerator().getEndpointBaseUrl(
ad.getAppId(),
SpServiceUrlProvider.ADAPTER,
ad.getDeploymentConfiguration().getDesiredServiceTags()
);

// Update selected endpoint URL of adapter
ad.setSelectedEndpointUrl(baseUrl);
Expand All @@ -177,7 +180,7 @@ public void startStreamAdapter(String elementId) throws AdapterException {
adapterMetrics.register(ad.getElementId(), ad.getName());

LOG.info("Started adapter " + elementId + " on: " + baseUrl);
} catch (NoServiceEndpointsAvailableException | URISyntaxException e) {
} catch (NoServiceEndpointsAvailableException e) {
throw new AdapterException("Could not start adapter due to unavailable service endpoint", e);
}
}
Expand All @@ -192,8 +195,4 @@ private void installDataSource(SpDataStream stream,
throw new AdapterException();
}
}

private IAdapterStorage getAdapterInstanceStorage() {
return StorageDispatcher.INSTANCE.getNoSqlStore().getAdapterInstanceStorage();
}
}
Expand Up @@ -24,11 +24,15 @@
import org.apache.streampipes.connect.management.AdapterEventPreviewPipeline;
import org.apache.streampipes.connect.management.util.WorkerPaths;
import org.apache.streampipes.extensions.api.connect.exception.WorkerAdapterException;
import org.apache.streampipes.manager.api.extensions.IExtensionsServiceEndpointGenerator;
import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.connect.guess.AdapterEventPreview;
import org.apache.streampipes.model.connect.guess.GuessSchema;
import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -39,21 +43,25 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Set;

public class GuessManagement {

private static final Logger LOG = LoggerFactory.getLogger(GuessManagement.class);
private final WorkerUrlProvider workerUrlProvider;
private final IExtensionsServiceEndpointGenerator endpointGenerator;
private final ObjectMapper objectMapper;

public GuessManagement() {
this.workerUrlProvider = new WorkerUrlProvider();
this.endpointGenerator = new ExtensionsServiceEndpointGenerator();
this.objectMapper = JacksonSerializer.getObjectMapper();
}

public GuessSchema guessSchema(AdapterDescription adapterDescription)
throws ParseException, WorkerAdapterException, NoServiceEndpointsAvailableException, IOException {
var workerUrl = getWorkerUrl(adapterDescription.getAppId());
var workerUrl = getWorkerUrl(
adapterDescription.getAppId(),
adapterDescription.getDeploymentConfiguration().getDesiredServiceTags()
);
var description = objectMapper.writeValueAsString(adapterDescription);

LOG.info("Guess schema at: " + workerUrl);
Expand All @@ -72,8 +80,9 @@ public GuessSchema guessSchema(AdapterDescription adapterDescription)
}
}

private String getWorkerUrl(String appId) throws NoServiceEndpointsAvailableException {
var baseUrl = workerUrlProvider.getWorkerBaseUrl(appId);
private String getWorkerUrl(String appId,
Set<SpServiceTag> customServiceTags) throws NoServiceEndpointsAvailableException {
var baseUrl = endpointGenerator.getEndpointBaseUrl(appId, SpServiceUrlProvider.ADAPTER, customServiceTags);
return baseUrl + WorkerPaths.getGuessSchemaPath();
}

Expand Down
Expand Up @@ -52,10 +52,10 @@ public class WorkerRestClient {

private static final Logger LOG = LoggerFactory.getLogger(WorkerRestClient.class);

public static void invokeStreamAdapter(String endpointUrl,
public static void invokeStreamAdapter(String baseUrl,
String elementId) throws AdapterException {
var adapterStreamDescription = getAndDecryptAdapter(elementId);
var url = endpointUrl + WorkerPaths.getStreamInvokePath();
var url = baseUrl + WorkerPaths.getStreamInvokePath();

startAdapter(url, adapterStreamDescription);
updateStreamAdapterStatus(adapterStreamDescription.getElementId(), true);
Expand Down Expand Up @@ -127,11 +127,11 @@ private static HttpResponse triggerPost(String url,
return request.execute().returnResponse();
}

public static RuntimeOptionsResponse getConfiguration(String workerEndpoint,
public static RuntimeOptionsResponse getConfiguration(String baseUrl,
String appId,
RuntimeOptionsRequest runtimeOptionsRequest)
throws AdapterException, SpConfigurationException {
String url = workerEndpoint + WorkerPaths.getRuntimeResolvablePath(appId);
String url = baseUrl + WorkerPaths.getRuntimeResolvablePath(appId);

try {
String payload = JacksonSerializer.getObjectMapper().writeValueAsString(runtimeOptionsRequest);
Expand Down

This file was deleted.

Expand Up @@ -17,13 +17,6 @@
*/
package org.apache.streampipes.connect.management.util;

import org.apache.streampipes.commons.exceptions.NoServiceEndpointsAvailableException;
import org.apache.streampipes.manager.execution.endpoint.ExtensionsServiceEndpointGenerator;
import org.apache.streampipes.svcdiscovery.api.model.SpServiceUrlProvider;

import java.net.URI;
import java.net.URISyntaxException;

public class WorkerPaths {

private static final String WorkerMainPath = "/api/v1/worker";
Expand All @@ -36,14 +29,6 @@ public static String getStreamStopPath() {
return WorkerMainPath + "/stream/stop";
}

public static String getSetInvokePath() {
return WorkerMainPath + "/set/invoke";
}

public static String getSetStopPath() {
return WorkerMainPath + "/set/stop";
}

public static String getRunningAdaptersPath() {
return WorkerMainPath + "/running";
}
Expand All @@ -56,12 +41,4 @@ public static String getGuessSchemaPath() {
return WorkerMainPath + "/guess/schema";
}

public static String findEndpointUrl(String appId) throws NoServiceEndpointsAvailableException, URISyntaxException {
SpServiceUrlProvider serviceUrlProvider = SpServiceUrlProvider.ADAPTER;
String endpointUrl = new ExtensionsServiceEndpointGenerator(appId, serviceUrlProvider).getEndpointResourceUrl();
URI uri = new URI(endpointUrl);
return uri.getScheme() + "://" + uri.getAuthority();
}


}
Expand Up @@ -25,6 +25,7 @@
import org.apache.streampipes.model.connect.rules.stream.StreamTransformationRuleDescription;
import org.apache.streampipes.model.connect.rules.value.AddTimestampRuleDescription;
import org.apache.streampipes.model.connect.rules.value.ValueTransformationRuleDescription;
import org.apache.streampipes.model.deployment.ExtensionDeploymentConfiguration;
import org.apache.streampipes.model.grounding.EventGrounding;
import org.apache.streampipes.model.schema.EventSchema;
import org.apache.streampipes.model.shared.annotation.TsModel;
Expand Down Expand Up @@ -55,6 +56,8 @@ public class AdapterDescription extends VersionedNamedStreamPipesEntity {
// Is used to store where the adapter is running to stop it
private String selectedEndpointUrl;

private ExtensionDeploymentConfiguration deploymentConfiguration;

/**
* This is used to identify all the service within the service group the adapter can be invoked in
*/
Expand All @@ -72,6 +75,7 @@ public AdapterDescription() {
this.config = new ArrayList<>();
this.category = new ArrayList<>();
this.dataStream = new SpDataStream();
this.deploymentConfiguration = new ExtensionDeploymentConfiguration();
}

public AdapterDescription(int version) {
Expand All @@ -81,6 +85,7 @@ public AdapterDescription(int version) {
this.config = new ArrayList<>();
this.category = new ArrayList<>();
this.dataStream = new SpDataStream();
this.deploymentConfiguration = new ExtensionDeploymentConfiguration();
this.setVersion(version);
}

Expand All @@ -89,6 +94,7 @@ public AdapterDescription(String elementId, String name, String description) {
this.rules = new ArrayList<>();
this.category = new ArrayList<>();
this.dataStream = new SpDataStream();
this.deploymentConfiguration = new ExtensionDeploymentConfiguration();
}


Expand All @@ -109,6 +115,7 @@ public AdapterDescription(AdapterDescription other) {
this.dataStream = new SpDataStream(other.getDataStream());
}
this.running = other.isRunning();
this.deploymentConfiguration = other.getDeploymentConfiguration();
}

public String getRev() {
Expand Down Expand Up @@ -259,4 +266,12 @@ public boolean isRunning() {
public void setRunning(boolean running) {
this.running = running;
}

public ExtensionDeploymentConfiguration getDeploymentConfiguration() {
return deploymentConfiguration;
}

public void setDeploymentConfiguration(ExtensionDeploymentConfiguration deploymentConfiguration) {
this.deploymentConfiguration = deploymentConfiguration;
}
}
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.model.deployment;

import org.apache.streampipes.model.extensions.svcdiscovery.SpServiceTag;

import java.util.HashSet;
import java.util.Set;

public class ExtensionDeploymentConfiguration {

private Set<SpServiceTag> desiredServiceTags;
private String selectedEndpointUrl;

public ExtensionDeploymentConfiguration() {
this.desiredServiceTags = new HashSet<>();
}

public Set<SpServiceTag> getDesiredServiceTags() {
return desiredServiceTags;
}

public void setDesiredServiceTags(Set<SpServiceTag> desiredServiceTags) {
this.desiredServiceTags = desiredServiceTags;
}

public String getSelectedEndpointUrl() {
return selectedEndpointUrl;
}

public void setSelectedEndpointUrl(String selectedEndpointUrl) {
this.selectedEndpointUrl = selectedEndpointUrl;
}
}

0 comments on commit 0fdd15d

Please sign in to comment.