#Operator Development Guide

#Contents

  1. Overview
  2. A Simple Operator
  3. A Committing Operator
  4. Filtering Data and Choosing a Sink to Write to
  5. Enhancing Data

#Overview

An Operator is how Scribengin processes data. When data comes in through a source, Scribengin can perform any arbitrary operation on it (filtering, transforming, enhancing) and then write it out to any sink. This guide will walk you through that process.

#A Simple Operator

This operator is as simple as it gets. The process() method is passed the DataStreamOperatorContext and a Message. The operator simply takes the Message it receives from the source and writes it to all available sinks. Committing is handled automatically.

```java
import java.util.Set;

import com.neverwinterdp.message.Message;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperator;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperatorContext;

//Your Operator must extend DataStreamOperator
public class SimpleOperator extends DataStreamOperator {

  //Your Operator must at least override the method process()
  @Override
  public void process(DataStreamOperatorContext ctx, Message record) throws Exception {
    //Get the names of all available sinks
    Set<String> sinks = ctx.getAvailableOutputs();
    //Write the record to each sink
    for(String sink : sinks) {
      ctx.write(sink, record);
    }
  }
}
```

#A Committing Operator

How can we improve on this?

ctx.commit() is an expensive operation: it is the point where Scribengin finalizes a chunk of messages to be moved to the sink, and various tracking actions must be performed during each commit().

Therefore, rather than committing after every message, we benefit from committing only after a certain number of messages have been received, or after a timeout is hit (see the time-based sketch after the example below).

```java
import java.util.Set;

import com.neverwinterdp.message.Message;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperator;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperatorContext;

public class CommittingOperator extends DataStreamOperator {
  private int count = 0;

  @Override
  public void process(DataStreamOperatorContext ctx, Message record) throws Exception {
    Set<String> sinks = ctx.getAvailableOutputs();
    for(String sink : sinks) {
      ctx.write(sink, record);
    }

    //Commit every 1000 messages
    count++;
    if(count % 1000 == 0) {
      ctx.commit();
    }
  }
}
```
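The timeout case works the same way. Here is a minimal sketch (my own example, not from the original guide) that commits after 1000 messages or after 5 seconds have elapsed, whichever comes first; the batch size and interval are arbitrary values you should tune for your workload:

```java
import java.util.Set;

import com.neverwinterdp.message.Message;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperator;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperatorContext;

public class TimedCommittingOperator extends DataStreamOperator {
  private static final int  COMMIT_COUNT       = 1000;  //arbitrary batch size
  private static final long COMMIT_INTERVAL_MS = 5000;  //arbitrary timeout

  private int  count          = 0;
  private long lastCommitTime = System.currentTimeMillis();

  @Override
  public void process(DataStreamOperatorContext ctx, Message record) throws Exception {
    Set<String> sinks = ctx.getAvailableOutputs();
    for(String sink : sinks) {
      ctx.write(sink, record);
    }

    count++;
    long elapsed = System.currentTimeMillis() - lastCommitTime;
    //Commit when either the batch is full or the timeout has expired
    if(count >= COMMIT_COUNT || elapsed >= COMMIT_INTERVAL_MS) {
      ctx.commit();
      count = 0;
      lastCommitTime = System.currentTimeMillis();
    }
  }
}
```

Note that this sketch only checks the clock when a message arrives, so on a quiet stream the last partial batch stays uncommitted until the next message shows up.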

#Filtering Data and Choosing a Sink to Write to

What if we have multiple sinks to write to? Suppose you want to write to a different Kafka topic depending on the data being passed in, or to different partitions in HDFS.

In this example, we'll demonstrate how to choose which sink to write to.

The example assumes you have set up two separate, available sinks; this is done when setting up your dataflow. For information on how to set up your dataflow and name your data sinks, refer to the Dataflow Development API Guide.

```java
import com.neverwinterdp.message.Message;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperator;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperatorContext;

public class FilteringOperator extends DataStreamOperator {

  @Override
  public void process(DataStreamOperatorContext ctx, Message record) throws Exception {
    //Here is where we'll analyze our data
    //First, let's convert the record's payload into a String
    String sMessage = new String(record.getData());

    //If the message begins with "DATA"...
    if(sMessage.startsWith("DATA")) {
      //...write it to the sink named "data-sink"
      ctx.write("data-sink", record);
    }
    else {
      //Otherwise write it to the sink named "everything-else-sink"
      ctx.write("everything-else-sink", record);
    }
  }
}
```
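Writing to a hard-coded sink name only works if that sink was actually wired into the dataflow. As a defensive variant (my own sketch, not from the original guide, using only the ctx.getAvailableOutputs() call shown earlier and the same illustrative sink names), you can check that the chosen sink exists and fall back to broadcasting when it doesn't:

```java
import java.util.Set;

import com.neverwinterdp.message.Message;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperator;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperatorContext;

public class SafeFilteringOperator extends DataStreamOperator {

  @Override
  public void process(DataStreamOperatorContext ctx, Message record) throws Exception {
    String sMessage = new String(record.getData());
    String target = sMessage.startsWith("DATA") ? "data-sink" : "everything-else-sink";

    //Only write to the chosen sink if it was configured in the dataflow
    Set<String> sinks = ctx.getAvailableOutputs();
    if(sinks.contains(target)) {
      ctx.write(target, record);
    } else {
      //Fall back to every configured sink so no record is silently lost
      for(String sink : sinks) {
        ctx.write(sink, record);
      }
    }
  }
}
```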

#Enhancing Data

Let's say we want to do something else with our data: transform it from AVRO to JSON, filter out junk messages, or anything else before writing to a sink (a junk-filtering sketch follows the example below).

In this next example, we'll enhance our data by adding a prefix to every message that is passed in.

```java
import java.io.ByteArrayOutputStream;
import java.util.Set;

import com.neverwinterdp.message.Message;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperator;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperatorContext;

public class EnhancingOperator extends DataStreamOperator {
  private static byte[] enhance = "Scribengin ROCKS! ".getBytes();
  private ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

  @Override
  public void process(DataStreamOperatorContext ctx, Message record) throws Exception {
    //Reset our ByteArrayOutputStream so we can reuse it
    outputStream.reset();

    //We'll use the ByteArrayOutputStream to
    //  concatenate our static enhancement prefix with
    //  the payload that was passed in via the Message
    outputStream.write(enhance);
    outputStream.write(record.getData());

    //Set the record's data to our newly enhanced payload
    record.setData(outputStream.toByteArray());

    //Write the modified record to all sinks
    Set<String> sinks = ctx.getAvailableOutputs();
    for(String sink : sinks) {
      ctx.write(sink, record);
    }
  }
}
```
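The "filter out junk messages" case mentioned above falls out of the same pattern: a record only reaches a sink when process() calls ctx.write(), so dropping a message is just a matter of returning without writing. Here is a minimal sketch (my own example; the "JUNK" prefix is an assumed marker, not anything defined by Scribengin):

```java
import java.util.Set;

import com.neverwinterdp.message.Message;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperator;
import com.neverwinterdp.scribengin.dataflow.DataStreamOperatorContext;

public class JunkFilteringOperator extends DataStreamOperator {

  @Override
  public void process(DataStreamOperatorContext ctx, Message record) throws Exception {
    String sMessage = new String(record.getData());

    //Junk messages are simply dropped: returning without
    //  calling ctx.write() means the record never reaches a sink
    if(sMessage.startsWith("JUNK")) {
      return;
    }

    //Everything else passes through to all sinks unchanged
    Set<String> sinks = ctx.getAvailableOutputs();
    for(String sink : sinks) {
      ctx.write(sink, record);
    }
  }
}
```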