[v0.7.0] Redesign Data Generator and External Kafka Source Support (#131)
* Improved Data Generator (#126)

* Integrated index generation in DataGenerator; fix #119

* [Storm] Implemented new DataGenerator

* [v0.6.1-dev] bump development version

* [Flink] Implemented new data generator

* [Storm] Implemented new data generator

* [Flink] Implemented new data generator

* [All] Fixed conversion from bytes to string

* [All] Cleaned code

* Update README.md

* Fixed variable naming issue

* Fixed DataGenerator null pointer exception

* Substituted with the correct conf file

* [v0.7.0] External Source Support: Kafka (#130)

* Added external Kafka Source with basic configurations (currently only workflow schema)

* Fix: added control for shape and external source

* Added external Kafka Source also for Pipeline Schema

* Added debugging output in source

* Cleaned code

* [v0.7.0] External Source Support: Kafka
ale93p committed Dec 11, 2019
1 parent ed9897c commit b6eebb8
Showing 15 changed files with 476 additions and 55 deletions.
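Before the file-by-file diff, a condensed sketch of the Flink-side pattern this commit introduces: an external Kafka topic consumed through FlinkKafkaConsumer with the new KafkaDeserializationSchema and attached to the environment as a named source. This is an orientation sketch only; the broker address, group id, topic name, debug frequency and class name are illustrative placeholders, not values from the NAMB configuration.

    import java.util.Properties;

    import org.apache.flink.api.java.tuple.Tuple4;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    import fr.unice.namb.flink.utils.KafkaDeserializationSchema;

    public class KafkaSourceSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder connection settings; in NAMB they come from the benchmark configuration.
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "namb-benchmark");

            // Each Kafka record is turned into a Tuple4<value, tuple_id, counter, timestamp>,
            // the same shape produced by the synthetic connector.
            FlinkKafkaConsumer<Tuple4<String, String, Long, Long>> consumer =
                    new FlinkKafkaConsumer<>("namb-topic",
                            new KafkaDeserializationSchema(0.01, "kafka_source_1"),
                            properties);

            DataStream<Tuple4<String, String, Long, Long>> source = env
                    .addSource(consumer)
                    .setParallelism(1)
                    .name("kafka_source_1");

            source.print();
            env.execute("kafka-source-sketch");
        }
    }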
4 changes: 2 additions & 2 deletions benchmarks/flink-bench/pom.xml
@@ -11,7 +11,7 @@
</parent>
<groupId>fr.unice.namb</groupId>
<artifactId>flink-bench</artifactId>
<version>${flink.benchmark.version}</version>
<version>${revision}</version>
<packaging>jar</packaging>

<properties>
@@ -23,7 +23,7 @@
<dependency>
<groupId>fr.unice.namb</groupId>
<artifactId>utils</artifactId>
<version>${utils.version}</version>
<version>${revision}</version>
<scope>compile</scope>
</dependency>

@@ -3,24 +3,32 @@
import fr.unice.namb.flink.connectors.SyntheticConnector;
import fr.unice.namb.flink.operators.BusyWaitFlatMap;
import fr.unice.namb.flink.operators.WindowedBusyWaitFunction;
import fr.unice.namb.flink.utils.KafkaDeserializationSchema;
import fr.unice.namb.utils.common.AppBuilder;
import fr.unice.namb.utils.common.Task;
import fr.unice.namb.utils.configuration.Config;
import fr.unice.namb.utils.configuration.schema.FlinkConfigSchema;
import fr.unice.namb.utils.configuration.schema.NambConfigSchema;
import jdk.nashorn.internal.runtime.regexp.joni.exception.ValueException;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;

public class BenchmarkApplication {

@@ -117,9 +125,9 @@ private static StreamExecutionEnvironment buildBenchmarkEnvironment(NambConfigSc
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

if(! app.isPipelineDefined()) {


System.out.println("not pipeline");
/*
Workflow Schema Translation
*/

// DataFlow configurations
int depth = conf.getWorkflow().getDepth();
@@ -162,25 +170,47 @@ private static StreamExecutionEnvironment buildBenchmarkEnvironment(NambConfigSc

String sourceName;


for (int s = 1; s <= numberOfSources; s++) {
sourceName = "source_" + s;
DataStream<Tuple4<String, String, Long, Long>> source = env.addSource(new SyntheticConnector(dataSize, dataValues, dataValuesBalancing, distribution, rate, debugFrequency, sourceName))
/*
Sources Definition
*/
if (app.isExternalSource()){
int s = 1;
sourceName = "kafka_source_" + s;
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", app.getKafkaServer());
properties.setProperty("zookeeper.connect", app.getZookeeperServer());
properties.setProperty("group.id", app.getKafkaGroup());
FlinkKafkaConsumer<Tuple4<String, String, Long, Long>> kafkaConsumer = new FlinkKafkaConsumer<>(app.getKafkaTopic(), new KafkaDeserializationSchema(debugFrequency, sourceName), properties);

DataStream<Tuple4<String, String, Long, Long>> source = env
.addSource(kafkaConsumer)
.setParallelism(cpIterator.next())
.name(sourceName);
sourcesList.add(new MutablePair<>(sourceName, source));

}
else {
for (int s = 1; s <= numberOfSources; s++) {
sourceName = "source_" + s;
DataStream<Tuple4<String, String, Long, Long>> source = env.addSource(new SyntheticConnector(dataSize, dataValues, dataValuesBalancing, distribution, rate, debugFrequency, sourceName))
.setParallelism(cpIterator.next())
.name(sourceName);
sourcesList.add(new MutablePair<>(sourceName, source));

if (numberOfSources > 1) {
sourceName = "unified_source";
DataStream<Tuple4<String, String, Long, Long>> source = sourcesList.get(0).getRight().union(sourcesList.get(1).getRight());
for (int s = 2; s < numberOfSources; s++) {
source.union(sourcesList.get(s).getRight());
}
sourcesList.add(new MutablePair<>(sourceName, source));

if (numberOfSources > 1) {
sourceName = "unified_source";
DataStream<Tuple4<String, String, Long, Long>> source = sourcesList.get(0).getRight().union(sourcesList.get(1).getRight());
for (int s = 2; s < numberOfSources; s++) {
source.union(sourcesList.get(s).getRight());
}
sourcesList.add(new MutablePair<>(sourceName, source));
}
}

/*
Tasks definition
*/

int operatorID = 1;
int cycles;
@@ -272,8 +302,9 @@ private static StreamExecutionEnvironment buildBenchmarkEnvironment(NambConfigSc
}
}
else{


/*
Pipeline Schema Translation
*/

HashMap<String, Task> pipeline = app.getPipelineTree();
ArrayList<String> dagLevel = app.getPipelineTreeSources();
@@ -286,9 +317,24 @@ private static StreamExecutionEnvironment buildBenchmarkEnvironment(NambConfigSc
if (!createdTasks.containsKey(task)) {
Task newTask = pipeline.get(task);
if (newTask.getType() == Config.ComponentType.source) {
DataStream<Tuple4<String, String, Long, Long>> source = env.addSource(new SyntheticConnector(newTask.getDataSize(), newTask.getDataValues(), newTask.getDataDistribution(), newTask.getFlowDistribution(), newTask.getFlowRate(), debugFrequency, newTask.getName()))
.setParallelism((int) newTask.getParallelism())
.name(newTask.getName());
DataStream<Tuple4<String, String, Long, Long>> source = null;
if(newTask.isExternal()){
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", newTask.getKafkaServer());
properties.setProperty("zookeeper.connect", newTask.getZookeeperServer());
properties.setProperty("group.id", newTask.getKafkaGroup());
FlinkKafkaConsumer<Tuple4<String, String, Long, Long>> kafkaConsumer = new FlinkKafkaConsumer<>(newTask.getKafkaTopic(), new KafkaDeserializationSchema(debugFrequency, newTask.getName()), properties);

source = env
.addSource(kafkaConsumer)
.setParallelism((int) newTask.getParallelism())
.name(newTask.getName());
}
else {
source = env.addSource(new SyntheticConnector(newTask.getDataSize(), newTask.getDataValues(), newTask.getDataDistribution(), newTask.getFlowDistribution(), newTask.getFlowRate(), debugFrequency, newTask.getName()))
.setParallelism((int) newTask.getParallelism())
.name(newTask.getName());
}
createdTasks.put(newTask.getName(), source);
}
else{
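One Flink API detail worth keeping in mind when reading the unified-source code in the workflow branch above: DataStream.union returns a new stream rather than modifying its receiver, so its result must be reassigned or chained for every source to end up in the unified stream. A minimal, self-contained sketch of that pattern, not taken from the NAMB code:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class UnionSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            List<DataStream<String>> sources = new ArrayList<>();
            sources.add(env.fromElements("a", "b"));
            sources.add(env.fromElements("c", "d"));
            sources.add(env.fromElements("e", "f"));

            // union returns a new DataStream; keep reassigning it so every source is included
            DataStream<String> unified = sources.get(0);
            for (int i = 1; i < sources.size(); i++) {
                unified = unified.union(sources.get(i));
            }

            unified.print();
            env.execute("union-sketch");
        }
    }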
@@ -57,11 +57,8 @@ public void flatMap(Tuple4<String, String, Long, Long> in, Collector<Tuple4<Stri

_count ++;
// simulate processing load
// for(int i = 0; i < 10000; i++){}
for(long i = 0; i < _cycles; i++){}



if(this._filtering > 0) {
if (this._rand.nextInt(Config.WF_FILTERING_PRECISION) <= this._filtering * Config.WF_FILTERING_PRECISION) {
ts = System.currentTimeMillis();
@@ -72,14 +69,10 @@ public void flatMap(Tuple4<String, String, Long, Long> in, Collector<Tuple4<Stri
else {
ts = System.currentTimeMillis();
out.collect(new Tuple4<>(nextValue, tuple_id, this._count, ts));
// out.collect(in);
}

if (this._rate > 0 && this._count % this._rate == 0){
// if (_count % _rate == 0){
// if (_count % 2000 == 0){
if (ts == 0) ts = System.currentTimeMillis();
// System.out.println("[DEBUG] [" + this._me + "] : " + tuple_id + "," + this._count + "," + System.currentTimeMillis() + "," + nextValue);
System.out.println("[DEBUG] [" + _me + "] : " + tuple_id + "," + _count + "," + ts + "," + nextValue );
}

@@ -0,0 +1,84 @@
package fr.unice.namb.flink.utils;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;

import java.io.IOException;
import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

import static org.apache.flink.util.Preconditions.checkNotNull;

public class KafkaDeserializationSchema implements DeserializationSchema<Tuple4<String, String, Long, Long>>, SerializationSchema<String> {
private static final long serialVersionUID = 1L;

private transient Charset charset;
private long counter = 0;
private int rate;
private String me;


public KafkaDeserializationSchema(double frequency, String me) {
this.charset = checkNotNull(StandardCharsets.UTF_8);
this.me = me;
if (frequency > 0) this.rate = (int) (1/frequency);
}

public Charset getCharset() {
return charset;
}

// ------------------------------------------------------------------------
// Kafka Serialization
// ------------------------------------------------------------------------

@Override
public Tuple4<String, String, Long, Long> deserialize(byte[] message) {

String nextValue = new String(message, charset);
String tuple_id = UUID.randomUUID().toString();
++counter;
long ts = System.currentTimeMillis();

if (this.rate > 0 && this.counter % this.rate == 0){
System.out.println("[DEBUG] [" + this.me + "] : " + tuple_id + "," + this.counter + "," + ts + "," + nextValue);
}

return new Tuple4<>(nextValue, tuple_id, this.counter, ts);
}

@Override
public boolean isEndOfStream(Tuple4<String, String, Long, Long> nextElement) {
return false;
}

@Override
public byte[] serialize(String element) {
return element.getBytes(charset);
}

@Override
public TypeInformation<Tuple4<String, String, Long, Long>> getProducedType() {
return TypeInformation.of(new TypeHint<Tuple4<String, String, Long, Long>>(){});
}

// ------------------------------------------------------------------------
// Java Serialization
// ------------------------------------------------------------------------

private void writeObject (ObjectOutputStream out) throws IOException {
out.defaultWriteObject();
out.writeUTF(charset.name());
}

private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();
String charsetName = in.readUTF();
this.charset = Charset.forName(charsetName);
}
}
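For reference, a tiny standalone check of what the schema above produces from a raw Kafka payload. The payload string and debug frequency are arbitrary; with a frequency of 1.0 the rate becomes 1, so every tuple also triggers the debug print:

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.java.tuple.Tuple4;

    import fr.unice.namb.flink.utils.KafkaDeserializationSchema;

    public class KafkaDeserializationSchemaDemo {
        public static void main(String[] args) {
            KafkaDeserializationSchema schema = new KafkaDeserializationSchema(1.0, "demo_source");

            byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
            Tuple4<String, String, Long, Long> tuple = schema.deserialize(payload);

            // f0 = payload as string, f1 = random UUID, f2 = running counter, f3 = timestamp
            System.out.println(tuple.f0 + " / " + tuple.f1 + " / " + tuple.f2 + " / " + tuple.f3);
        }
    }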
11 changes: 9 additions & 2 deletions benchmarks/heron-bench/pom.xml
@@ -12,7 +12,7 @@
</parent>
<groupId>fr.unice.namb</groupId>
<artifactId>heron-bench</artifactId>
<version>${heron.benchmark.version}</version>
<version>${revision}</version>
<packaging>jar</packaging>

<properties>
@@ -24,7 +24,7 @@
<dependency>
<groupId>fr.unice.namb</groupId>
<artifactId>utils</artifactId>
<version>${utils.version}</version>
<version>${revision}</version>
<scope>compile</scope>
</dependency>

@@ -35,6 +35,13 @@
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.heron</groupId>
<artifactId>heron-api</artifactId>
<version>0.20.1-incubating</version>
<scope>compile</scope>
</dependency>

</dependencies>

<build>
13 changes: 11 additions & 2 deletions benchmarks/storm-bench/pom.xml
@@ -12,7 +12,7 @@
</parent>
<groupId>fr.unice.namb</groupId>
<artifactId>storm-bench</artifactId>
<version>${storm.benchmark.version}</version>
<version>${revision}</version>
<packaging>jar</packaging>

<properties>
@@ -24,7 +24,7 @@
<dependency>
<groupId>fr.unice.namb</groupId>
<artifactId>utils</artifactId>
<version>${utils.version}</version>
<version>${revision}</version>
<scope>compile</scope>
</dependency>

@@ -35,10 +35,19 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${storm.version}</version>
<scope>compile</scope>
</dependency>

</dependencies>





<build>

<sourceDirectory>src/main/java</sourceDirectory>
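The storm-kafka-client dependency added above supports the Storm side of the external-source feature, but the Storm topology changes themselves are not part of this excerpt. As a rough orientation only, and not code from this commit, a Kafka spout from that artifact is typically wired as below; broker address, topic and component names are placeholders, and the exact builder options available depend on the storm-kafka-client version resolved by ${storm.version}:

    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.topology.TopologyBuilder;

    public class KafkaSpoutSketch {
        public static void main(String[] args) {
            // Placeholder broker address and topic; in NAMB these would come from the
            // benchmark configuration rather than being hard-coded.
            KafkaSpoutConfig<String, String> spoutConfig =
                    KafkaSpoutConfig.builder("localhost:9092", "namb-topic").build();

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka_source_1", new KafkaSpout<>(spoutConfig), 1);
            // downstream bolts would be attached here with builder.setBolt(...)
        }
    }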
