Skip to content

Commit

Permalink
single responsibility principle for processing data
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasgupta78 committed Apr 11, 2024
1 parent c14132c commit 6b53d07
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 60 deletions.
@@ -0,0 +1,52 @@
package zingg.common.core.data.df;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.util.ColName;
import zingg.common.core.block.Canopy;
import zingg.common.core.block.Tree;
import zingg.common.core.context.Context;

public class BlockedFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> {

protected ZFrame<D,R,C> originalDF;

protected ZFrame<D,R,C> processedDF;

protected IArguments args;

protected Context<S,D,R,C,T> context;

public static final Log LOG = LogFactory.getLog(BlockedFrame.class);

public BlockedFrame(ZFrame<D, R, C> originalDF, IArguments args, Context<S,D,R,C,T> context) throws Exception, ZinggClientException {
super();
this.originalDF = originalDF;
this.args = args;
this.context = context;
this.processedDF = getBlocked();
}

@Override
public ZFrame<D, R, C> getOriginalDF() {
return originalDF;
}

@Override
public ZFrame<D, R, C> getProcessedDF() {
return processedDF;
}

protected ZFrame<D, R, C> getBlocked() throws Exception, ZinggClientException {
//testData = dropDuplicates(testData);
Tree<Canopy<R>> tree = context.getBlockingTreeUtil().readBlockingTree(args);
ZFrame<D, R, C> blocked = context.getBlockingTreeUtil().getBlockHashes(getOriginalDF(), tree);
ZFrame<D, R, C> blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL));//.cache();
return blocked1;
}

}
@@ -0,0 +1,38 @@
package zingg.common.core.data.df;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.cols.ZidAndFieldDefSelector;

public class FieldDefFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> {

protected ZFrame<D,R,C> originalDF;

protected ZFrame<D,R,C> processedDF;

protected IArguments args;

public static final Log LOG = LogFactory.getLog(FieldDefFrame.class);

public FieldDefFrame(ZFrame<D, R, C> originalDF, IArguments args) {
super();
this.originalDF = originalDF;
this.args = args;
this.processedDF = getOriginalDF().select(new ZidAndFieldDefSelector(args.getFieldDefinition()).getCols());
// return getDSUtil().getFieldDefColumnsDS(testDataOriginal, args, true);
}

@Override
public ZFrame<D, R, C> getOriginalDF() {
return originalDF;
}

@Override
public ZFrame<D, R, C> getProcessedDF() {
return processedDF;
}

}
@@ -0,0 +1,11 @@
package zingg.common.core.data.df;

import zingg.common.client.ZFrame;

public interface IZFrameProcessor<S, D, R, C, T> {

public ZFrame<D,R,C> getOriginalDF();

public ZFrame<D,R,C> getProcessedDF();

}
@@ -0,0 +1,45 @@
package zingg.common.core.data.df;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.core.preprocess.StopWordsRemover;

public class PreprocessedFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> {

protected ZFrame<D,R,C> originalDF;

protected ZFrame<D,R,C> processedDF;

protected IArguments args;

protected StopWordsRemover<S, D, R, C, T> stopWordsRemover;

public static final Log LOG = LogFactory.getLog(PreprocessedFrame.class);

public PreprocessedFrame(ZFrame<D, R, C> originalDF, IArguments args, StopWordsRemover<S, D, R, C, T> stopWordsRemover) throws ZinggClientException {
super();
this.originalDF = originalDF;
this.args = args;
this.stopWordsRemover = stopWordsRemover;
this.processedDF = preprocess();
}

protected ZFrame<D, R, C> preprocess() throws ZinggClientException {
return this.stopWordsRemover.preprocessForStopWords(getOriginalDF());
}

@Override
public ZFrame<D, R, C> getOriginalDF() {
return originalDF;
}

@Override
public ZFrame<D, R, C> getProcessedDF() {
return processedDF;
}

}
@@ -0,0 +1,37 @@
package zingg.common.core.data.df;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.util.ColName;

public class RepartitionFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> {

protected ZFrame<D,R,C> originalDF;

protected ZFrame<D,R,C> processedDF;

protected IArguments args;

public static final Log LOG = LogFactory.getLog(RepartitionFrame.class);

public RepartitionFrame(ZFrame<D, R, C> originalDF, IArguments args) {
super();
this.originalDF = originalDF;
this.args = args;
this.processedDF = getOriginalDF().repartition(args.getNumPartitions(),getOriginalDF().col(ColName.ID_COL));
}

@Override
public ZFrame<D, R, C> getOriginalDF() {
return originalDF;
}

@Override
public ZFrame<D, R, C> getProcessedDF() {
return processedDF;
}

}
78 changes: 33 additions & 45 deletions common/core/src/main/java/zingg/common/core/data/df/ZData.java
Expand Up @@ -6,70 +6,58 @@
import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.cols.ZidAndFieldDefSelector;
import zingg.common.client.util.ColName;
import zingg.common.core.block.Canopy;
import zingg.common.core.block.Tree;
import zingg.common.core.context.Context;
import zingg.common.core.preprocess.StopWordsRemover;
import zingg.common.core.util.Analytics;
import zingg.common.core.util.Metric;

public class ZFrameDataSelector<S, D, R, C, T> {
public class ZData<S, D, R, C, T> {

protected ZFrame<D,R,C> rawData;
protected ZFrame<D,R,C> fieldDefColumnsDS;
protected ZFrame<D,R,C> blockedData;
protected IArguments args;
protected Context<S,D,R,C,T> context;
protected StopWordsRemover<S, D, R, C, T> stopWordsRemover;
protected ZFrame<D, R, C> preprocessedRepartitionedData;

public static final Log LOG = LogFactory.getLog(ZFrameDataSelector.class);
protected FieldDefFrame<S, D, R, C, T> fieldDefFrame;
protected BlockedFrame<S, D, R, C, T> blockedData;
protected PreprocessedFrame<S, D, R, C, T> preprocessedFrame;
protected RepartitionFrame<S, D, R, C, T> repartitionFrame;

public ZFrameDataSelector(ZFrame<D, R, C> rawData, IArguments args, Context<S,D,R,C,T> context,StopWordsRemover<S, D, R, C, T> stopWordsRemover) {
super();
this.rawData = rawData;
this.args = args;
this.context = context;
this.stopWordsRemover = stopWordsRemover;
public static final Log LOG = LogFactory.getLog(ZData.class);

public ZData(ZFrame<D, R, C> rawData, IArguments args, Context<S,D,R,C,T> context,StopWordsRemover<S, D, R, C, T> stopWordsRemover) throws ZinggClientException {
try {
this.rawData = rawData;
this.args = args;
this.context = context;
this.stopWordsRemover = stopWordsRemover;
this.fieldDefFrame = new FieldDefFrame<S, D, R, C, T>(getRawData(),args);
this.preprocessedFrame = new PreprocessedFrame<S, D, R, C, T>(getFieldDefFrame().getProcessedDF(),args,stopWordsRemover);
this.repartitionFrame = new RepartitionFrame<S, D, R, C, T>(getPreprocessedFrame().getProcessedDF(),args);
this.blockedData = new BlockedFrame<S, D, R, C, T>(getRepartitionFrame().getProcessedDF(), args, context);
} catch (ZinggClientException e) {
throw e;
} catch (Exception e) {
throw new ZinggClientException(e);
}
}

public ZFrame<D, R, C> getRawData() {
return rawData;
}

public ZFrame<D, R, C> getFieldDefColumnsDS() {
if (fieldDefColumnsDS==null) {
ZidAndFieldDefSelector zidAndFieldDefSelector = new ZidAndFieldDefSelector(args.getFieldDefinition());
fieldDefColumnsDS = getRawData().select(zidAndFieldDefSelector.getCols());
}
return fieldDefColumnsDS;
// return getDSUtil().getFieldDefColumnsDS(testDataOriginal, args, true);
public FieldDefFrame<S, D, R, C, T> getFieldDefFrame() {
return fieldDefFrame;
}

public ZFrame<D, R, C> getPreprocessedRepartitionedData() {
return preprocessedRepartitionedData;
}

public ZFrame<D,R,C> getBlocked() throws Exception, ZinggClientException{
if (blockedData==null) {
preprocessedRepartitionedData = stopWordsRemover.preprocessForStopWords(getFieldDefColumnsDS());
preprocessedRepartitionedData = preprocessedRepartitionedData.repartition(args.getNumPartitions(), preprocessedRepartitionedData.col(ColName.ID_COL));
//testData = dropDuplicates(testData);
long count = preprocessedRepartitionedData.count();
LOG.info("Read " + count);
Analytics.track(Metric.DATA_COUNT, count, args.getCollectMetrics());
LOG.debug("Blocking model file location is " + args.getBlockFile());
ZFrame<D, R, C> blocked1; //.cache();
Tree<Canopy<R>> tree = context.getBlockingTreeUtil().readBlockingTree(args);
ZFrame<D, R, C> blocked = context.getBlockingTreeUtil().getBlockHashes(preprocessedRepartitionedData, tree);
blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL));
this.blockedData = blocked1;
}
public PreprocessedFrame<S, D, R, C, T> getPreprocessedFrame() {
return preprocessedFrame;
}

public RepartitionFrame<S, D, R, C, T> getRepartitionFrame() {
return repartitionFrame;
}

public BlockedFrame<S, D, R, C, T> getBlockedFrame() throws Exception, ZinggClientException{
return blockedData;
}



}
Expand Up @@ -7,7 +7,7 @@
import zingg.common.client.ZinggClientException;
import zingg.common.client.options.ZinggOptions;
import zingg.common.client.util.ColName;
import zingg.common.core.data.df.ZFrameDataSelector;
import zingg.common.core.data.df.ZData;
import zingg.common.core.filter.PredictionFilter;
import zingg.common.core.pairs.SelfPairBuilderSourceSensitive;

Expand Down Expand Up @@ -41,7 +41,7 @@ protected ZFrame<D,R,C> getActualDupes(ZFrame<D,R,C> blocked, ZFrame<D,R,C> test
}

@Override
public void writeOutput( ZFrameDataSelector<S,D,R,C,T> rawData, ZFrame<D,R,C> dupes) throws ZinggClientException {
public void writeOutput( ZData<S,D,R,C,T> rawData, ZFrame<D,R,C> dupes) throws ZinggClientException {
try {
// input dupes are pairs
/// pick ones according to the threshold by user
Expand All @@ -56,7 +56,7 @@ public void writeOutput( ZFrameDataSelector<S,D,R,C,T> rawData, ZFrame<D,R,C>
dupesActual = dupesActual.withColumn(ColName.CLUSTER_COLUMN, dupesActual.col(ColName.ID_COL));
dupesActual = getDSUtil().addUniqueCol(dupesActual, ColName.CLUSTER_COLUMN);
ZFrame<D,R,C>dupes2 = getDSUtil().alignLinked(dupesActual, args);
dupes2 = getDSUtil().postprocessLinked(dupes2, rawData.getFieldDefColumnsDS());
dupes2 = getDSUtil().postprocessLinked(dupes2, rawData.getFieldDefFrame().getProcessedDF());
LOG.debug("uncertain output schema is " + dupes2.showSchema());
getPipeUtil().write(dupes2, args.getOutput());
}
Expand Down

0 comments on commit 6b53d07

Please sign in to comment.