Skip to content

Commit

Permalink
using ZDataPair
Browse files Browse the repository at this point in the history
  • Loading branch information
vikasgupta78 committed Apr 17, 2024
1 parent 427f7f3 commit 2b5c9fe
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 11 deletions.
40 changes: 40 additions & 0 deletions common/core/src/main/java/zingg/common/core/data/df/ZDataPair.java
@@ -0,0 +1,40 @@
package zingg.common.core.data.df;

import zingg.common.client.ZFrame;
import zingg.common.core.pairs.IPairBuilder;

/**
 * Bundles two input frames with the {@link IPairBuilder} that combines them,
 * and exposes the resulting pairs frame.
 *
 * <p>The pairs frame is not built in the constructor; it is requested from the
 * pair builder the first time {@link #getPairs()} is called and the result is
 * kept in the {@code pairs} field for later calls.
 *
 * <p>The type parameters {@code S}, {@code D}, {@code R} and {@code C} are
 * passed straight through to {@link ZFrame} and {@link IPairBuilder}.
 * {@code T} is not referenced inside this class.
 */
public class ZDataPair<S, D, R, C, T> {

    /** First frame handed to the pair builder. */
    protected ZFrame<D,R,C> data1;
    /** Second frame handed to the pair builder. */
    protected ZFrame<D,R,C> data2;
    /** Result of the pair builder; stays {@code null} until {@link #getPairs()} fills it. */
    protected ZFrame<D,R,C> pairs;
    /** Builder used to derive {@code pairs} from {@code data1} and {@code data2}. */
    protected IPairBuilder<S, D, R, C> iPairBuilder;

    /**
     * Stores the two frames and the builder; no pairs are computed here.
     *
     * @param data1 first frame passed to the builder
     * @param data2 second frame passed to the builder
     * @param iPairBuilder builder invoked by {@link #getPairs()}
     */
    public ZDataPair(ZFrame<D, R, C> data1, ZFrame<D, R, C> data2, IPairBuilder<S, D, R, C> iPairBuilder) {
        this.data1 = data1;
        this.data2 = data2;
        this.iPairBuilder = iPairBuilder;
    }

    /**
     * @return the first frame supplied at construction
     */
    public ZFrame<D, R, C> getData1() {
        return this.data1;
    }

    /**
     * @return the second frame supplied at construction
     */
    public ZFrame<D, R, C> getData2() {
        return this.data2;
    }

    /**
     * Returns the pairs frame, asking the pair builder for it when none has
     * been stored yet.
     *
     * @return the stored pairs frame, or the builder's fresh result
     * @throws Exception whatever the pair builder throws
     */
    public ZFrame<D, R, C> getPairs() throws Exception {
        if (this.pairs != null) {
            // Already built on an earlier call; reuse it.
            return this.pairs;
        }
        this.pairs = this.iPairBuilder.getPairs(this.data1, this.data2);
        return this.pairs;
    }

    /**
     * @return the pair builder supplied at construction
     */
    public IPairBuilder<S, D, R, C> getIPairBuilder() {
        return this.iPairBuilder;
    }

}
Expand Up @@ -8,6 +8,7 @@
import zingg.common.client.options.ZinggOptions;
import zingg.common.client.util.ColName;
import zingg.common.core.data.df.ZData;
import zingg.common.core.data.df.ZDataPair;
import zingg.common.core.filter.PredictionFilter;
import zingg.common.core.pairs.SelfPairBuilderSourceSensitive;

Expand All @@ -29,7 +30,7 @@ public ZFrame<D,R,C> selectColsFromBlocked(ZFrame<D,R,C> blocked) {
}

@Override
public ZFrame<D,R,C> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throws Exception{
public ZDataPair<S, D, R, C, T> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throws Exception{
return getPairs(blocked, bAll, new SelfPairBuilderSourceSensitive<S, D, R, C> (getDSUtil(),args));
}

Expand Down
24 changes: 14 additions & 10 deletions common/core/src/main/java/zingg/common/core/executor/Matcher.java
Expand Up @@ -12,6 +12,7 @@
import zingg.common.client.options.ZinggOptions;
import zingg.common.client.util.ColName;
import zingg.common.core.data.df.ZData;
import zingg.common.core.data.df.ZDataPair;
import zingg.common.core.filter.IFilter;
import zingg.common.core.filter.PredictionFilter;
import zingg.common.core.model.Model;
Expand Down Expand Up @@ -39,23 +40,26 @@ public ZData<S,D,R,C,T> getRawData() throws ZinggClientException{
}

protected ZData<S, D, R, C, T> getDataSelector(ZFrame<D, R, C> data) throws ZinggClientException {
return new ZData<S,D,R,C,T>(data,args,context,getPreProcessors());
}

protected List<IPreProcessor<S, D, R, C, T>> getPreProcessors() {
List<IPreProcessor<S,D,R,C,T>> preProcessors = new ArrayList<IPreProcessor<S,D,R,C,T>>();
preProcessors.add(getStopWords());
ZData<S, D, R, C, T> zData = new ZData<S,D,R,C,T>(data,args,context,preProcessors);
return zData;
preProcessors.add(getStopWords());
return preProcessors;
}

protected ZFrame<D,R,C> readInputData() throws ZinggClientException {
return getPipeUtil().read(true, true, args.getNumPartitions(), true, args.getData());
}


public ZFrame<D,R,C> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throws Exception{
public ZDataPair<S, D, R, C, T> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll) throws Exception{
return getPairs(blocked, bAll, new SelfPairBuilder<S, D, R, C> (getDSUtil(),args));
}

public ZFrame<D,R,C> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll, IPairBuilder<S, D, R, C> iPairBuilder) throws Exception{
return iPairBuilder.getPairs(blocked, bAll);
public ZDataPair<S, D, R, C, T> getPairs(ZFrame<D,R,C>blocked, ZFrame<D,R,C>bAll, IPairBuilder<S, D, R, C> iPairBuilder) throws Exception{
return new ZDataPair<S, D, R, C, T>(blocked, bAll,iPairBuilder);
}

protected abstract Model getModel() throws ZinggClientException;
Expand All @@ -64,12 +68,12 @@ protected ZFrame<D,R,C> selectColsFromBlocked(ZFrame<D,R,C>blocked) {
return blocked.select(ColName.ID_COL, ColName.HASH_COL);
}

protected ZFrame<D,R,C> predictOnBlocks(ZFrame<D,R,C>blocks) throws Exception, ZinggClientException{
protected ZFrame<D,R,C> predictOnBlocks(ZDataPair<S, D, R, C, T> blocks) throws Exception, ZinggClientException{
if (LOG.isDebugEnabled()) {
LOG.debug("block size" + blocks.count());
LOG.debug("block size" + blocks.getPairs().count());
}
Model model = getModel();
ZFrame<D,R,C> dupes = model.predict(blocks);
ZFrame<D,R,C> dupes = model.predict(blocks.getPairs());
if (LOG.isDebugEnabled()) {
LOG.debug("Found dupes " + dupes.count());
}
Expand All @@ -84,7 +88,7 @@ protected ZFrame<D,R,C> getActualDupes(ZData<S,D,R,C,T> rawData) throws Excepti

protected ZFrame<D,R,C> getActualDupes(ZData<S,D,R,C,T> rawData,
IFilter<D, R, C> predictionFilter, IPairBuilder<S, D, R, C> iPairBuilder, PredictionColsSelector colsSelector) throws Exception, ZinggClientException{
ZFrame<D,R,C> blocks = getPairs(selectColsFromBlocked(rawData.getBlockedFrame().getProcessedDF()), rawData.getRepartitionFrame().getProcessedDF(), iPairBuilder);
ZDataPair<S, D, R, C, T> blocks = getPairs(selectColsFromBlocked(rawData.getBlockedFrame().getProcessedDF()), rawData.getRepartitionFrame().getProcessedDF(), iPairBuilder);
ZFrame<D,R,C>dupesActual = predictOnBlocks(blocks);
ZFrame<D, R, C> filteredData = predictionFilter.filter(dupesActual);
if(colsSelector!=null) {
Expand Down

0 comments on commit 2b5c9fe

Please sign in to comment.