[WIP] [v0.7.2] Fix logging (#133)
* [AppBuilder] Compute Processing Load at init (Closes #73)

* [Storm & Flink] Refactored BenchmarkApplication component: split out TopologyBuilder
* Fixed tuple logging: tuples are now sampled based on the source-assigned count rather than each operator's local one. This ensures the same tuples are always printed at every operator, which is better for latency computation (sketched below).

* [v0.7.2] Bump Version
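For context, here is a minimal sketch (illustrative only, not NAMB code; names are invented) of why sampling on the count assigned at the source selects the same tuples at every operator, whereas sampling on each operator's local count diverges as soon as upstream filtering changes what an operator sees:

public class SamplingSketch {
    public static void main(String[] args) {
        int rate = 3;        // debug-print every 3rd tuple
        long localCount = 0; // a downstream operator's own counter
        for (long sourceCount = 1; sourceCount <= 12; sourceCount++) {
            // pretend an upstream filter drops even-numbered tuples,
            // so this operator never sees them
            if (sourceCount % 2 == 0) continue;
            localCount++;
            // local sampling picks different tuples at each operator;
            // source-count sampling picks the same tuple ids everywhere
            System.out.println("tuple " + sourceCount
                    + " local-sampled=" + (localCount % rate == 0)
                    + " source-sampled=" + (sourceCount % rate == 0));
        }
    }
}

Since the same tuple ids are then logged at the source and at every downstream operator, per-tuple latency can be computed by matching ids across the [DEBUG] lines.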
ale93p committed Mar 16, 2020
1 parent feca209 commit f480d46
Showing 15 changed files with 911 additions and 763 deletions.

Large diffs are not rendered by default.

@@ -50,6 +50,7 @@ public void flatMap(Tuple4<String, String, Long, Long> in, Collector<Tuple4<Stri

        String nextValue = in.f0;
        String tuple_id = in.f1;
+       long sourceCount = in.f2;

        if(this._dataSize > 0 && this._dataSize < nextValue.length()){
            nextValue = nextValue.substring(0, this._dataSize);
@@ -62,18 +63,18 @@ public void flatMap(Tuple4<String, String, Long, Long> in, Collector<Tuple4<Stri
        if(this._filtering > 0) {
            if (this._rand.nextInt(Config.WF_FILTERING_PRECISION) <= this._filtering * Config.WF_FILTERING_PRECISION) {
                ts = System.currentTimeMillis();
-               out.collect(new Tuple4<>(nextValue, tuple_id, this._count, ts));
+               out.collect(new Tuple4<>(nextValue, tuple_id, sourceCount, ts));

            }
        }
        else {
            ts = System.currentTimeMillis();
-           out.collect(new Tuple4<>(nextValue, tuple_id, this._count, ts));
+           out.collect(new Tuple4<>(nextValue, tuple_id, sourceCount, ts));
        }

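        // sample on the count assigned at the source so every operator prints the same tuples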
-       if (this._rate > 0 && this._count % this._rate == 0){
+       if (this._rate > 0 && sourceCount % this._rate == 0){
            if (ts == 0) ts = System.currentTimeMillis();
-           System.out.println("[DEBUG] [" + _me + "] : " + tuple_id + "," + _count + "," + ts + "," + nextValue );
+           System.out.println("[DEBUG] [" + _me + "] : " + tuple_id + "," + this._count + "," + ts + "," + nextValue );
        }

    }
@@ -30,11 +30,12 @@ public void apply(TimeWindow window, Iterable<Tuple4<String, String, Long, Long>
        for (Tuple4<String, String, Long, Long> t: values) {
            String nextValue = t.f0;
            String tuple_id = t.f1;
+           long sourceCount = t.f2;
            this._count ++;
            // simulate processing load
            for (long i = 0; i < this._cycles; i++) { }
            ts = System.currentTimeMillis();
-           if (this._rate > 0 && this._count % this._rate == 0){
+           if (this._rate > 0 && sourceCount % this._rate == 0){
                System.out.println("[DEBUG] [" + this._me + "] : " + tuple_id + "," + this._count + "," + ts + "," + nextValue);
            }
            value = t;
@@ -0,0 +1,97 @@
package fr.unice.namb.flink.utils;

import fr.unice.namb.utils.configuration.Config;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class BuildCommons {

    public static DataStream<Tuple4<String, String, Long, Long>> setRouting(SingleOutputStreamOperator<Tuple4<String, String, Long, Long>> operator, Config.TrafficRouting routing, Object field, boolean apply) throws IllegalArgumentException {
        if (apply){
            switch (routing) {
                case hash:
                    if (field instanceof Integer)
                        return operator.keyBy((int) field);
                    else if (field instanceof String)
                        return operator.keyBy((String) field);
                    else
                        throw new IllegalArgumentException("Field must be <int> or <String> instead it is <" + field.getClass().getName() + ">");
                case balanced:
                    return operator.rebalance();
                case broadcast:
                    return operator.broadcast();
                case none:
                    return operator;
                default:
                    throw new IllegalArgumentException(routing + " is not a valid routing type");
            }
        }
        return operator;
    }

    public static DataStream<Tuple4<String, String, Long, Long>> setRouting(DataStream<Tuple4<String, String, Long, Long>> operator, Config.TrafficRouting routing, Object field) throws IllegalArgumentException {
        switch (routing) {
            case hash:
                if (field instanceof Integer)
                    return operator.keyBy((int) field);
                else if (field instanceof String)
                    return operator.keyBy((String) field);
                else
                    throw new IllegalArgumentException("Field must be <int> or <String> instead it is <" + field.getClass().getName() + ">");
            case balanced:
                return operator.rebalance();
            case broadcast:
                return operator.broadcast();
            case none:
                return operator;
            default:
                throw new IllegalArgumentException(routing + " is not a valid routing type");
        }
    }

    public static DataStream<Tuple4<String, String, Long, Long>> setRouting(SingleOutputStreamOperator<Tuple4<String, String, Long, Long>> operator, Config.TrafficRouting routing, boolean apply) throws IllegalArgumentException {
        return setRouting(operator, routing, 0, apply);
    }

    public static DataStream<Tuple4<String, String, Long, Long>> setRouting(SingleOutputStreamOperator<Tuple4<String, String, Long, Long>> operator, Config.TrafficRouting routing) throws IllegalArgumentException {
        return setRouting(operator, routing, 0, true);
    }

    public static DataStream<Tuple4<String, String, Long, Long>> setRouting(DataStream<Tuple4<String, String, Long, Long>> operator, Config.TrafficRouting routing) throws IllegalArgumentException {
        return setRouting(operator, routing, 0);
    }

    public static AllWindowedStream<Tuple4<String, String, Long, Long>, TimeWindow> setWindow(SingleOutputStreamOperator<Tuple4<String, String, Long, Long>> parent, Config.TrafficRouting trafficRouting, Config.WindowingType type, int duration, int interval, boolean applyRouting) {
        switch (type) {
            case tumbling:
                return setRouting(parent, trafficRouting, applyRouting).timeWindowAll(Time.seconds(duration));
            case sliding:
                return setRouting(parent, trafficRouting, applyRouting).timeWindowAll(Time.seconds(duration), Time.seconds(interval));
        }
        return null;
    }

    public static AllWindowedStream<Tuple4<String, String, Long, Long>, TimeWindow> setWindow(DataStream<Tuple4<String, String, Long, Long>> parent, Config.TrafficRouting trafficRouting, Config.WindowingType type, int duration, int interval) {
        switch (type) {
            case tumbling:
                return setRouting(parent, trafficRouting).timeWindowAll(Time.seconds(duration));
            case sliding:
                return setRouting(parent, trafficRouting).timeWindowAll(Time.seconds(duration), Time.seconds(interval));
        }
        return null;
    }

    public static AllWindowedStream<Tuple4<String, String, Long, Long>, TimeWindow> setWindow(SingleOutputStreamOperator<Tuple4<String, String, Long, Long>> parent, Config.TrafficRouting trafficRouting, Config.WindowingType type, int duration, boolean apply){
        // tumbling window (no slide interval); forward the apply-routing flag
        return setWindow(parent, trafficRouting, type, duration, 0, apply);
    }

    public static AllWindowedStream<Tuple4<String, String, Long, Long>, TimeWindow> setWindow(DataStream<Tuple4<String, String, Long, Long>> parent, Config.TrafficRouting trafficRouting, Config.WindowingType type, int duration){
        return setWindow(parent, trafficRouting, type, duration, 0);
    }
}
@@ -0,0 +1,116 @@
package fr.unice.namb.flink.utils;

import fr.unice.namb.flink.connectors.SyntheticConnector;
import fr.unice.namb.flink.operators.BusyWaitFlatMap;
import fr.unice.namb.utils.common.AppBuilder;
import fr.unice.namb.utils.common.Task;
import fr.unice.namb.utils.configuration.Config;
import fr.unice.namb.utils.configuration.schema.FlinkConfigSchema;
import fr.unice.namb.utils.configuration.schema.NambConfigSchema;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Properties;

import static fr.unice.namb.flink.utils.BuildCommons.setRouting;

public class BuildPipeline {
    public static void build(StreamExecutionEnvironment env, AppBuilder app, NambConfigSchema conf, FlinkConfigSchema flinkConf){
        /*
            Pipeline Schema Translation
        */

        double debugFrequency = flinkConf.getDebugFrequency();

        HashMap<String, Task> pipeline = app.getPipelineTree();
        ArrayList<String> dagLevel = app.getPipelineTreeSources();
        HashMap<String, Object> createdTasks = new HashMap<>();
        while (dagLevel.size() > 0){

            ArrayList<String> nextDagLevel = new ArrayList<>();
            for (String task : dagLevel) {
                if (!createdTasks.containsKey(task)) {
                    Task newTask = pipeline.get(task);
                    if (newTask.getType() == Config.ComponentType.source) {
                        DataStream<Tuple4<String, String, Long, Long>> source = null;
                        if(newTask.isExternal()){
                            Properties properties = new Properties();
                            properties.setProperty("bootstrap.servers", newTask.getKafkaServer());
                            properties.setProperty("zookeeper.connect", newTask.getZookeeperServer());
                            properties.setProperty("group.id", newTask.getKafkaGroup());
                            FlinkKafkaConsumer<Tuple4<String, String, Long, Long>> kafkaConsumer = new FlinkKafkaConsumer<>(newTask.getKafkaTopic(), new KafkaDeserializationSchema(debugFrequency, newTask.getName()), properties);

                            source = env
                                    .addSource(kafkaConsumer)
                                    .setParallelism((int) newTask.getParallelism())
                                    .name(newTask.getName());
                        }
                        else {
                            source = env.addSource(new SyntheticConnector(newTask.getDataSize(), newTask.getDataValues(), newTask.getDataDistribution(), newTask.getFlowDistribution(), newTask.getFlowRate(), debugFrequency, newTask.getName()))
                                    .setParallelism((int) newTask.getParallelism())
                                    .name(newTask.getName());
                        }
                        createdTasks.put(newTask.getName(), source);
                    }
                    else{

                        ArrayList<String> parentsList = newTask.getParents();
                        SingleOutputStreamOperator<Tuple4<String, String, Long, Long>> op = null;
                        DataStream<Tuple4<String, String, Long, Long>> streamUnion = null;
                        if(parentsList.size() > 1) {
                            if( pipeline.get(parentsList.get(0)).getType() == Config.ComponentType.source){

                                streamUnion = ((DataStream<Tuple4<String, String, Long, Long>>) createdTasks.get(parentsList.get(0))).union((DataStream<Tuple4<String, String, Long, Long>>) createdTasks.get(parentsList.get(1)));
                                for (int i=2; i<parentsList.size(); i++) {
                                    // union() returns a new stream, so the result must be reassigned
                                    streamUnion = streamUnion.union((DataStream<Tuple4<String, String, Long, Long>>) createdTasks.get(parentsList.get(i)));
                                }

                            }
                            else{
                                streamUnion = ((SingleOutputStreamOperator<Tuple4<String, String, Long, Long>>) createdTasks.get(parentsList.get(0))).union((SingleOutputStreamOperator<Tuple4<String, String, Long, Long>>) createdTasks.get(parentsList.get(1)));
                                for (int i=2; i<parentsList.size(); i++) {
                                    streamUnion = streamUnion.union((SingleOutputStreamOperator<Tuple4<String, String, Long, Long>>) createdTasks.get(parentsList.get(i)));
                                }

                            }

                            //TODO: implement windowing
                            op = setRouting(streamUnion, newTask.getRouting())
                                    .flatMap(new BusyWaitFlatMap(newTask.getProcessing(), newTask.getFiltering(), newTask.getDataSize(), debugFrequency, newTask.getName()));
                        }
                        else{

                            if( pipeline.get(parentsList.get(0)).getType() == Config.ComponentType.source){
                                DataStream<Tuple4<String, String, Long, Long>> parent = (DataStream<Tuple4<String, String, Long, Long>>) createdTasks.get(parentsList.get(0));

                                op = setRouting(parent, newTask.getRouting())
                                        .flatMap(new BusyWaitFlatMap(newTask.getProcessing(), newTask.getFiltering(), newTask.getDataSize(), debugFrequency, newTask.getName()));
                            }
                            else{
                                SingleOutputStreamOperator<Tuple4<String, String, Long, Long>> parent = (SingleOutputStreamOperator<Tuple4<String, String, Long, Long>>) createdTasks.get(parentsList.get(0));

                                op = setRouting(parent, newTask.getRouting())
                                        .flatMap(new BusyWaitFlatMap(newTask.getProcessing(), newTask.getFiltering(), newTask.getDataSize(), debugFrequency, newTask.getName()));
                            }

                        }

                        op.setParallelism((int)newTask.getParallelism())
                                .name(newTask.getName());
                        createdTasks.put(newTask.getName(), op);
                    }

                }

                nextDagLevel.addAll(pipeline.get(task).getChilds());
            }
            dagLevel = new ArrayList<>(nextDagLevel);
        }
    }
}
