Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

using data selector as wrapper over raw data #815

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,52 @@
package zingg.common.core.data.df;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.util.ColName;
import zingg.common.core.block.Canopy;
import zingg.common.core.block.Tree;
import zingg.common.core.context.Context;

/**
 * Frame processor that attaches block hashes to an input frame and repartitions the
 * result on the hash column.
 *
 * <p>The processed frame is computed once, inside the constructor, and then handed out
 * unchanged by {@link #getProcessedDF()}.
 *
 * @param <S> session type carried by the {@link Context}
 * @param <D> underlying dataset type of the {@link ZFrame}
 * @param <R> row type of the {@link ZFrame}
 * @param <C> column type of the {@link ZFrame}
 * @param <T> data-type parameter carried by the {@link Context}
 */
public class BlockedFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> {

    public static final Log LOG = LogFactory.getLog(BlockedFrame.class);

    /** Frame handed to the constructor; never modified by this class. */
    protected ZFrame<D,R,C> originalDF;

    /** Result of {@link #getBlocked()}, captured at construction time. */
    protected ZFrame<D,R,C> processedDF;

    /** Arguments used to read the blocking tree and to obtain the partition count. */
    protected IArguments args;

    /** Context that supplies the blocking tree utility. */
    protected Context<S,D,R,C,T> context;

    /**
     * Stores the inputs and immediately computes the blocked frame.
     *
     * @param originalDF frame to be blocked
     * @param args arguments passed to the blocking tree reader and queried for the partition count
     * @param context context providing {@code getBlockingTreeUtil()}
     * @throws Exception if blocking fails
     * @throws ZinggClientException if blocking fails
     */
    public BlockedFrame(ZFrame<D, R, C> originalDF, IArguments args, Context<S,D,R,C,T> context) throws Exception, ZinggClientException {
        this.originalDF = originalDF;
        this.args = args;
        this.context = context;
        // All three inputs must be assigned before this call: getBlocked() reads them.
        this.processedDF = getBlocked();
    }

    /**
     * Reads the blocking tree, computes block hashes for the original frame and
     * repartitions the hashed frame on {@code ColName.HASH_COL}.
     *
     * <p>The returned frame is not cached here.
     *
     * @return the hashed frame, repartitioned into {@code args.getNumPartitions()} partitions
     * @throws Exception if reading the tree or hashing fails
     * @throws ZinggClientException if reading the tree or hashing fails
     */
    protected ZFrame<D, R, C> getBlocked() throws Exception, ZinggClientException {
        Tree<Canopy<R>> blockingTree = context.getBlockingTreeUtil().readBlockingTree(args);
        ZFrame<D, R, C> hashed = context.getBlockingTreeUtil().getBlockHashes(getOriginalDF(), blockingTree);
        return hashed.repartition(args.getNumPartitions(), hashed.col(ColName.HASH_COL));
    }

    /** @return the frame supplied to the constructor */
    @Override
    public ZFrame<D, R, C> getOriginalDF() {
        return this.originalDF;
    }

    /** @return the blocked, repartitioned frame computed during construction */
    @Override
    public ZFrame<D, R, C> getProcessedDF() {
        return this.processedDF;
    }

}
@@ -0,0 +1,38 @@
package zingg.common.core.data.df;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.cols.ZidAndFieldDefSelector;

/**
 * Frame processor that projects an input frame onto the columns returned by
 * {@code ZidAndFieldDefSelector} for the configured field definitions.
 *
 * The projection is computed once, in the constructor, and exposed through
 * {@link #getProcessedDF()}.
 * NOTE(review): judging by the selector's name the columns are presumably the Zingg id
 * plus the field-definition columns - confirm against ZidAndFieldDefSelector.
 */
public class FieldDefFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> {

/** Frame handed to the constructor. */
protected ZFrame<D,R,C> originalDF;

/** Column-projected frame computed in the constructor. */
protected ZFrame<D,R,C> processedDF;

/** Arguments whose field definitions drive the column selection. */
protected IArguments args;

/** Class logger; not used within this class as shown. */
public static final Log LOG = LogFactory.getLog(FieldDefFrame.class);

/**
 * Stores the inputs and eagerly selects the columns for the processed frame.
 *
 * @param originalDF frame to project
 * @param args arguments providing getFieldDefinition()
 */
public FieldDefFrame(ZFrame<D, R, C> originalDF, IArguments args) {
super();
this.originalDF = originalDF;
this.args = args;
// Goes through the overridable getter rather than the field, so a subclass override is honoured here.
this.processedDF = getOriginalDF().select(new ZidAndFieldDefSelector(args.getFieldDefinition()).getCols());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

better to have an overloaded constructor here which can take the fielddefselectors. we have a few so may need to pass those in the future

// return getDSUtil().getFieldDefColumnsDS(testDataOriginal, args, true);
}

/** Returns the frame supplied to the constructor. */
@Override
public ZFrame<D, R, C> getOriginalDF() {
return originalDF;
}

/** Returns the column-projected frame computed during construction. */
@Override
public ZFrame<D, R, C> getProcessedDF() {
return processedDF;
}

}
@@ -0,0 +1,11 @@
package zingg.common.core.data.df;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be better to introduce a process() method so that the logic does not live in the constructor.


import zingg.common.client.ZFrame;

public interface IZFrameProcessor<S, D, R, C, T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it need S and T?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BlockingTreeUtil<S, D,R,C,T>

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StopWordsRemover<S,D,R,C,T>

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we should have one frame<D,R,C>, and then another that extends it and adds S and T.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What benefit would that give? ZData has <S,D,R,C,T> in any case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would mean maintaining two very similar interfaces unnecessarily, and their objects could not be assigned to one another.


/**
 * Returns the input frame of this processor, before any processing is applied.
 * In the implementations in this package (e.g. BlockedFrame, FieldDefFrame) this is
 * the frame that was passed to the constructor.
 *
 * @return the unprocessed input frame
 */
public ZFrame<D,R,C> getOriginalDF();

/**
 * Returns the frame produced by this processor from its input frame.
 * In the implementations in this package (e.g. BlockedFrame, FieldDefFrame) the result
 * is computed once at construction time and returned as-is on every call.
 *
 * @return the processed frame
 */
public ZFrame<D,R,C> getProcessedDF();

}
@@ -0,0 +1,45 @@
package zingg.common.core.data.df;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.core.preprocess.StopWordsRemover;

/**
 * Frame processor that runs the supplied StopWordsRemover over an input frame.
 *
 * The preprocessing is performed once, in the constructor, and the result is exposed
 * through {@link #getProcessedDF()}.
 */
public class PreprocessedFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

change stopword design - have a processor there. so preprocessedframe can be passed any processor(stopwords, tomorrow lowercase etc)


/** Frame handed to the constructor. */
protected ZFrame<D,R,C> originalDF;

/** Result of {@link #preprocess()}, captured at construction time. */
protected ZFrame<D,R,C> processedDF;

/** Arguments supplied by the caller; stored but not read within this class as shown. */
protected IArguments args;

/** Remover that {@link #preprocess()} delegates to. */
protected StopWordsRemover<S, D, R, C, T> stopWordsRemover;

/** Class logger; not used within this class as shown. */
public static final Log LOG = LogFactory.getLog(PreprocessedFrame.class);

/**
 * Stores the inputs and eagerly computes the preprocessed frame.
 *
 * @param originalDF frame to preprocess
 * @param args arguments kept on the instance
 * @param stopWordsRemover remover used by preprocess()
 * @throws ZinggClientException if preprocess() fails
 */
public PreprocessedFrame(ZFrame<D, R, C> originalDF, IArguments args, StopWordsRemover<S, D, R, C, T> stopWordsRemover) throws ZinggClientException {
super();
this.originalDF = originalDF;
this.args = args;
this.stopWordsRemover = stopWordsRemover;
// Must come last: preprocess() reads originalDF (via the getter) and stopWordsRemover.
this.processedDF = preprocess();
}

/**
 * Passes the original frame to stopWordsRemover.preprocessForStopWords and returns its result.
 *
 * @return the frame returned by the stop-words remover
 * @throws ZinggClientException propagated from the remover
 */
protected ZFrame<D, R, C> preprocess() throws ZinggClientException {
return this.stopWordsRemover.preprocessForStopWords(getOriginalDF());
}

/** Returns the frame supplied to the constructor. */
@Override
public ZFrame<D, R, C> getOriginalDF() {
return originalDF;
}

/** Returns the preprocessed frame computed during construction. */
@Override
public ZFrame<D, R, C> getProcessedDF() {
return processedDF;
}

}
@@ -0,0 +1,37 @@
package zingg.common.core.data.df;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.util.ColName;

/**
 * Frame processor that repartitions an input frame on ColName.ID_COL.
 *
 * The repartitioning is requested once, in the constructor, and the resulting frame is
 * exposed through {@link #getProcessedDF()}.
 */
public class RepartitionFrame<S, D, R, C, T> implements IZFrameProcessor<S, D, R, C, T> {

/** Frame handed to the constructor. */
protected ZFrame<D,R,C> originalDF;

/** Repartitioned frame computed in the constructor. */
protected ZFrame<D,R,C> processedDF;

/** Arguments providing the partition count via getNumPartitions(). */
protected IArguments args;

/** Class logger; not used within this class as shown. */
public static final Log LOG = LogFactory.getLog(RepartitionFrame.class);

/**
 * Stores the inputs and eagerly builds the repartitioned frame.
 *
 * @param originalDF frame to repartition
 * @param args arguments providing getNumPartitions()
 */
public RepartitionFrame(ZFrame<D, R, C> originalDF, IArguments args) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should pass the numberofPartitions and column on which we need to partition

super();
this.originalDF = originalDF;
this.args = args;
// Partition count comes from args; the partitioning column is fixed to ColName.ID_COL.
this.processedDF = getOriginalDF().repartition(args.getNumPartitions(),getOriginalDF().col(ColName.ID_COL));
}

/** Returns the frame supplied to the constructor. */
@Override
public ZFrame<D, R, C> getOriginalDF() {
return originalDF;
}

/** Returns the repartitioned frame computed during construction. */
@Override
public ZFrame<D, R, C> getProcessedDF() {
return processedDF;
}

}
63 changes: 63 additions & 0 deletions common/core/src/main/java/zingg/common/core/data/df/ZData.java
@@ -0,0 +1,63 @@
package zingg.common.core.data.df;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.core.context.Context;
import zingg.common.core.preprocess.StopWordsRemover;

/**
 * Wraps a raw input frame together with the chain of derived frames built from it.
 *
 * <p>The constructor builds four stages in order, each consuming the processed frame of
 * the previous one: field-definition column selection ({@link FieldDefFrame}),
 * stop-word preprocessing ({@link PreprocessedFrame}), repartitioning
 * ({@link RepartitionFrame}) and blocking ({@link BlockedFrame}).
 *
 * @param <S> session type carried by the {@link Context}
 * @param <D> underlying dataset type of the {@link ZFrame}
 * @param <R> row type of the {@link ZFrame}
 * @param <C> column type of the {@link ZFrame}
 * @param <T> data-type parameter carried by the {@link Context}
 */
public class ZData<S, D, R, C, T> {

    public static final Log LOG = LogFactory.getLog(ZData.class);

    /** Frame supplied by the caller; the input of the first stage. */
    protected ZFrame<D,R,C> rawData;
    protected IArguments args;
    protected Context<S,D,R,C,T> context;
    protected StopWordsRemover<S, D, R, C, T> stopWordsRemover;

    /** Stage 1: column selection over the raw data. */
    protected FieldDefFrame<S, D, R, C, T> fieldDefFrame;
    /** Stage 2: stop-word preprocessing over the stage 1 output. */
    protected PreprocessedFrame<S, D, R, C, T> preprocessedFrame;
    /** Stage 3: repartitioning of the stage 2 output. */
    protected RepartitionFrame<S, D, R, C, T> repartitionFrame;
    /** Stage 4: blocking of the stage 3 output. */
    protected BlockedFrame<S, D, R, C, T> blockedData;

    /**
     * Stores the inputs and eagerly builds every stage of the chain.
     *
     * @param rawData frame to wrap
     * @param args arguments forwarded to every stage
     * @param context context forwarded to the blocking stage
     * @param stopWordsRemover remover forwarded to the preprocessing stage
     * @throws ZinggClientException rethrown as-is when a stage throws it; any other
     *         exception raised while building the stages is wrapped in one
     */
    public ZData(ZFrame<D, R, C> rawData, IArguments args, Context<S,D,R,C,T> context,StopWordsRemover<S, D, R, C, T> stopWordsRemover) throws ZinggClientException {
        this.rawData = rawData;
        this.args = args;
        this.context = context;
        this.stopWordsRemover = stopWordsRemover;
        try {
            this.fieldDefFrame = new FieldDefFrame<>(getRawData(), args);

            ZFrame<D, R, C> selectedColumns = getFieldDefFrame().getProcessedDF();
            this.preprocessedFrame = new PreprocessedFrame<>(selectedColumns, args, stopWordsRemover);

            ZFrame<D, R, C> preprocessed = getPreprocessedFrame().getProcessedDF();
            this.repartitionFrame = new RepartitionFrame<>(preprocessed, args);

            ZFrame<D, R, C> repartitioned = getRepartitionFrame().getProcessedDF();
            this.blockedData = new BlockedFrame<>(repartitioned, args, context);
        } catch (ZinggClientException clientFailure) {
            // Already the declared exception type: pass it through without wrapping.
            throw clientFailure;
        } catch (Exception otherFailure) {
            throw new ZinggClientException(otherFailure);
        }
    }

    /** @return the frame supplied to the constructor */
    public ZFrame<D, R, C> getRawData() {
        return this.rawData;
    }

    /** @return the column-selection stage */
    public FieldDefFrame<S, D, R, C, T> getFieldDefFrame() {
        return this.fieldDefFrame;
    }

    /** @return the stop-word preprocessing stage */
    public PreprocessedFrame<S, D, R, C, T> getPreprocessedFrame() {
        return this.preprocessedFrame;
    }

    /** @return the repartitioning stage */
    public RepartitionFrame<S, D, R, C, T> getRepartitionFrame() {
        return this.repartitionFrame;
    }

    /**
     * Returns the blocking stage built by the constructor.
     *
     * @return the blocking stage
     * @throws Exception declared for callers; the body only returns the stored stage
     * @throws ZinggClientException declared for callers; the body only returns the stored stage
     */
    public BlockedFrame<S, D, R, C, T> getBlockedFrame() throws Exception, ZinggClientException{
        return this.blockedData;
    }

}

This file was deleted.

Expand Up @@ -7,7 +7,7 @@
import zingg.common.client.ZinggClientException;
import zingg.common.client.options.ZinggOptions;
import zingg.common.client.util.ColName;
import zingg.common.core.data.df.ZFrameDataSelector;
import zingg.common.core.data.df.ZData;
import zingg.common.core.filter.PredictionFilter;
import zingg.common.core.pairs.SelfPairBuilderSourceSensitive;

Expand Down Expand Up @@ -41,7 +41,7 @@ protected ZFrame<D,R,C> getActualDupes(ZFrame<D,R,C> blocked, ZFrame<D,R,C> test
}

@Override
public void writeOutput( ZFrameDataSelector<S,D,R,C,T> rawData, ZFrame<D,R,C> dupes) throws ZinggClientException {
public void writeOutput( ZData<S,D,R,C,T> rawData, ZFrame<D,R,C> dupes) throws ZinggClientException {
try {
// input dupes are pairs
/// pick ones according to the threshold by user
Expand All @@ -56,7 +56,7 @@ public void writeOutput( ZFrameDataSelector<S,D,R,C,T> rawData, ZFrame<D,R,C>
dupesActual = dupesActual.withColumn(ColName.CLUSTER_COLUMN, dupesActual.col(ColName.ID_COL));
dupesActual = getDSUtil().addUniqueCol(dupesActual, ColName.CLUSTER_COLUMN);
ZFrame<D,R,C>dupes2 = getDSUtil().alignLinked(dupesActual, args);
dupes2 = getDSUtil().postprocessLinked(dupes2, rawData.getFieldDefColumnsDS());
dupes2 = getDSUtil().postprocessLinked(dupes2, rawData.getFieldDefFrame().getProcessedDF());
LOG.debug("uncertain output schema is " + dupes2.showSchema());
getPipeUtil().write(dupes2, args.getOutput());
}
Expand Down