Skip to content

Commit

Permalink
[v0.8.0] Extended Data Generator (#134)
Browse files Browse the repository at this point in the history
* Extended Synthetic Generator

* Added new Synthetic Generator options on Schemas

* Adapted components to new generation schemas and code refactoring

* Added on Flink new generator functionalities; added missing resizeddata on pipeline generation; general code refactoring

* Added on Storm new generator functionalities; added missing resizeddata on pipeline generation

* Minor fix

* Added on Storm new generator functionalities; added missing resizeddata on pipeline generation; major code refactoring as previously done to Storm and Flink

* minor code style fix

* Fix: added missing configuration options for burst distribution

* Fix: Implements serializable

* Renamed class

* fixed formula

* minor fixes

* moved ts collection to fix latency computation

* [v0.8.0] Bump version
  • Loading branch information
ale93p committed May 29, 2020
1 parent f480d46 commit 65824c2
Show file tree
Hide file tree
Showing 29 changed files with 756 additions and 583 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package fr.unice.namb.flink.connectors;

import fr.unice.namb.utils.common.DataStream;
import fr.unice.namb.utils.common.StreamGenerator;
import fr.unice.namb.utils.common.DataGenerator;
import fr.unice.namb.utils.configuration.Config;
import fr.unice.namb.utils.configuration.Config.DataDistribution;
import fr.unice.namb.utils.configuration.schema.NambConfigSchema.Data;
import fr.unice.namb.utils.configuration.schema.NambConfigSchema.Flow;
import fr.unice.namb.utils.configuration.schema.NambConfigSchema.Synthetic;
import fr.unice.namb.utils.configuration.Config.ArrivalDistribution;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
Expand All @@ -11,38 +15,31 @@

public class SyntheticConnector extends RichParallelSourceFunction<Tuple4<String, String, Long, Long>> {

private volatile boolean isRunning;

private int dataSize;
private int dataValues;
private Config.DataDistribution dataValuesBalancing;
private int flowRate;
private long sleepTime;
private Config.ArrivalDistribution distribution;
private volatile boolean isRunning;

private DataGenerator dataGenerator;
private DataStream dataStream;
private StreamGenerator dataStream;

private long count;
private int rate;
private String me;

public SyntheticConnector(int dataSize, int dataValues, Config.DataDistribution dataValuesBalancing, Config.ArrivalDistribution flowDistribution, int flowRate, double frequency, String sourceName){
this.dataSize = dataSize;
this.dataValues = dataValues;
this.dataValuesBalancing = dataValuesBalancing;
this.distribution = flowDistribution;
this.flowRate = flowRate;
// Builds a synthetic Flink source from the "data" and "flow" sections of the
// NAMB configuration schema.
// data       - payload-generation settings, handed to DataGenerator
//              (presumably size/values/distribution; confirm against schema).
// flow       - arrival-pattern settings, handed to StreamGenerator.
// frequency  - debug-print frequency: when > 0, every (1/frequency)-th emitted
//              tuple is logged (see the rate/count check in run()); when <= 0,
//              debug printing is disabled (rate stays 0).
// sourceName - logical name of this source; open() appends the subtask index.
// Throws Exception if either generator rejects its configuration.
public SyntheticConnector(Data data, Flow flow, double frequency, String sourceName) throws Exception{
this.dataGenerator = new DataGenerator(data);
this.dataStream = new StreamGenerator(flow);

this.me = sourceName;
if(frequency > 0) this.rate = (int)(1 / frequency);
else this.rate = 0;
}

// Convenience overload: unpacks a complete Synthetic configuration section
// into its Data and Flow parts and delegates to the main constructor.
public SyntheticConnector(Synthetic conf, double frequency, String sourceName) throws Exception{
this(conf.getData(), conf.getFlow(), frequency, sourceName);
}

@Override
public void open(Configuration parameters){
this.dataGenerator = new DataGenerator(this.dataSize, this.dataValues, this.dataValuesBalancing);
this.dataStream = new DataStream();
if (this.flowRate != 0)
this.sleepTime = dataStream.convertToInterval(this.flowRate);

this.count = 0;
this.isRunning = true;
this.me = this.me + "_" + getRuntimeContext().getIndexOfThisSubtask();
Expand All @@ -54,17 +51,20 @@ public void run(SourceContext<Tuple4<String, String, Long, Long>> sourceContext)
while(isRunning){
try {
String nextValue = new String(dataGenerator.getNextValue());
if (this.flowRate != 0) {
Thread.sleep(
this.dataStream.getInterMessageTime(this.distribution, (int) this.sleepTime)

double sleepTime = this.dataStream.getSleepTime();

if(sleepTime != 0) {
Thread.sleep(
(long) sleepTime, (int)((sleepTime - (long)sleepTime) * 1000000)
);
}

this.count++;
String tuple_id = UUID.randomUUID().toString();
Long ts = System.currentTimeMillis();
long ts = System.currentTimeMillis();
sourceContext.collect(new Tuple4<>(nextValue, tuple_id, this.count, ts));

ts = System.currentTimeMillis();
if (this.rate > 0 && this.count % this.rate == 0){
System.out.println("[DEBUG] [" + this.me + "] : " + tuple_id + "," + this.count + "," + ts + "," + nextValue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ public void open(Configuration conf){
}

@Override
public void flatMap(Tuple4<String, String, Long, Long> in, Collector<Tuple4<String, String, Long, Long>> out) throws Exception{
public void flatMap(Tuple4<String, String, Long, Long> in, Collector<Tuple4<String, String, Long, Long>> out){


Long ts = 0L;


String nextValue = in.f0;
Expand All @@ -60,15 +60,13 @@ public void flatMap(Tuple4<String, String, Long, Long> in, Collector<Tuple4<Stri
// simulate processing load
for(long i = 0; i < _cycles; i++){}

long ts = System.currentTimeMillis();
if(this._filtering > 0) {
if (this._rand.nextInt(Config.WF_FILTERING_PRECISION) <= this._filtering * Config.WF_FILTERING_PRECISION) {
ts = System.currentTimeMillis();
out.collect(new Tuple4<>(nextValue, tuple_id, sourceCount, ts));

}
}
else {
ts = System.currentTimeMillis();
out.collect(new Tuple4<>(nextValue, tuple_id, sourceCount, ts));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import static fr.unice.namb.flink.utils.BuildCommons.setRouting;

public class BuildPipeline {
public static void build(StreamExecutionEnvironment env, AppBuilder app, NambConfigSchema conf, FlinkConfigSchema flinkConf){
public static void build(StreamExecutionEnvironment env, AppBuilder app, NambConfigSchema conf, FlinkConfigSchema flinkConf) throws Exception{
/*
Pipeline Schema Translation
*/
Expand Down Expand Up @@ -52,7 +52,7 @@ public static void build(StreamExecutionEnvironment env, AppBuilder app, NambCon
.name(newTask.getName());
}
else {
source = env.addSource(new SyntheticConnector(newTask.getDataSize(), newTask.getDataValues(), newTask.getDataDistribution(), newTask.getFlowDistribution(), newTask.getFlowRate(), debugFrequency, newTask.getName()))
source = env.addSource(new SyntheticConnector(newTask.getData(), newTask.getFlow(), debugFrequency, newTask.getName()))
.setParallelism((int) newTask.getParallelism())
.name(newTask.getName());
}
Expand Down Expand Up @@ -82,21 +82,21 @@ public static void build(StreamExecutionEnvironment env, AppBuilder app, NambCon

//TODO: implement windowing
op = setRouting(streamUnion, newTask.getRouting())
.flatMap(new BusyWaitFlatMap(newTask.getProcessing(), newTask.getFiltering(), newTask.getDataSize(), debugFrequency, newTask.getName()));
.flatMap(new BusyWaitFlatMap(newTask.getProcessing(), newTask.getFiltering(), newTask.getResizeddata(), debugFrequency, newTask.getName()));
}
else{

if( pipeline.get(parentsList.get(0)).getType() == Config.ComponentType.source){
DataStream<Tuple4<String, String, Long, Long>> parent = (DataStream<Tuple4<String, String, Long, Long>>) createdTasks.get(parentsList.get(0));

op = setRouting(parent, newTask.getRouting())
.flatMap(new BusyWaitFlatMap(newTask.getProcessing(), newTask.getFiltering(), newTask.getDataSize(), debugFrequency, newTask.getName()));
.flatMap(new BusyWaitFlatMap(newTask.getProcessing(), newTask.getFiltering(), newTask.getResizeddata(), debugFrequency, newTask.getName()));
}
else{
SingleOutputStreamOperator<Tuple4<String, String, Long, Long>> parent = (SingleOutputStreamOperator<Tuple4<String, String, Long, Long>>) createdTasks.get(parentsList.get(0));

op = setRouting(parent, newTask.getRouting())
.flatMap(new BusyWaitFlatMap(newTask.getProcessing(), newTask.getFiltering(), newTask.getDataSize(), debugFrequency, newTask.getName()));
.flatMap(new BusyWaitFlatMap(newTask.getProcessing(), newTask.getFiltering(), newTask.getResizeddata(), debugFrequency, newTask.getName()));
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

public class BuildWorkflow {

public static void build(StreamExecutionEnvironment env, AppBuilder app, NambConfigSchema conf, FlinkConfigSchema flinkConf) {
public static void build(StreamExecutionEnvironment env, AppBuilder app, NambConfigSchema conf, FlinkConfigSchema flinkConf) throws Exception {
/*
Workflow Schema Translation
*/
Expand All @@ -32,20 +32,13 @@ public static void build(StreamExecutionEnvironment env, AppBuilder app, NambCon

// DataFlow configurations
int depth = conf.getWorkflow().getDepth();
int totalParallelism = conf.getWorkflow().getScalability().getParallelism();
Config.ParaBalancing paraBalancing = conf.getWorkflow().getScalability().getBalancing();
double variability = conf.getWorkflow().getScalability().getVariability();
// int totalParallelism = conf.getWorkflow().getScalability().getParallelism();
// Config.ParaBalancing paraBalancing = conf.getWorkflow().getScalability().getBalancing();
// double variability = conf.getWorkflow().getScalability().getVariability();
Config.ConnectionShape topologyShape = conf.getWorkflow().getConnection().getShape();
Config.TrafficRouting trafficRouting = conf.getWorkflow().getConnection().getRouting();
double processingLoad = conf.getWorkflow().getWorkload().getProcessing();
Config.LoadBalancing loadBalancing = conf.getWorkflow().getWorkload().getBalancing();

// DataStream configurations
int dataSize = conf.getDatastream().getSynthetic().getData().getSize();
int dataValues = conf.getDatastream().getSynthetic().getData().getValues();
Config.DataDistribution dataValuesBalancing = conf.getDatastream().getSynthetic().getData().getDistribution();
Config.ArrivalDistribution distribution = conf.getDatastream().getSynthetic().getFlow().getDistribution();
int rate = conf.getDatastream().getSynthetic().getFlow().getRate();
// double processingLoad = conf.getWorkflow().getWorkload().getProcessing();
// Config.LoadBalancing loadBalancing = conf.getWorkflow().getWorkload().getBalancing();

// Generating app builder
ArrayList<Integer> dagLevelsWidth = app.getDagLevelsWidth();
Expand Down Expand Up @@ -91,7 +84,7 @@ public static void build(StreamExecutionEnvironment env, AppBuilder app, NambCon
} else {
for (int s = 1; s <= numberOfSources; s++) {
sourceName = "source_" + s;
DataStream<Tuple4<String, String, Long, Long>> source = env.addSource(new SyntheticConnector(dataSize, dataValues, dataValuesBalancing, distribution, rate, debugFrequency, sourceName))
DataStream<Tuple4<String, String, Long, Long>> source = env.addSource(new SyntheticConnector(conf.getDatastream().getSynthetic(), debugFrequency, sourceName))
.setParallelism(componentParallelism.next())
.name(sourceName);
sourcesList.add(new MutablePair<>(sourceName, source));
Expand Down

0 comments on commit 65824c2

Please sign in to comment.