Skip to content

Commit

Permalink
#2374 Added test configuration builder and resolved selector prefixes
Browse files Browse the repository at this point in the history
  • Loading branch information
IsaakKrut committed Apr 24, 2024
1 parent 4769f24 commit c25b1be
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 142 deletions.
Expand Up @@ -20,6 +20,7 @@


import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.ProcessingElementTestExecutor;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.TestConfiguration;


import org.junit.jupiter.api.BeforeEach;
Expand All @@ -38,7 +39,7 @@ public class TestBooleanFilterProcessor {

BooleanFilterProcessor processor;
private static final String FIELD_NAME = "Test";
private static final String FIELD_NAME_WITH_PREFIX = "s0::" + FIELD_NAME;

@BeforeEach
public void setup(){
processor = new BooleanFilterProcessor();
Expand All @@ -51,19 +52,18 @@ public void test(
List<Boolean> outputEventBooleans
) {

Map<String, Object> userConfiguration =
Map.of(
BooleanFilterProcessor.BOOLEAN_MAPPING, FIELD_NAME_WITH_PREFIX,
BooleanFilterProcessor.VALUE, boolToKeep
);
TestConfiguration configuration = TestConfiguration.builder()
.configWithDefaultPrefix(BooleanFilterProcessor.BOOLEAN_MAPPING, FIELD_NAME)
.config(BooleanFilterProcessor.VALUE, boolToKeep)
.build();

List<Map<String, Object>> events = new ArrayList<>();
eventBooleans.forEach(bool->events.add(Map.of(FIELD_NAME_WITH_PREFIX, bool)));
eventBooleans.forEach(bool->events.add(Map.of(FIELD_NAME, bool)));

List<Map<String, Object>> outputEvents = new ArrayList<>();
outputEventBooleans.forEach(bool->outputEvents.add(Map.of(FIELD_NAME, bool)));

ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration);
ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, configuration);

testExecutor.run(events, outputEvents);
}
Expand Down
Expand Up @@ -21,7 +21,9 @@
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.output.CustomOutputStrategy;
import org.apache.streampipes.model.output.OutputStrategy;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.PrefixStrategy;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.ProcessingElementTestExecutor;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.TestConfiguration;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
Expand Down Expand Up @@ -51,15 +53,19 @@ public void setup(){
@ParameterizedTest
@MethodSource("data")
public void test(List<Map<String, Object>> events,
List<Map<String, Object>> outputEvents) {
List<Map<String, Object>> outputEvents,
PrefixStrategy prefixStrategy) {

Consumer<DataProcessorInvocation> invocationConfig = (invocation->{
List<OutputStrategy> outputStrategies = new ArrayList<>();
outputStrategies.add(new CustomOutputStrategy(List.of(S0_PREFIX + SELECTOR_1, S1_PREFIX + SELECTOR_2)));
invocation.setOutputStrategies(outputStrategies);
});

ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, invocationConfig);
TestConfiguration configuration = TestConfiguration.builder().prefixStrategy(prefixStrategy).build();

ProcessingElementTestExecutor testExecutor =
new ProcessingElementTestExecutor(processor, configuration, invocationConfig);

testExecutor.run(events, outputEvents);
}
Expand All @@ -71,34 +77,39 @@ static Stream<Arguments> data() {
return Stream.of(
Arguments.of(
List.of(
Map.of(S0_PREFIX + SELECTOR_1, object1)),
List.of()),
Map.of(SELECTOR_1, object1)),
List.of(),
PrefixStrategy.SAME_PREFIX),
Arguments.of(
List.of(
Map.of(S0_PREFIX + SELECTOR_1, object1),
Map.of(S0_PREFIX + SELECTOR_2, object2)),
List.of()),
Map.of(SELECTOR_1, object1),
Map.of(SELECTOR_2, object2)),
List.of(),
PrefixStrategy.SAME_PREFIX),
Arguments.of(
List.of(
Map.of(S0_PREFIX + SELECTOR_1, object1),
Map.of(S1_PREFIX + SELECTOR_2, object2)),
Map.of(SELECTOR_1, object1),
Map.of(SELECTOR_2, object2)),
List.of(
Map.of(SELECTOR_1, object1, SELECTOR_2, object2)
)),
),
PrefixStrategy.ALTERNATE),
Arguments.of(
List.of(
Map.of(S0_PREFIX + SELECTOR_1, object1),
Map.of(S1_PREFIX + INVALID_SELECTOR, object2)),
Map.of(SELECTOR_1, object1),
Map.of(INVALID_SELECTOR, object2)),
List.of(
Map.of(SELECTOR_1, object1)
)),
),
PrefixStrategy.ALTERNATE),
Arguments.of(
List.of(
Map.of(S0_PREFIX + INVALID_SELECTOR, object1),
Map.of(S1_PREFIX + INVALID_SELECTOR, object2)),
Map.of(INVALID_SELECTOR, object1),
Map.of(INVALID_SELECTOR, object2)),
List.of(
Map.of()
))
),
PrefixStrategy.ALTERNATE)
);
}
}
Expand Up @@ -19,7 +19,9 @@

import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.output.CustomOutputStrategy;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.PrefixStrategy;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.ProcessingElementTestExecutor;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.TestConfiguration;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
Expand All @@ -36,8 +38,8 @@

public class TestMergeByTimeProcessor {

private static final String S0_PREFIX = "s0::";
private static final String S1_PREFIX = "s1::";
private static final String S0_PREFIX = "s0";
private static final String S1_PREFIX = "s1";
private static final Integer timeInterval = 100;

MergeByTimeProcessor processor;
Expand All @@ -49,17 +51,24 @@ public void setup(){
@ParameterizedTest
@MethodSource("data")
public void test(List<Map<String, Object>> events,
List<Map<String, Object>> outputEvents){
List<Map<String, Object>> outputEvents,
PrefixStrategy prefixStrategy,
List<String> customPrefixes){


Map<String, Object> userConfiguration =
Map.of(
MergeByTimeProcessor.TIME_INTERVAL, timeInterval,
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY,
S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY,
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY,
S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY
);
TestConfiguration.TestConfigurationBuilder configurationBuilder = TestConfiguration.builder()
.config(MergeByTimeProcessor.TIME_INTERVAL, timeInterval)
.configWithPrefix(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY,
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, S0_PREFIX)
.configWithPrefix(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY,
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, S1_PREFIX);

if (customPrefixes != null){
configurationBuilder.customPrefixStrategy(customPrefixes);
} else {
configurationBuilder.prefixStrategy(prefixStrategy);
}


Consumer<DataProcessorInvocation> invocationConfig = (invocation->{
List<String> outputKeySelectors = invocation.getOutputStrategies()
Expand All @@ -69,54 +78,63 @@ public void test(List<Map<String, Object>> events,
.findFirst()
.map(CustomOutputStrategy::getSelectedPropertyKeys)
.orElse(new ArrayList<>());
outputKeySelectors.add(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY);
outputKeySelectors.add(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY);
outputKeySelectors.add(S0_PREFIX + "::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY);
outputKeySelectors.add(S1_PREFIX + "::" + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY);
});

ProcessingElementTestExecutor testExecutor =
new ProcessingElementTestExecutor(processor, userConfiguration, invocationConfig);
new ProcessingElementTestExecutor(processor, configurationBuilder.build(), invocationConfig);

testExecutor.run(events, outputEvents);
}

static Stream<Arguments> data() {
return Stream.of(
Arguments.of(List.of(
Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"),
Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "90")
Arguments.of(
List.of(
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "90")
),
List.of(
Map.of(
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "90",
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"
)
)),
Arguments.of(List.of(
Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"),
Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "110")
), List.of()),
Arguments.of(List.of(
Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"),
Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "80"),
Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "110"),
Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "500")
)),
PrefixStrategy.ALTERNATE,
null),
Arguments.of(
List.of(
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "110")
),
List.of(),
PrefixStrategy.ALTERNATE,
null),
Arguments.of(
List.of(
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "80"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "110"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "500")
),
List.of(
Map.of(
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "80",
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "0"
)
)),
Arguments.of(List.of(
Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "0"),
Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "10"),
Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "110"),
Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "115"),
Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "120"),
Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "230"),
Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "340"),
Map.of(S0_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "500"),
Map.of(S1_PREFIX + MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "510")
)),
PrefixStrategy.ALTERNATE,
null),
Arguments.of(
List.of(
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "0"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "10"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "110"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "115"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "120"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "230"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "340"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "500"),
Map.of(MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "510")
),
List.of(
Map.of(
Expand All @@ -131,7 +149,9 @@ static Stream<Arguments> data() {
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_2_KEY, "510",
MergeByTimeProcessor.TIMESTAMP_MAPPING_STREAM_1_KEY, "500"
)
))
),
null,
List.of(S1_PREFIX, S0_PREFIX, S0_PREFIX, S1_PREFIX, S0_PREFIX, S1_PREFIX, S0_PREFIX, S0_PREFIX, S1_PREFIX))
);
}
}
Expand Up @@ -38,12 +38,11 @@ public void setup() {
@Test
public void testLowerThenOperatorFilterNotApplied() {

Map<String, Object> userConfiguration = Map.of(
// TODO Here we need a better solution for the mapping to deal with the selector prefix
NumericalFilterProcessor.NUMBER_MAPPING, "::" + PROPERTY_NAME,
NumericalFilterProcessor.VALUE, 10.0,
NumericalFilterProcessor.OPERATION, "<"
);
TestConfiguration configuration = TestConfiguration.builder()
.configWithDefaultPrefix(NumericalFilterProcessor.NUMBER_MAPPING, PROPERTY_NAME)
.config(NumericalFilterProcessor.VALUE, 10.0)
.config(NumericalFilterProcessor.OPERATION, "<")
.build();

List<Map<String, Object>> inputEvents = List.of(
Map.of(PROPERTY_NAME, 1.0f)
Expand All @@ -53,28 +52,27 @@ public void testLowerThenOperatorFilterNotApplied() {
Map.of(PROPERTY_NAME, 1.0f)
);

ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration);
ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, configuration);

testExecutor.run(inputEvents, outputEvents);
}

@Test
public void testLowerThenOperatorFilterApplied() {

Map<String, Object> userConfiguration = Map.of(
// TODO Here we need a better solution for the mapping to deal with the selector prefix
NumericalFilterProcessor.NUMBER_MAPPING, "::" + PROPERTY_NAME,
NumericalFilterProcessor.VALUE, 10.0,
NumericalFilterProcessor.OPERATION, "<"
);
TestConfiguration configuration = TestConfiguration.builder()
.configWithDefaultPrefix(NumericalFilterProcessor.NUMBER_MAPPING, PROPERTY_NAME)
.config(NumericalFilterProcessor.VALUE, 10.0)
.config(NumericalFilterProcessor.OPERATION, "<")
.build();

List<Map<String, Object>> inputEvents = List.of(
Map.of(PROPERTY_NAME, 11.0f)
);

List<Map<String, Object>> outputEvents = List.of();

ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration);
ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, configuration);

testExecutor.run(inputEvents, outputEvents);
}
Expand Down
@@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package org.apache.streampipes.processors.filters.jvm.processor.numericalfilter;

public enum PrefixStrategy {
SAME_PREFIX, ALTERNATE
}

0 comments on commit c25b1be

Please sign in to comment.