Skip to content

Commit

Permalink
test(#2374): provide a framework to easily unit test processing eleme…
Browse files Browse the repository at this point in the history
…nts (#2730)

* #2374 refactored unit tests to use ProcessingElementTestExecutor

* #2374 cleanup

* #2374 cleanup

* #2374 updated test executor to non-static methods

* #2374 updated test executor to non-static methods
  • Loading branch information
IsaakKrut committed Apr 16, 2024
1 parent 8fa927d commit 64d2b94
Show file tree
Hide file tree
Showing 7 changed files with 559 additions and 586 deletions.
Expand Up @@ -18,106 +18,82 @@

package org.apache.streampipes.processors.filters.jvm.processor.booleanfilter;

import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
import org.apache.streampipes.model.runtime.Event;
import org.apache.streampipes.model.runtime.EventFactory;
import org.apache.streampipes.model.runtime.SchemaInfo;
import org.apache.streampipes.model.runtime.SourceInfo;
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
import org.apache.streampipes.test.extensions.api.StoreEventCollector;
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;

import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.ProcessingElementTestExecutor;


import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class TestBooleanFilterProcessor {
private static final String STREAM_PREFIX = "s0::";

private static final Logger LOG = LoggerFactory.getLogger(TestBooleanFilterProcessor.class);

static Stream<Arguments> data() {
return Stream.of(
Arguments.of("True", Arrays.asList(true, true, false, true, false, true, false, true), 5),
Arguments.of("True", Arrays.asList(true, true, true), 3),
Arguments.of("True", Arrays.asList(false, false, false), 0),
Arguments.of("True", Collections.emptyList(), 0),
Arguments.of("False", Arrays.asList(true, false, true, false, true, false, true, false, true), 4),
Arguments.of("False", Arrays.asList(true, true, true), 0),
Arguments.of("False", Arrays.asList(false, false, false), 3),
Arguments.of("False", Collections.emptyList(), 0)
);
BooleanFilterProcessor processor;
private static final String FIELD_NAME = "Test";
private static final String FIELD_NAME_WITH_PREFIX = "s0::" + FIELD_NAME;
@BeforeEach
public void setup(){
processor = new BooleanFilterProcessor();
}

@ParameterizedTest
@MethodSource("data")
public void testBoolenFilter(
public void test(
String boolToKeep,
List<Boolean> eventBooleans,
int expectedFilteredBooleanCount
List<Boolean> outputEventBooleans
) {

var fieldName = "Test";
var processorParams = mock(ProcessorParams.class);
var eventProcessorRuntimeContext = mock(EventProcessorRuntimeContext.class);
Map<String, Object> userConfiguration =
Map.of(
BooleanFilterProcessor.BOOLEAN_MAPPING, FIELD_NAME_WITH_PREFIX,
BooleanFilterProcessor.VALUE, boolToKeep
);

var processor = new BooleanFilterProcessor();
var extractor = mock(ProcessingElementParameterExtractor.class);
when(processorParams.extractor()).thenReturn(extractor);
when(extractor.mappingPropertyValue(BooleanFilterProcessor.BOOLEAN_MAPPING)).thenReturn(STREAM_PREFIX + fieldName);
when(extractor.selectedSingleValue(BooleanFilterProcessor.VALUE, String.class)).thenReturn(boolToKeep);
List<Map<String, Object>> events = new ArrayList<>();
eventBooleans.forEach(bool->events.add(Map.of(FIELD_NAME_WITH_PREFIX, bool)));

var collector = new StoreEventCollector();
processor.onInvocation(processorParams, collector, eventProcessorRuntimeContext);
List<Map<String, Object>> outputEvents = new ArrayList<>();
outputEventBooleans.forEach(bool->outputEvents.add(Map.of(FIELD_NAME, bool)));

sendEvents(processor, collector, eventBooleans, fieldName);
ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, userConfiguration);

assertEquals(expectedFilteredBooleanCount, collector.getEvents().size());
testExecutor.run(events, outputEvents);
}

private void sendEvents(
BooleanFilterProcessor processor,
StoreEventCollector collector,
List<Boolean> eventBooleans,
String fieldName) {
List<Event> events = makeEvents(eventBooleans, fieldName);
for (Event event : events) {
LOG.info("Sending event with value "
+ event.getFieldBySelector(STREAM_PREFIX + fieldName)
.getAsPrimitive()
.getAsBoolean());
processor.onEvent(event, collector);

}
}

private List<Event> makeEvents(List<Boolean> eventBooleans, String fieldName) {
List<Event> events = new ArrayList<>();
for (Boolean eventSetting : eventBooleans) {
events.add(makeEvent(eventSetting, fieldName));
}
return events;
}

private Event makeEvent(Boolean value, String fieldName) {
Map<String, Object> map = new HashMap<>();
map.put(fieldName, value);
return EventFactory.fromMap(map, new SourceInfo("test" + "-topic", "s0"),
new SchemaInfo(null, new ArrayList<>())
static Stream<Arguments> data() {
return Stream.of(
Arguments.of("True",
Arrays.asList(true, true, false, true, false, true, false, true),
Arrays.asList(true, true, true, true, true)),
Arguments.of("True",
Arrays.asList(true, true, true),
Arrays.asList(true, true, true)),
Arguments.of("True",
Arrays.asList(false, false, false),
Collections.emptyList()),
Arguments.of("True",
Collections.emptyList(),
Collections.emptyList()),
Arguments.of("False",
Arrays.asList(true, false, true, false, true, false, true, false, true),
Arrays.asList(false, false, false, false)),
Arguments.of("False",
Arrays.asList(true, true, true),
Collections.emptyList()),
Arguments.of("False",
Arrays.asList(false, false, false),
Arrays.asList(false, false, false)),
Arguments.of("False",
Collections.emptyList(),
Collections.emptyList())
);
}
}
Expand Up @@ -18,124 +18,87 @@

package org.apache.streampipes.processors.filters.jvm.processor.compose;

//@RunWith(Parameterized.class)
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.output.CustomOutputStrategy;
import org.apache.streampipes.model.output.OutputStrategy;
import org.apache.streampipes.processors.filters.jvm.processor.numericalfilter.ProcessingElementTestExecutor;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Stream;

public class TestComposeProcessor {
//
// private static final Logger LOG = LoggerFactory.getLogger(TestComposeProcessor.class);
//
// @org.junit.runners.Parameterized.Parameter
// public String testName;
//
// @org.junit.runners.Parameterized.Parameter(1)
// public List<Map<String, Object>> eventMaps;
//
// @org.junit.runners.Parameterized.Parameter(2)
// public List<String> selectorPrefixes;
//
// @org.junit.runners.Parameterized.Parameter(3)
// public int expectedNumOfEvents;
//
// @org.junit.runners.Parameterized.Parameter(4)
// public int expectedEventSize;
//
// private static final String outputKeySelector1 = "key-selector1";
// private static final String outputKeySelector2 = "key-selector2";
//
// @org.junit.runners.Parameterized.Parameters
// public static Iterable<Object[]> data() {
// Map<String, Object> mapWithFirstOutputSelector = new HashMap<>();
// mapWithFirstOutputSelector.put(outputKeySelector1, new Object());
//
// Map<String, Object> mapWithSecondOutputSelector = new HashMap<>();
// mapWithSecondOutputSelector.put(outputKeySelector2, new Object());
//
// Map<String, Object> mapWithInvalidOutputSelector = new HashMap<>();
// mapWithInvalidOutputSelector.put("invalid-selector", new Object());
//
// List<Map<String, Object>> singleMap = new ArrayList<>();
// singleMap.add(mapWithFirstOutputSelector);
//
// List<Map<String, Object>> twoMapsMatching = new ArrayList<>();
// twoMapsMatching.add(mapWithFirstOutputSelector);
// twoMapsMatching.add(mapWithSecondOutputSelector);
//
// List<Map<String, Object>> twoMapsOneMatching = new ArrayList<>();
// twoMapsOneMatching.add(mapWithFirstOutputSelector);
// twoMapsOneMatching.add(mapWithInvalidOutputSelector);
//
// List<Map<String, Object>> twoMapsNoneMatching = new ArrayList<>();
// twoMapsNoneMatching.add(mapWithInvalidOutputSelector);
// twoMapsNoneMatching.add(new HashMap<>(mapWithInvalidOutputSelector));
//
// return Arrays.asList(new Object[][]{
// {"testWithOneEvent", singleMap, List.of("s0"), 0, 0},
// {"testWithTwoEventsSamePrefix", twoMapsMatching, List.of("s0", "s0"), 0, 0},
// {"testWithTwoEvents", twoMapsMatching, List.of("s0", "s1"), 1, 2},
// {"testWithTwoEventsAnd1InvalidSelector", twoMapsOneMatching, List.of("s0", "s1"), 1, 1},
// {"testWithTwoEventsWithInvalidSelectors", twoMapsNoneMatching, List.of("s0", "s1"), 1, 0}
// });
// }
//
//
//
// @Test
// public void testComposeProcessor() {
// LOG.info("Executing test: {}", testName);
// var processor = new ComposeProcessor();
// var originalGraph = processor.declareModel();
// originalGraph.setSupportedGrounding(EventGroundingGenerator.makeDummyGrounding());
//
// var graph = InvocationGraphGenerator.makeEmptyInvocation(originalGraph);
// List<OutputStrategy> outputStrategies = new ArrayList<>();
// outputStrategies.add(new CustomOutputStrategy(List.of("s0::" + outputKeySelector1, "s1::" + outputKeySelector2)));
// graph.setOutputStrategies(outputStrategies);
// var params = new ProcessorParams(graph);
//
// var eventCollector = new StoreEventCollector();
// processor.onInvocation(params, eventCollector, null);
//
// List<Event> collectedEvents = sendEvents(processor, eventCollector);
//
// LOG.info("Expected collected event count is: {}", expectedNumOfEvents);
// LOG.info("Actual collected event count is: {}", collectedEvents.size());
// assertEquals(expectedNumOfEvents, collectedEvents.size());
//
// if (!collectedEvents.isEmpty()){
// int eventSize = collectedEvents.get(0).getFields().size();
//
// LOG.info("Expected event size is: {}", expectedEventSize);
// LOG.info("Actual event size is: {}", eventSize);
// assertEquals(expectedEventSize, eventSize);
// }
// }
//
// private List<Event> sendEvents(ComposeProcessor processor, StoreEventCollector collector) {
// List<Event> events = makeEvents();
// for (Event event : events) {
// LOG.info("Sending event with map: " + event.getFields()
// + ", and prefix selector: " + event.getSourceInfo().getSelectorPrefix());
// processor.onEvent(event, collector);
// try {
// Thread.sleep(100);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
// return collector.getEvents();
// }
// private List<Event> makeEvents() {
// List<Event> events = new ArrayList<>();
// for (int i = 0; i < eventMaps.size(); i++) {
// events.add(makeEvent(eventMaps.get(i), selectorPrefixes.get(i)));
// }
// return events;
// }
//
// private Event makeEvent(Map<String, Object> eventMap, String selectorPrefix) {
// return EventFactory.fromMap(eventMap, new SourceInfo("test" + "-topic", selectorPrefix),
// new SchemaInfo(null, new ArrayList<>()));
// }
}

ComposeProcessor processor;
private static final String SELECTOR_1 = "key-selector1";
private static final String SELECTOR_2 = "key-selector2";
private static final String INVALID_SELECTOR = "invalid-selector";
private static final String S0_PREFIX = "s0::";
private static final String S1_PREFIX = "s1::";

@BeforeEach
public void setup(){
processor = new ComposeProcessor();
}

@ParameterizedTest
@MethodSource("data")
public void test(List<Map<String, Object>> events,
List<Map<String, Object>> outputEvents) {

Consumer<DataProcessorInvocation> invocationConfig = (invocation->{
List<OutputStrategy> outputStrategies = new ArrayList<>();
outputStrategies.add(new CustomOutputStrategy(List.of(S0_PREFIX + SELECTOR_1, S1_PREFIX + SELECTOR_2)));
invocation.setOutputStrategies(outputStrategies);
});

ProcessingElementTestExecutor testExecutor = new ProcessingElementTestExecutor(processor, invocationConfig);

testExecutor.run(events, outputEvents);
}
static Stream<Arguments> data() {

var object1 = new Object();
var object2 = new Object();

return Stream.of(
Arguments.of(
List.of(
Map.of(S0_PREFIX + SELECTOR_1, object1)),
List.of()),
Arguments.of(
List.of(
Map.of(S0_PREFIX + SELECTOR_1, object1),
Map.of(S0_PREFIX + SELECTOR_2, object2)),
List.of()),
Arguments.of(
List.of(
Map.of(S0_PREFIX + SELECTOR_1, object1),
Map.of(S1_PREFIX + SELECTOR_2, object2)),
List.of(
Map.of(SELECTOR_1, object1, SELECTOR_2, object2)
)),
Arguments.of(
List.of(
Map.of(S0_PREFIX + SELECTOR_1, object1),
Map.of(S1_PREFIX + INVALID_SELECTOR, object2)),
List.of(
Map.of(SELECTOR_1, object1)
)),
Arguments.of(
List.of(
Map.of(S0_PREFIX + INVALID_SELECTOR, object1),
Map.of(S1_PREFIX + INVALID_SELECTOR, object2)),
List.of(
Map.of()
))
);
}
}

0 comments on commit 64d2b94

Please sign in to comment.