Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

using data selector as wrapper over raw data #815

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
@@ -0,0 +1,75 @@
package zingg.common.core.data.df;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename to ZData


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import zingg.common.client.IArguments;
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.cols.ZidAndFieldDefSelector;
import zingg.common.client.util.ColName;
import zingg.common.core.block.Canopy;
import zingg.common.core.block.Tree;
import zingg.common.core.context.Context;
import zingg.common.core.preprocess.StopWordsRemover;
import zingg.common.core.util.Analytics;
import zingg.common.core.util.Metric;

public class ZFrameDataSelector<S, D, R, C, T> {

protected ZFrame<D,R,C> rawData;
protected ZFrame<D,R,C> fieldDefColumnsDS;
protected ZFrame<D,R,C> blockedData;
protected IArguments args;
protected Context<S,D,R,C,T> context;
protected StopWordsRemover<S, D, R, C, T> stopWordsRemover;
protected ZFrame<D, R, C> preprocessedRepartitionedData;

public static final Log LOG = LogFactory.getLog(ZFrameDataSelector.class);

public ZFrameDataSelector(ZFrame<D, R, C> rawData, IArguments args, Context<S,D,R,C,T> context,StopWordsRemover<S, D, R, C, T> stopWordsRemover) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class shuld not be aware of the blocking tree loading and context etc. it should get the blocking tree and have a method to apply it.

super();
this.rawData = rawData;
this.args = args;
this.context = context;
this.stopWordsRemover = stopWordsRemover;
}

public ZFrame<D, R, C> getRawData() {
return rawData;
}

public ZFrame<D, R, C> getFieldDefColumnsDS() {
if (fieldDefColumnsDS==null) {
ZidAndFieldDefSelector zidAndFieldDefSelector = new ZidAndFieldDefSelector(args.getFieldDefinition());
fieldDefColumnsDS = getRawData().select(zidAndFieldDefSelector.getCols());
}
return fieldDefColumnsDS;
// return getDSUtil().getFieldDefColumnsDS(testDataOriginal, args, true);
}

public ZFrame<D, R, C> getPreprocessedRepartitionedData() {
return preprocessedRepartitionedData;
}

public ZFrame<D,R,C> getBlocked() throws Exception, ZinggClientException{
if (blockedData==null) {
preprocessedRepartitionedData = stopWordsRemover.preprocessForStopWords(getFieldDefColumnsDS());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be a separate method

preprocessedRepartitionedData = preprocessedRepartitionedData.repartition(args.getNumPartitions(), preprocessedRepartitionedData.col(ColName.ID_COL));
//testData = dropDuplicates(testData);
long count = preprocessedRepartitionedData.count();
LOG.info("Read " + count);
Analytics.track(Metric.DATA_COUNT, count, args.getCollectMetrics());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

count should be a method on this and Analytics should not be a part of this object - it should be driven from the executor

LOG.debug("Blocking model file location is " + args.getBlockFile());
ZFrame<D, R, C> blocked1; //.cache();
Tree<Canopy<R>> tree = context.getBlockingTreeUtil().readBlockingTree(args);
ZFrame<D, R, C> blocked = context.getBlockingTreeUtil().getBlockHashes(preprocessedRepartitionedData, tree);
blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL));
this.blockedData = blocked1;
}
return blockedData;
}



}
Expand Up @@ -7,6 +7,7 @@
import zingg.common.client.ZinggClientException;
import zingg.common.client.options.ZinggOptions;
import zingg.common.client.util.ColName;
import zingg.common.core.data.df.ZFrameDataSelector;
import zingg.common.core.filter.PredictionFilter;
import zingg.common.core.pairs.SelfPairBuilderSourceSensitive;

Expand Down Expand Up @@ -40,7 +41,7 @@ protected ZFrame<D,R,C> getActualDupes(ZFrame<D,R,C> blocked, ZFrame<D,R,C> test
}

@Override
public void writeOutput(ZFrame<D,R,C> sampleOrginal, ZFrame<D,R,C> dupes) throws ZinggClientException {
public void writeOutput( ZFrameDataSelector<S,D,R,C,T> rawData, ZFrame<D,R,C> dupes) throws ZinggClientException {
try {
// input dupes are pairs
/// pick ones according to the threshold by user
Expand All @@ -55,7 +56,7 @@ public void writeOutput(ZFrame<D,R,C> sampleOrginal, ZFrame<D,R,C> dupes) throws
dupesActual = dupesActual.withColumn(ColName.CLUSTER_COLUMN, dupesActual.col(ColName.ID_COL));
dupesActual = getDSUtil().addUniqueCol(dupesActual, ColName.CLUSTER_COLUMN);
ZFrame<D,R,C>dupes2 = getDSUtil().alignLinked(dupesActual, args);
dupes2 = getDSUtil().postprocessLinked(dupes2, sampleOrginal);
dupes2 = getDSUtil().postprocessLinked(dupes2, rawData.getFieldDefColumnsDS());
LOG.debug("uncertain output schema is " + dupes2.showSchema());
getPipeUtil().write(dupes2, args.getOutput());
}
Expand Down
57 changes: 19 additions & 38 deletions common/core/src/main/java/zingg/common/core/executor/Matcher.java
Expand Up @@ -9,19 +9,15 @@
import zingg.common.client.ZFrame;
import zingg.common.client.ZinggClientException;
import zingg.common.client.cols.PredictionColsSelector;
import zingg.common.client.cols.ZidAndFieldDefSelector;
import zingg.common.client.options.ZinggOptions;
import zingg.common.client.util.ColName;
import zingg.common.core.block.Canopy;
import zingg.common.core.block.Tree;
import zingg.common.core.data.df.ZFrameDataSelector;
import zingg.common.core.filter.IFilter;
import zingg.common.core.filter.PredictionFilter;
import zingg.common.core.model.Model;
import zingg.common.core.pairs.IPairBuilder;
import zingg.common.core.pairs.SelfPairBuilder;
import zingg.common.core.preprocess.StopWordsRemover;
import zingg.common.core.util.Analytics;
import zingg.common.core.util.Metric;

public abstract class Matcher<S,D,R,C,T> extends ZinggBase<S,D,R,C,T>{

Expand All @@ -33,25 +29,20 @@ public Matcher() {
setZinggOption(ZinggOptions.MATCH);
}

public ZFrame<D,R,C> getTestData() throws ZinggClientException{
ZFrame<D,R,C> data = getPipeUtil().read(true, true, args.getNumPartitions(), true, args.getData());
return data;
/**
 * Reads the configured input data and wraps it in a {@link ZFrameDataSelector}.
 *
 * @throws ZinggClientException if the input cannot be read
 */
public ZFrameDataSelector<S,D,R,C,T> getRawData() throws ZinggClientException{
    return getDataSelector(readInputData());
}

public ZFrame<D, R, C> getFieldDefColumnsDS(ZFrame<D, R, C> testDataOriginal) {
ZidAndFieldDefSelector zidAndFieldDefSelector = new ZidAndFieldDefSelector(args.getFieldDefinition());
return testDataOriginal.select(zidAndFieldDefSelector.getCols());
// return getDSUtil().getFieldDefColumnsDS(testDataOriginal, args, true);
/** Factory hook wrapping the given frame in a selector; subclasses may override to substitute their own. */
protected ZFrameDataSelector<S, D, R, C, T> getDataSelector(ZFrame<D, R, C> data) {
    ZFrameDataSelector<S, D, R, C, T> selector =
            new ZFrameDataSelector<>(data, args, context, getStopWords());
    return selector;
}


public ZFrame<D,R,C> getBlocked( ZFrame<D,R,C> testData) throws Exception, ZinggClientException{
LOG.debug("Blocking model file location is " + args.getBlockFile());
Tree<Canopy<R>> tree = getBlockingTreeUtil().readBlockingTree(args);
ZFrame<D,R,C> blocked = getBlockingTreeUtil().getBlockHashes(testData, tree);
ZFrame<D,R,C> blocked1 = blocked.repartition(args.getNumPartitions(), blocked.col(ColName.HASH_COL)); //.cache();
return blocked1;
/**
 * Reads the input configured in {@code args.getData()}, partitioned into
 * {@code args.getNumPartitions()} partitions.
 *
 * @throws ZinggClientException if the read fails
 */
protected ZFrame<D,R,C> readInputData() throws ZinggClientException {
    ZFrame<D, R, C> input =
            getPipeUtil().read(true, true, args.getNumPartitions(), true, args.getData());
    return input;
}


public ZFrame<D,R,C> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throws Exception{
return getPairs(blocked, bAll, new SelfPairBuilder<S, D, R, C> (getDSUtil(),args));
Expand Down Expand Up @@ -100,16 +91,8 @@ protected ZFrame<D,R,C> getActualDupes(ZFrame<D,R,C> blocked, ZFrame<D,R,C> test
public void execute() throws ZinggClientException {
try {
// read input, filter, remove self joins
ZFrame<D,R,C> testDataOriginal = getTestData();
testDataOriginal = getFieldDefColumnsDS(testDataOriginal);
ZFrame<D,R,C> testData = getStopWords().preprocessForStopWords(testDataOriginal);
testData = testData.repartition(args.getNumPartitions(), testData.col(ColName.ID_COL));
//testData = dropDuplicates(testData);
long count = testData.count();
LOG.info("Read " + count);
Analytics.track(Metric.DATA_COUNT, count, args.getCollectMetrics());

ZFrame<D,R,C>blocked = getBlocked(testData);
ZFrameDataSelector<S,D,R,C,T> rawData = getRawData();
ZFrame<D,R,C>blocked = rawData.getBlocked();
LOG.info("Blocked ");
/*blocked = blocked.cache();
blocked.withColumn("partition_id", functions.spark_partition_id())
Expand All @@ -120,12 +103,12 @@ public void execute() throws ZinggClientException {
blocked.show();
}
//LOG.warn("Num distinct hashes " + blocked.agg(functions.approx_count_distinct(ColName.HASH_COL)).count());
ZFrame<D,R,C> dupesActual = getActualDupes(blocked, testData);
ZFrame<D,R,C> dupesActual = getActualDupes(blocked, rawData.getPreprocessedRepartitionedData());

//dupesActual.explain();
//dupesActual.toJavaRDD().saveAsTextFile("/tmp/zdupes");

writeOutput(testDataOriginal, dupesActual);
writeOutput(rawData, dupesActual);

} catch (Exception e) {
if (LOG.isDebugEnabled()) e.printStackTrace();
Expand All @@ -137,15 +120,13 @@ public void execute() throws ZinggClientException {



public void writeOutput( ZFrame<D,R,C> blocked, ZFrame<D,R,C> dupesActual) throws ZinggClientException {
public void writeOutput( ZFrameDataSelector<S,D,R,C,T> rawData, ZFrame<D,R,C> dupesActual) throws ZinggClientException {
try{
//input dupes are pairs
///pick ones according to the threshold by user


//all clusters consolidated in one place
if (args.getOutput() != null) {
ZFrame<D, R, C> graphWithScores = getOutput(blocked, dupesActual);
ZFrame<D, R, C> graphWithScores = getOutput(rawData, dupesActual);
getPipeUtil().write(graphWithScores, args.getOutput());
}
}
Expand All @@ -157,19 +138,19 @@ public void writeOutput( ZFrame<D,R,C> blocked, ZFrame<D,R,C> dupesActual) th



protected ZFrame<D, R, C> getOutput(ZFrame<D, R, C> blocked, ZFrame<D, R, C> dupesActual) throws ZinggClientException, Exception {
protected ZFrame<D, R, C> getOutput(ZFrameDataSelector<S,D,R,C,T> rawData, ZFrame<D, R, C> dupesActual) throws ZinggClientException, Exception {
//-1 is initial suggestion, 1 is add, 0 is deletion, 2 is unsure
/*blocked = blocked.drop(ColName.HASH_COL);
blocked = blocked.drop(ColName.SOURCE_COL);
blocked = blocked.cache();
*/

ZFrame<D, R, C> fieldDefColumnsDS = rawData.getFieldDefColumnsDS();
dupesActual = dupesActual.cache();
if (LOG.isDebugEnabled()) {
LOG.debug("dupes ------------");
dupesActual.show();
}
ZFrame<D,R,C>graph = getGraphUtil().buildGraph(blocked, dupesActual).cache();
ZFrame<D,R,C>graph = getGraphUtil().buildGraph(fieldDefColumnsDS, dupesActual).cache();
//graph.toJavaRDD().saveAsTextFile("/tmp/zgraph");

if (LOG.isDebugEnabled()) {
Expand Down